blob: 42f6dd9326408d78340c4b7da56fde91075e733f [file] [log] [blame]
Benjamin Petersondfd79492008-06-13 19:13:39 +00001#
2# Unit tests for the multiprocessing package
3#
4
5import unittest
Benjamin Petersondfd79492008-06-13 19:13:39 +00006import Queue
7import time
8import sys
9import os
10import gc
11import signal
12import array
Benjamin Petersondfd79492008-06-13 19:13:39 +000013import socket
14import random
15import logging
Antoine Pitroua1a8da82011-08-23 19:54:20 +020016import errno
Richard Oudkerkfaee75c2012-08-14 11:41:19 +010017import test.script_helper
Mark Dickinsonc4920e82009-11-20 19:30:22 +000018from test import test_support
Jesse Noller1b90efb2009-06-30 17:11:52 +000019from StringIO import StringIO
R. David Murray3db8a342009-03-30 23:05:48 +000020_multiprocessing = test_support.import_module('_multiprocessing')
Ezio Melottic2077b02011-03-16 12:34:31 +020021# import threading after _multiprocessing to raise a more relevant error
Victor Stinner613b4cf2010-04-27 21:56:26 +000022# message: "No module named _multiprocessing". _multiprocessing is not compiled
23# without thread support.
24import threading
R. David Murray3db8a342009-03-30 23:05:48 +000025
Jesse Noller37040cd2008-09-30 00:15:45 +000026# Work around broken sem_open implementations
R. David Murray3db8a342009-03-30 23:05:48 +000027test_support.import_module('multiprocessing.synchronize')
Jesse Noller37040cd2008-09-30 00:15:45 +000028
Benjamin Petersondfd79492008-06-13 19:13:39 +000029import multiprocessing.dummy
30import multiprocessing.connection
31import multiprocessing.managers
32import multiprocessing.heap
Benjamin Petersondfd79492008-06-13 19:13:39 +000033import multiprocessing.pool
Benjamin Petersondfd79492008-06-13 19:13:39 +000034
Charles-François Natalif8413b22011-09-21 18:44:49 +020035from multiprocessing import util
36
37try:
38 from multiprocessing import reduction
39 HAS_REDUCTION = True
40except ImportError:
41 HAS_REDUCTION = False
Benjamin Petersondfd79492008-06-13 19:13:39 +000042
Brian Curtina06e9b82010-10-07 02:27:41 +000043try:
44 from multiprocessing.sharedctypes import Value, copy
45 HAS_SHAREDCTYPES = True
46except ImportError:
47 HAS_SHAREDCTYPES = False
48
Antoine Pitroua1a8da82011-08-23 19:54:20 +020049try:
50 import msvcrt
51except ImportError:
52 msvcrt = None
53
Benjamin Petersondfd79492008-06-13 19:13:39 +000054#
55#
56#
57
Benjamin Petersone79edf52008-07-13 18:34:58 +000058latin = str
Benjamin Petersondfd79492008-06-13 19:13:39 +000059
Benjamin Petersondfd79492008-06-13 19:13:39 +000060#
61# Constants
62#
63
# Log level applied to the multiprocessing logger while the tests run.
LOG_LEVEL = util.SUBWARNING
#LOG_LEVEL = logging.DEBUG

# Short pause (seconds) the tests use to let background work settle.
DELTA = 0.1
CHECK_TIMINGS = False     # making true makes tests take a lot longer
                          # and can sometimes cause some non-serious
                          # failures because some calls block a bit
                          # longer than expected
if CHECK_TIMINGS:
    TIMEOUT1, TIMEOUT2, TIMEOUT3 = 0.82, 0.35, 1.4
else:
    TIMEOUT1, TIMEOUT2, TIMEOUT3 = 0.1, 0.1, 0.1

# False when the C extension flags its semaphore get_value() as broken.
HAVE_GETVALUE = not getattr(_multiprocessing,
                            'HAVE_BROKEN_SEM_GETVALUE', False)

WIN32 = (sys.platform == "win32")

try:
    MAXFD = os.sysconf("SC_OPEN_MAX")
except (AttributeError, ValueError, OSError):
    # os.sysconf is missing, the name is unknown, or the host does not
    # support it: fall back to a fixed default.  A bare "except:" here
    # would also have swallowed KeyboardInterrupt and SystemExit.
    MAXFD = 256
86
Benjamin Petersondfd79492008-06-13 19:13:39 +000087#
Florent Xicluna36b9fbb2010-03-24 19:33:25 +000088# Some tests require ctypes
89#
90
91try:
Nick Coghlan13623662010-04-10 14:24:36 +000092 from ctypes import Structure, c_int, c_double
Florent Xicluna36b9fbb2010-03-24 19:33:25 +000093except ImportError:
94 Structure = object
95 c_int = c_double = None
96
Charles-François Natali6392d7f2011-11-22 18:35:18 +010097
def check_enough_semaphores():
    """Skip the current test when the OS semaphore limit is too low.

    Returns None when the limit cannot be queried, is reported as -1, or
    is at least 256 (the minimum number of semaphores available according
    to POSIX).

    Raises unittest.SkipTest when the reported limit is below that minimum.
    """
    required = 256
    try:
        available = os.sysconf("SC_SEM_NSEMS_MAX")
    except (AttributeError, ValueError):
        # sysconf not available or setting not available
        return
    if available != -1 and available < required:
        raise unittest.SkipTest("The OS doesn't support enough semaphores "
                                "to run the test (required: %d)." % required)
111
112
Florent Xicluna36b9fbb2010-03-24 19:33:25 +0000113#
Benjamin Petersondfd79492008-06-13 19:13:39 +0000114# Creates a wrapper for a function which records the time it takes to finish
115#
116
class TimingWrapper(object):
    """Callable proxy that remembers how long its last invocation took.

    `elapsed` is None until the wrapper has been called once; afterwards
    it holds the time.time() difference (seconds) of the most recent
    call, whether that call returned or raised.
    """

    def __init__(self, func):
        self.elapsed = None
        self.func = func

    def __call__(self, *args, **kwds):
        started = time.time()
        try:
            result = self.func(*args, **kwds)
        finally:
            # Recorded on both the normal and the exceptional path.
            self.elapsed = time.time() - started
        return result
129
130#
131# Base class for test cases
132#
133
class BaseTestCase(object):
    """Mixin carrying helpers shared by the multiprocessing test cases.

    It relies on the assertion methods of unittest.TestCase
    (assertEqual, assertAlmostEqual) being supplied by the class it is
    mixed into.
    """

    ALLOWED_TYPES = ('processes', 'manager', 'threads')

    def assertTimingAlmostEqual(self, a, b):
        """Compare two durations to one decimal place.

        Does nothing unless the module-level CHECK_TIMINGS switch is on.
        """
        if not CHECK_TIMINGS:
            return
        self.assertAlmostEqual(a, b, 1)

    def assertReturnsIfImplemented(self, value, func, *args):
        """Assert func(*args) == value, tolerating NotImplementedError.

        When the call raises NotImplementedError the check is silently
        dropped and None is returned.
        """
        try:
            outcome = func(*args)
        except NotImplementedError:
            return None
        return self.assertEqual(value, outcome)

    # For the sanity of Windows users, rather than crashing or freezing in
    # multiple ways.
    def __reduce__(self, *args):
        raise NotImplementedError("shouldn't try to pickle a test case")

    __reduce_ex__ = __reduce__
156
Benjamin Petersondfd79492008-06-13 19:13:39 +0000157#
158# Return the value of a semaphore
159#
160
def get_value(self):
    """Return the current counter of the semaphore-like object `self`.

    The object's own get_value() method is tried first; if that raises
    AttributeError the private attributes `_Semaphore__value` and then
    `_value` are read instead.

    Raises NotImplementedError when none of these is available.  Any
    other exception from get_value() propagates unchanged.
    """
    try:
        return self.get_value()
    except AttributeError:
        pass
    for attribute in ('_Semaphore__value', '_value'):
        try:
            return getattr(self, attribute)
        except AttributeError:
            continue
    raise NotImplementedError
172
173#
174# Testcases
175#
176
class _TestProcess(BaseTestCase):
    """Tests of the Process API: introspection, start/join, terminate,
    active_children(), nested spawning and exit-code propagation.

    self.TYPE, self.Process, self.Queue, ... are expected to be supplied
    by the concrete test class this mixin is combined with.
    """

    ALLOWED_TYPES = ('processes', 'threads')

    def test_current(self):
        """current_process() in the parent is alive, non-daemonic, keyed."""
        if self.TYPE == 'threads':
            self.skipTest('test not appropriate for {}'.format(self.TYPE))

        me = self.current_process()
        key = me.authkey

        self.assertTrue(me.is_alive())
        self.assertTrue(not me.daemon)
        self.assertIsInstance(key, bytes)
        self.assertTrue(len(key) > 0)
        self.assertEqual(me.ident, os.getpid())
        self.assertEqual(me.exitcode, None)

    @classmethod
    def _test(cls, q, *args, **kwds):
        """Child body for test_process: report what the child saw through q."""
        me = cls.current_process()
        report = [args, kwds, me.name]
        if cls.TYPE != 'threads':
            report.extend([bytes(me.authkey), me.pid])
        for item in report:
            q.put(item)

    def test_process(self):
        """Follow a child through its not-started, running and joined states."""
        q = self.Queue(1)
        e = self.Event()
        args = (q, 1, 2)
        kwargs = {'hello':23, 'bye':2.54}
        name = 'SomeProcess'
        child = self.Process(
            target=self._test, args=args, kwargs=kwargs, name=name
            )
        child.daemon = True
        parent = self.current_process()
        not_threads = self.TYPE != 'threads'

        # Before start().
        if not_threads:
            self.assertEqual(child.authkey, parent.authkey)
        self.assertEqual(child.is_alive(), False)
        self.assertEqual(child.daemon, True)
        self.assertNotIn(child, self.active_children())
        self.assertTrue(type(self.active_children()) is list)
        self.assertEqual(child.exitcode, None)

        child.start()

        # While running.
        self.assertEqual(child.exitcode, None)
        self.assertEqual(child.is_alive(), True)
        self.assertIn(child, self.active_children())

        # The child reports in the order written by _test().
        self.assertEqual(q.get(), args[1:])
        self.assertEqual(q.get(), kwargs)
        self.assertEqual(q.get(), child.name)
        if not_threads:
            self.assertEqual(q.get(), parent.authkey)
            self.assertEqual(q.get(), child.pid)

        child.join()

        # After join().
        self.assertEqual(child.exitcode, 0)
        self.assertEqual(child.is_alive(), False)
        self.assertNotIn(child, self.active_children())

    @classmethod
    def _test_terminate(cls):
        """Child body for test_terminate: sleep until killed."""
        time.sleep(1000)

    def test_terminate(self):
        """terminate() stops a sleeping child and join() then returns."""
        if self.TYPE == 'threads':
            self.skipTest('test not appropriate for {}'.format(self.TYPE))

        victim = self.Process(target=self._test_terminate)
        victim.daemon = True
        victim.start()

        self.assertEqual(victim.is_alive(), True)
        self.assertIn(victim, self.active_children())
        self.assertEqual(victim.exitcode, None)

        victim.terminate()

        join = TimingWrapper(victim.join)
        self.assertEqual(join(), None)
        self.assertTimingAlmostEqual(join.elapsed, 0.0)

        self.assertEqual(victim.is_alive(), False)
        self.assertNotIn(victim, self.active_children())

        victim.join()

        # XXX sometimes get p.exitcode == 0 on Windows ...
        #self.assertEqual(p.exitcode, -signal.SIGTERM)

    def test_cpu_count(self):
        """cpu_count() yields a positive int (or is not implemented)."""
        try:
            count = multiprocessing.cpu_count()
        except NotImplementedError:
            count = 1
        self.assertTrue(type(count) is int)
        self.assertTrue(count >= 1)

    def test_active_children(self):
        """A child is listed by active_children() only while it runs."""
        self.assertEqual(type(self.active_children()), list)

        sleeper = self.Process(target=time.sleep, args=(DELTA,))
        self.assertNotIn(sleeper, self.active_children())

        sleeper.daemon = True
        sleeper.start()
        self.assertIn(sleeper, self.active_children())

        sleeper.join()
        self.assertNotIn(sleeper, self.active_children())

    @classmethod
    def _test_recursion(cls, wconn, id):
        """Send `id`, then spawn two children per level until depth 2."""
        from multiprocessing import forking
        wconn.send(id)
        if len(id) >= 2:
            return
        for branch in range(2):
            child = cls.Process(
                target=cls._test_recursion, args=(wconn, id+[branch])
                )
            child.start()
            child.join()

    def test_recursion(self):
        """Processes can start processes; ids arrive in depth-first order."""
        rconn, wconn = self.Pipe(duplex=False)
        self._test_recursion(wconn, [])

        time.sleep(DELTA)
        received = []
        while rconn.poll():
            received.append(rconn.recv())

        expected = [[], [0], [0, 0], [0, 1], [1], [1, 0], [1, 1]]
        self.assertEqual(received, expected)

    @classmethod
    def _test_sys_exit(cls, reason, testfn):
        """Child body for test_sys_exit: point stderr at a file, then exit."""
        sys.stderr = open(testfn, 'w')
        sys.exit(reason)

    def test_sys_exit(self):
        """sys.exit() arguments map onto the child's exit code and stderr."""
        # See Issue 13854
        if self.TYPE == 'threads':
            self.skipTest('test not appropriate for {}'.format(self.TYPE))

        testfn = test_support.TESTFN
        self.addCleanup(test_support.unlink, testfn)

        # Non-integer reasons: exit code 1, and str(reason) lands in the
        # file that the child installed as sys.stderr.
        printed_cases = (([1, 2, 3], 1), ('ignore this', 1))
        for reason, code in printed_cases:
            child = self.Process(target=self._test_sys_exit,
                                 args=(reason, testfn))
            child.daemon = True
            child.start()
            child.join(5)
            self.assertEqual(child.exitcode, code)

            with open(testfn, 'r') as stream:
                self.assertEqual(stream.read().rstrip(), str(reason))

        # Integer-like reasons are compared directly with the exit code.
        for reason in (True, False, 8):
            child = self.Process(target=sys.exit, args=(reason,))
            child.daemon = True
            child.start()
            child.join(5)
            self.assertEqual(child.exitcode, reason)
356
Benjamin Petersondfd79492008-06-13 19:13:39 +0000357#
358#
359#
360
361class _UpperCaser(multiprocessing.Process):
362
363 def __init__(self):
364 multiprocessing.Process.__init__(self)
365 self.child_conn, self.parent_conn = multiprocessing.Pipe()
366
367 def run(self):
368 self.parent_conn.close()
369 for s in iter(self.child_conn.recv, None):
370 self.child_conn.send(s.upper())
371 self.child_conn.close()
372
373 def submit(self, s):
374 assert type(s) is str
375 self.parent_conn.send(s)
376 return self.parent_conn.recv()
377
378 def stop(self):
379 self.parent_conn.send(None)
380 self.parent_conn.close()
381 self.child_conn.close()
382
class _TestSubclassingProcess(BaseTestCase):
    """Check that a Process subclass overriding run() works end to end."""

    ALLOWED_TYPES = ('processes',)

    def test_subclassing(self):
        """Round-trip two strings through a running _UpperCaser child."""
        worker = _UpperCaser()
        worker.daemon = True
        worker.start()
        for text, shouted in (('hello', 'HELLO'), ('world', 'WORLD')):
            self.assertEqual(worker.submit(text), shouted)
        worker.stop()
        worker.join()
395
396#
397#
398#
399
def queue_empty(q):
    """Return whether `q` is empty.

    Uses q.empty() when the object has one, otherwise compares q.qsize()
    with zero.
    """
    if not hasattr(q, 'empty'):
        return q.qsize() == 0
    return q.empty()
405
def queue_full(q, maxsize):
    """Return whether `q` is full.

    Uses q.full() when the object has one, otherwise compares q.qsize()
    with `maxsize`.
    """
    if not hasattr(q, 'full'):
        return q.qsize() == maxsize
    return q.full()
411
412
class _TestQueue(BaseTestCase):
    """Tests for Queue and JoinableQueue: blocking and non-blocking
    put()/get(), sharing a queue with a child, qsize() and task_done().
    """

    @classmethod
    def _test_put(cls, queue, child_can_start, parent_can_continue):
        """Child body for test_put: once released, take six items."""
        child_can_start.wait()
        for _ in range(6):
            queue.get()
        parent_can_continue.set()

    def test_put(self):
        """Fill a bounded queue, check every put() flavour raises Full."""
        MAXSIZE = 6
        queue = self.Queue(maxsize=MAXSIZE)
        child_can_start = self.Event()
        parent_can_continue = self.Event()

        drainer = self.Process(
            target=self._test_put,
            args=(queue, child_can_start, parent_can_continue)
            )
        drainer.daemon = True
        drainer.start()

        self.assertEqual(queue_empty(queue), True)
        self.assertEqual(queue_full(queue, MAXSIZE), False)

        # One put for each accepted calling convention.
        queue.put(1)
        queue.put(2, True)
        queue.put(3, True, None)
        queue.put(4, False)
        queue.put(5, False, None)
        queue.put_nowait(6)

        # the values may be in buffer but not yet in pipe so sleep a bit
        time.sleep(DELTA)

        self.assertEqual(queue_empty(queue), False)
        self.assertEqual(queue_full(queue, MAXSIZE), True)

        put = TimingWrapper(queue.put)
        put_nowait = TimingWrapper(queue.put_nowait)

        # (wrapper, positional args, keyword args, expected duration)
        attempts = [
            (put, (7, False), {}, 0),
            (put, (7, False, None), {}, 0),
            (put_nowait, (7,), {}, 0),
            (put, (7, True, TIMEOUT1), {}, TIMEOUT1),
            (put, (7, False, TIMEOUT2), {}, 0),
            (put, (7, True), {'timeout': TIMEOUT3}, TIMEOUT3),
        ]
        for call, args, kwds, duration in attempts:
            self.assertRaises(Queue.Full, call, *args, **kwds)
            self.assertTimingAlmostEqual(call.elapsed, duration)

        child_can_start.set()
        parent_can_continue.wait()

        self.assertEqual(queue_empty(queue), True)
        self.assertEqual(queue_full(queue, MAXSIZE), False)

        drainer.join()

    @classmethod
    def _test_get(cls, queue, child_can_start, parent_can_continue):
        """Child body for test_get: once released, put 2, 3, 4 and 5."""
        child_can_start.wait()
        #queue.put(1)
        for item in (2, 3, 4, 5):
            queue.put(item)
        parent_can_continue.set()

    def test_get(self):
        """Read the child's items, check every get() flavour raises Empty."""
        queue = self.Queue()
        child_can_start = self.Event()
        parent_can_continue = self.Event()

        feeder = self.Process(
            target=self._test_get,
            args=(queue, child_can_start, parent_can_continue)
            )
        feeder.daemon = True
        feeder.start()

        self.assertEqual(queue_empty(queue), True)

        child_can_start.set()
        parent_can_continue.wait()

        time.sleep(DELTA)
        self.assertEqual(queue_empty(queue), False)

        # Hangs unexpectedly, remove for now
        #self.assertEqual(queue.get(), 1)
        self.assertEqual(queue.get(True, None), 2)
        self.assertEqual(queue.get(True), 3)
        self.assertEqual(queue.get(timeout=1), 4)
        self.assertEqual(queue.get_nowait(), 5)

        self.assertEqual(queue_empty(queue), True)

        get = TimingWrapper(queue.get)
        get_nowait = TimingWrapper(queue.get_nowait)

        # (wrapper, positional args, keyword args, expected duration)
        attempts = [
            (get, (False,), {}, 0),
            (get, (False, None), {}, 0),
            (get_nowait, (), {}, 0),
            (get, (True, TIMEOUT1), {}, TIMEOUT1),
            (get, (False, TIMEOUT2), {}, 0),
            (get, (), {'timeout': TIMEOUT3}, TIMEOUT3),
        ]
        for call, args, kwds, duration in attempts:
            self.assertRaises(Queue.Empty, call, *args, **kwds)
            self.assertTimingAlmostEqual(call.elapsed, duration)

        feeder.join()

    @classmethod
    def _test_fork(cls, queue):
        """Child body for test_fork: put the integers 10 through 19."""
        for number in range(10, 20):
            queue.put(number)
        # note that at this point the items may only be buffered, so the
        # process cannot shutdown until the feeder thread has finished
        # pushing items onto the pipe.

    def test_fork(self):
        # Old versions of Queue would fail to create a new feeder
        # thread for a forked process if the original process had its
        # own feeder thread.  This test checks that this no longer
        # happens.

        queue = self.Queue()

        # put items on queue so that main process starts a feeder thread
        for number in range(10):
            queue.put(number)

        # wait to make sure thread starts before we fork a new process
        time.sleep(DELTA)

        # fork process
        child = self.Process(target=self._test_fork, args=(queue,))
        child.daemon = True
        child.start()

        # check that all expected items are in the queue
        for expected in range(20):
            self.assertEqual(queue.get(), expected)
        self.assertRaises(Queue.Empty, queue.get, False)

        child.join()

    def test_qsize(self):
        """qsize() tracks puts and gets (skipped where unimplemented)."""
        q = self.Queue()
        try:
            self.assertEqual(q.qsize(), 0)
        except NotImplementedError:
            self.skipTest('qsize method not implemented')
        # (operation, argument tuple, size expected afterwards)
        script = (
            (q.put, (1,), 1),
            (q.put, (5,), 2),
            (q.get, (), 1),
            (q.get, (), 0),
        )
        for operation, args, size in script:
            operation(*args)
            self.assertEqual(q.qsize(), size)

    @classmethod
    def _test_task_done(cls, q):
        """Worker body for test_task_done: acknowledge items until None."""
        for _ in iter(q.get, None):
            time.sleep(DELTA)
            q.task_done()

    def test_task_done(self):
        """JoinableQueue.join() returns once workers acknowledged all items."""
        queue = self.JoinableQueue()

        if sys.version_info < (2, 5) and not hasattr(queue, 'task_done'):
            self.skipTest("requires 'queue.task_done()' method")

        workers = [self.Process(target=self._test_task_done, args=(queue,))
                   for _ in xrange(4)]

        for worker in workers:
            worker.daemon = True
            worker.start()

        for item in xrange(10):
            queue.put(item)

        queue.join()

        # One None sentinel per worker tells each of them to stop.
        for _ in workers:
            queue.put(None)

        for worker in workers:
            worker.join()
622
623#
624#
625#
626
class _TestLock(BaseTestCase):
    """Tests for Lock and RLock acquire/release semantics."""

    def test_lock(self):
        """A Lock cannot be re-acquired or released twice."""
        lock = self.Lock()
        self.assertEqual(lock.acquire(), True)
        # Non-blocking acquire of a held lock fails.
        self.assertEqual(lock.acquire(False), False)
        self.assertEqual(lock.release(), None)
        self.assertRaises((ValueError, threading.ThreadError), lock.release)

    def test_rlock(self):
        """An RLock nests three deep, then rejects an extra release."""
        lock = self.RLock()
        depth = 3
        for _ in range(depth):
            self.assertEqual(lock.acquire(), True)
        for _ in range(depth):
            self.assertEqual(lock.release(), None)
        self.assertRaises((AssertionError, RuntimeError), lock.release)

    def test_lock_context(self):
        """A Lock works as a context manager."""
        with self.Lock():
            pass
649
Benjamin Petersondfd79492008-06-13 19:13:39 +0000650
class _TestSemaphore(BaseTestCase):
    """Tests for Semaphore and BoundedSemaphore counting and timeouts."""

    def _test_semaphore(self, sem):
        """Drive a semaphore of initial value 2 down to 0 and back to 2."""
        self.assertReturnsIfImplemented(2, get_value, sem)
        # (operation, args, expected return, expected counter afterwards)
        steps = (
            (sem.acquire, (), True, 1),
            (sem.acquire, (), True, 0),
            (sem.acquire, (False,), False, 0),
            (sem.release, (), None, 1),
            (sem.release, (), None, 2),
        )
        for operation, args, outcome, remaining in steps:
            self.assertEqual(operation(*args), outcome)
            self.assertReturnsIfImplemented(remaining, get_value, sem)

    def test_semaphore(self):
        """A plain Semaphore may be released past its initial value."""
        sem = self.Semaphore(2)
        self._test_semaphore(sem)
        for counter in (3, 4):
            self.assertEqual(sem.release(), None)
            self.assertReturnsIfImplemented(counter, get_value, sem)

    def test_bounded_semaphore(self):
        sem = self.BoundedSemaphore(2)
        self._test_semaphore(sem)
        # Currently fails on OS/X
        #if HAVE_GETVALUE:
        #    self.assertRaises(ValueError, sem.release)
        #    self.assertReturnsIfImplemented(2, get_value, sem)

    def test_timeout(self):
        """acquire() on an empty semaphore fails, honouring its timeout."""
        if self.TYPE != 'processes':
            self.skipTest('test not appropriate for {}'.format(self.TYPE))

        sem = self.Semaphore(0)
        acquire = TimingWrapper(sem.acquire)

        # (positional args, keyword args, expected duration)
        attempts = [
            ((False,), {}, 0.0),
            ((False, None), {}, 0.0),
            ((False, TIMEOUT1), {}, 0),
            ((True, TIMEOUT2), {}, TIMEOUT2),
            ((), {'timeout': TIMEOUT3}, TIMEOUT3),
        ]
        for args, kwds, duration in attempts:
            self.assertEqual(acquire(*args, **kwds), False)
            self.assertTimingAlmostEqual(acquire.elapsed, duration)
703
704
class _TestCondition(BaseTestCase):
    """Tests for Condition: notify(), notify_all() and wait() timeouts.

    Every worker started by a test is joined once that test's assertions
    have passed, so no child process or thread is left running.
    """

    @classmethod
    def f(cls, cond, sleeping, woken, timeout=None):
        """Worker body: signal `sleeping`, wait on `cond`, signal `woken`."""
        cond.acquire()
        sleeping.release()
        cond.wait(timeout)
        woken.release()
        cond.release()

    def check_invariant(self, cond):
        """Check the condition's internal counters show no sleepers."""
        # this is only supposed to succeed when there are no sleepers
        if self.TYPE == 'processes':
            try:
                sleepers = (cond._sleeping_count.get_value() -
                            cond._woken_count.get_value())
                self.assertEqual(sleepers, 0)
                self.assertEqual(cond._wait_semaphore.get_value(), 0)
            except NotImplementedError:
                pass

    def test_notify(self):
        """notify() wakes exactly one waiter per call."""
        cond = self.Condition()
        sleeping = self.Semaphore(0)
        woken = self.Semaphore(0)

        # Keep separate handles: reusing one name would lose the process
        # and make it impossible to join it later.
        proc = self.Process(target=self.f, args=(cond, sleeping, woken))
        proc.daemon = True
        proc.start()

        thread = threading.Thread(target=self.f, args=(cond, sleeping, woken))
        thread.daemon = True
        thread.start()

        # wait for both children to start sleeping
        sleeping.acquire()
        sleeping.acquire()

        # check no process/thread has woken up
        time.sleep(DELTA)
        self.assertReturnsIfImplemented(0, get_value, woken)

        # wake up one process/thread
        cond.acquire()
        cond.notify()
        cond.release()

        # check one process/thread has woken up
        time.sleep(DELTA)
        self.assertReturnsIfImplemented(1, get_value, woken)

        # wake up another
        cond.acquire()
        cond.notify()
        cond.release()

        # check other has woken up
        time.sleep(DELTA)
        self.assertReturnsIfImplemented(2, get_value, woken)

        # check state is not mucked up
        self.check_invariant(cond)

        # Both waiters have been notified; reap them.
        thread.join()
        proc.join()

    def test_notify_all(self):
        """Waiters time out on their own, or all wake on notify_all()."""
        cond = self.Condition()
        sleeping = self.Semaphore(0)
        woken = self.Semaphore(0)
        workers = []

        # start some threads/processes which will timeout
        for i in range(3):
            p = self.Process(target=self.f,
                             args=(cond, sleeping, woken, TIMEOUT1))
            p.daemon = True
            p.start()
            workers.append(p)

            t = threading.Thread(target=self.f,
                                 args=(cond, sleeping, woken, TIMEOUT1))
            t.daemon = True
            t.start()
            workers.append(t)

        # wait for them all to sleep
        for i in xrange(6):
            sleeping.acquire()

        # check they have all timed out
        for i in xrange(6):
            woken.acquire()
        self.assertReturnsIfImplemented(0, get_value, woken)

        # check state is not mucked up
        self.check_invariant(cond)

        # start some more threads/processes
        for i in range(3):
            p = self.Process(target=self.f, args=(cond, sleeping, woken))
            p.daemon = True
            p.start()
            workers.append(p)

            t = threading.Thread(target=self.f, args=(cond, sleeping, woken))
            t.daemon = True
            t.start()
            workers.append(t)

        # wait for them to all sleep
        for i in xrange(6):
            sleeping.acquire()

        # check no process/thread has woken up
        time.sleep(DELTA)
        self.assertReturnsIfImplemented(0, get_value, woken)

        # wake them all up
        cond.acquire()
        cond.notify_all()
        cond.release()

        # check they have all woken
        time.sleep(DELTA)
        self.assertReturnsIfImplemented(6, get_value, woken)

        # check state is not mucked up
        self.check_invariant(cond)

        # The first batch timed out and the second was notified, so every
        # worker can finish; reap them all.
        for worker in workers:
            worker.join()

    def test_timeout(self):
        """wait(timeout) with no notifier returns None after the timeout."""
        cond = self.Condition()
        wait = TimingWrapper(cond.wait)
        cond.acquire()
        res = wait(TIMEOUT1)
        cond.release()
        self.assertEqual(res, None)
        self.assertTimingAlmostEqual(wait.elapsed, TIMEOUT1)
836
837
class _TestEvent(BaseTestCase):
    """Tests for Event: is_set(), set(), clear() and wait()."""

    @classmethod
    def _test_event(cls, event):
        """Child body for test_event: set the event after TIMEOUT2 seconds."""
        time.sleep(TIMEOUT2)
        event.set()

    def test_event(self):
        """wait() reflects the flag, and a child's set() wakes the parent."""
        event = self.Event()
        wait = TimingWrapper(event.wait)

        self.assertEqual(event.is_set(), False)

        # wait() is expected to report the flag state rather than None, so
        # a wait that times out on an unset event gives False.
        self.assertEqual(wait(0.0), False)
        self.assertTimingAlmostEqual(wait.elapsed, 0.0)
        self.assertEqual(wait(TIMEOUT1), False)
        self.assertTimingAlmostEqual(wait.elapsed, TIMEOUT1)

        event.set()

        # Once set, wait() returns True immediately, with or without a
        # timeout.
        self.assertEqual(event.is_set(), True)
        self.assertEqual(wait(), True)
        self.assertTimingAlmostEqual(wait.elapsed, 0.0)
        self.assertEqual(wait(TIMEOUT1), True)
        self.assertTimingAlmostEqual(wait.elapsed, 0.0)
        # self.assertEqual(event.is_set(), True)

        event.clear()

        #self.assertEqual(event.is_set(), False)

        p = self.Process(target=self._test_event, args=(event,))
        p.daemon = True
        p.start()
        self.assertEqual(wait(), True)

        # The child has set the event, which is its last action; reap it
        # so it is not left running after the test.
        p.join()
Benjamin Petersondfd79492008-06-13 19:13:39 +0000878
879#
880#
881#
882
class _TestValue(BaseTestCase):
    """Tests for shared Value/RawValue objects and their lock accessors."""

    ALLOWED_TYPES = ('processes',)

    # (typecode, initial value, value written by the child)
    codes_values = [
        ('i', 4343, 24234),
        ('d', 3.625, -4.25),
        ('h', -232, 234),
        ('c', latin('x'), latin('y'))
        ]

    def setUp(self):
        if not HAS_SHAREDCTYPES:
            self.skipTest("requires multiprocessing.sharedctypes")

    @classmethod
    def _test(cls, values):
        """Child body for test_value: overwrite each shared value."""
        for shared, spec in zip(values, cls.codes_values):
            shared.value = spec[2]

    def test_value(self, raw=False):
        """Values written in a child process are visible in the parent."""
        factory = self.RawValue if raw else self.Value
        values = [factory(code, initial)
                  for code, initial, _ in self.codes_values]

        for shared, spec in zip(values, self.codes_values):
            self.assertEqual(shared.value, spec[1])

        writer = self.Process(target=self._test, args=(values,))
        writer.daemon = True
        writer.start()
        writer.join()

        for shared, spec in zip(values, self.codes_values):
            self.assertEqual(shared.value, spec[2])

    def test_rawvalue(self):
        """Same round trip as test_value, using RawValue."""
        self.test_value(raw=True)

    def test_getobj_getlock(self):
        """get_lock()/get_obj() exist exactly when the value has a lock."""
        # Default lock.
        val1 = self.Value('i', 5)
        lock1 = val1.get_lock()
        obj1 = val1.get_obj()

        # lock=None.
        val2 = self.Value('i', 5, lock=None)
        lock2 = val2.get_lock()
        obj2 = val2.get_obj()

        # Caller-supplied lock is handed back by get_lock().
        lock = self.Lock()
        val3 = self.Value('i', 5, lock=lock)
        lock3 = val3.get_lock()
        obj3 = val3.get_obj()
        self.assertEqual(lock, lock3)

        # lock=False: neither accessor is present.
        arr4 = self.Value('i', 5, lock=False)
        for accessor in ('get_lock', 'get_obj'):
            self.assertFalse(hasattr(arr4, accessor))

        self.assertRaises(AttributeError, self.Value, 'i', 5, lock='navalue')

        # RawValue: neither accessor is present.
        arr5 = self.RawValue('i', 5)
        for accessor in ('get_lock', 'get_obj'):
            self.assertFalse(hasattr(arr5, accessor))
950
Benjamin Petersondfd79492008-06-13 19:13:39 +0000951
952class _TestArray(BaseTestCase):
953
Florent Xicluna36b9fbb2010-03-24 19:33:25 +0000954 ALLOWED_TYPES = ('processes',)
955
Antoine Pitrou4eb2b282010-11-02 23:51:30 +0000956 @classmethod
957 def f(cls, seq):
Benjamin Petersondfd79492008-06-13 19:13:39 +0000958 for i in range(1, len(seq)):
959 seq[i] += seq[i-1]
960
Florent Xicluna36b9fbb2010-03-24 19:33:25 +0000961 @unittest.skipIf(c_int is None, "requires _ctypes")
Benjamin Petersondfd79492008-06-13 19:13:39 +0000962 def test_array(self, raw=False):
Benjamin Petersondfd79492008-06-13 19:13:39 +0000963 seq = [680, 626, 934, 821, 150, 233, 548, 982, 714, 831]
964 if raw:
965 arr = self.RawArray('i', seq)
966 else:
967 arr = self.Array('i', seq)
968
969 self.assertEqual(len(arr), len(seq))
970 self.assertEqual(arr[3], seq[3])
971 self.assertEqual(list(arr[2:7]), list(seq[2:7]))
972
973 arr[4:8] = seq[4:8] = array.array('i', [1, 2, 3, 4])
974
975 self.assertEqual(list(arr[:]), seq)
976
977 self.f(seq)
978
979 p = self.Process(target=self.f, args=(arr,))
Jesus Cea6f6016b2011-09-09 20:26:57 +0200980 p.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +0000981 p.start()
982 p.join()
983
984 self.assertEqual(list(arr[:]), seq)
985
Florent Xicluna36b9fbb2010-03-24 19:33:25 +0000986 @unittest.skipIf(c_int is None, "requires _ctypes")
Mark Dickinsond3cb2f62011-03-26 10:02:37 +0000987 def test_array_from_size(self):
988 size = 10
989 # Test for zeroing (see issue #11675).
990 # The repetition below strengthens the test by increasing the chances
991 # of previously allocated non-zero memory being used for the new array
992 # on the 2nd and 3rd loops.
993 for _ in range(3):
994 arr = self.Array('i', size)
995 self.assertEqual(len(arr), size)
996 self.assertEqual(list(arr), [0] * size)
997 arr[:] = range(10)
998 self.assertEqual(list(arr), range(10))
999 del arr
1000
1001 @unittest.skipIf(c_int is None, "requires _ctypes")
Benjamin Petersondfd79492008-06-13 19:13:39 +00001002 def test_rawarray(self):
1003 self.test_array(raw=True)
1004
Florent Xicluna36b9fbb2010-03-24 19:33:25 +00001005 @unittest.skipIf(c_int is None, "requires _ctypes")
Mark Dickinsonf9e9a6f2011-03-25 22:01:06 +00001006 def test_array_accepts_long(self):
1007 arr = self.Array('i', 10L)
1008 self.assertEqual(len(arr), 10)
1009 raw_arr = self.RawArray('i', 10L)
1010 self.assertEqual(len(raw_arr), 10)
1011
1012 @unittest.skipIf(c_int is None, "requires _ctypes")
Benjamin Petersondfd79492008-06-13 19:13:39 +00001013 def test_getobj_getlock_obj(self):
Benjamin Petersondfd79492008-06-13 19:13:39 +00001014 arr1 = self.Array('i', range(10))
1015 lock1 = arr1.get_lock()
1016 obj1 = arr1.get_obj()
1017
1018 arr2 = self.Array('i', range(10), lock=None)
1019 lock2 = arr2.get_lock()
1020 obj2 = arr2.get_obj()
1021
1022 lock = self.Lock()
1023 arr3 = self.Array('i', range(10), lock=lock)
1024 lock3 = arr3.get_lock()
1025 obj3 = arr3.get_obj()
1026 self.assertEqual(lock, lock3)
1027
Jesse Noller6ab22152009-01-18 02:45:38 +00001028 arr4 = self.Array('i', range(10), lock=False)
Benjamin Petersondfd79492008-06-13 19:13:39 +00001029 self.assertFalse(hasattr(arr4, 'get_lock'))
1030 self.assertFalse(hasattr(arr4, 'get_obj'))
Jesse Noller6ab22152009-01-18 02:45:38 +00001031 self.assertRaises(AttributeError,
1032 self.Array, 'i', range(10), lock='notalock')
1033
1034 arr5 = self.RawArray('i', range(10))
1035 self.assertFalse(hasattr(arr5, 'get_lock'))
1036 self.assertFalse(hasattr(arr5, 'get_obj'))
Benjamin Petersondfd79492008-06-13 19:13:39 +00001037
1038#
1039#
1040#
1041
class _TestContainers(BaseTestCase):
    # Checks the list, dict and Namespace proxies handed out by a manager.

    ALLOWED_TYPES = ('manager',)

    def test_list(self):
        a = self.list(range(10))
        self.assertEqual(a[:], range(10))

        b = self.list()
        self.assertEqual(b[:], [])

        b.extend(range(5))
        self.assertEqual(b[:], range(5))

        self.assertEqual(b[2], 2)
        self.assertEqual(b[2:10], [2,3,4])

        b *= 2
        self.assertEqual(b[:], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4])

        self.assertEqual(b + [5, 6], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4, 5, 6])

        # None of the operations on b may have touched a.
        self.assertEqual(a[:], range(10))

        # A shared list built from other shared lists.
        d = [a, b]
        e = self.list(d)
        self.assertEqual(
            e[:],
            [[0, 1, 2, 3, 4, 5, 6, 7, 8, 9], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4]]
            )

        # An append made through a after f was created is expected to be
        # visible when reading f.
        f = self.list([a])
        a.append('hello')
        self.assertEqual(f[:], [[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 'hello']])

    def test_dict(self):
        d = self.dict()
        indices = range(65, 70)
        for i in indices:
            d[i] = chr(i)
        self.assertEqual(d.copy(), dict((i, chr(i)) for i in indices))
        self.assertEqual(sorted(d.keys()), indices)
        self.assertEqual(sorted(d.values()), [chr(i) for i in indices])
        self.assertEqual(sorted(d.items()), [(i, chr(i)) for i in indices])

    def test_namespace(self):
        n = self.Namespace()
        n.name = 'Bob'
        n.job = 'Builder'
        n._hidden = 'hidden'
        self.assertEqual((n.name, n.job), ('Bob', 'Builder'))
        del n.job
        # The expected string shows neither the deleted attribute nor the
        # underscore-prefixed one.
        self.assertEqual(str(n), "Namespace(name='Bob')")
        self.assertTrue(hasattr(n, 'name'))
        # assertFalse states the intent directly instead of negating inside
        # assertTrue.
        self.assertFalse(hasattr(n, 'job'))
1097
1098#
1099#
1100#
1101
def sqr(x, wait=0.0):
    # Task function for the pool tests: sleep for `wait` seconds, then
    # return the square of x.
    time.sleep(wait)
    squared = x * x
    return squared
class _TestPool(BaseTestCase):
    # Exercises the Pool API (apply/map/imap and their async variants).

    def test_apply(self):
        papply = self.pool.apply
        self.assertEqual(papply(sqr, (5,)), sqr(5))
        self.assertEqual(papply(sqr, (), {'x':3}), sqr(x=3))

    def test_map(self):
        pmap = self.pool.map
        self.assertEqual(pmap(sqr, range(10)), map(sqr, range(10)))
        self.assertEqual(pmap(sqr, range(100), chunksize=20),
                         map(sqr, range(100)))

    def test_map_unplicklable(self):
        # Issue #19425 -- failure to pickle should not cause a hang
        if self.TYPE == 'threads':
            self.skipTest('test not appropriate for {}'.format(self.TYPE))
        class A(object):
            def __reduce__(self):
                raise RuntimeError('cannot pickle')
        with self.assertRaises(RuntimeError):
            self.pool.map(sqr, [A()]*10)

    def test_map_chunksize(self):
        # An empty iterable combined with an explicit chunksize must still
        # complete; a timeout here means map_async stalled.
        try:
            self.pool.map_async(sqr, [], chunksize=1).get(timeout=TIMEOUT1)
        except multiprocessing.TimeoutError:
            self.fail("pool.map_async with chunksize stalled on null list")

    def test_async(self):
        # The task sleeps for TIMEOUT1, so get() should block about that long.
        res = self.pool.apply_async(sqr, (7, TIMEOUT1,))
        get = TimingWrapper(res.get)
        self.assertEqual(get(), 49)
        self.assertTimingAlmostEqual(get.elapsed, TIMEOUT1)

    def test_async_timeout(self):
        # The task sleeps longer than the timeout passed to get().
        res = self.pool.apply_async(sqr, (6, TIMEOUT2 + 1.0))
        get = TimingWrapper(res.get)
        self.assertRaises(multiprocessing.TimeoutError, get, timeout=TIMEOUT2)
        self.assertTimingAlmostEqual(get.elapsed, TIMEOUT2)

    def test_imap(self):
        it = self.pool.imap(sqr, range(10))
        self.assertEqual(list(it), map(sqr, range(10)))

        # Step through the iterator by hand and check exhaustion.
        it = self.pool.imap(sqr, range(10))
        for i in range(10):
            self.assertEqual(it.next(), i*i)
        self.assertRaises(StopIteration, it.next)

        # Same again with a chunksize.
        it = self.pool.imap(sqr, range(1000), chunksize=100)
        for i in range(1000):
            self.assertEqual(it.next(), i*i)
        self.assertRaises(StopIteration, it.next)

    def test_imap_unordered(self):
        # Results may arrive in any order, so compare after sorting.
        it = self.pool.imap_unordered(sqr, range(1000))
        self.assertEqual(sorted(it), map(sqr, range(1000)))

        it = self.pool.imap_unordered(sqr, range(1000), chunksize=53)
        self.assertEqual(sorted(it), map(sqr, range(1000)))

    def test_make_pool(self):
        # A pool needs at least one worker.
        self.assertRaises(ValueError, multiprocessing.Pool, -1)
        self.assertRaises(ValueError, multiprocessing.Pool, 0)

        p = multiprocessing.Pool(3)
        self.assertEqual(3, len(p._pool))
        p.close()
        p.join()

    def test_terminate(self):
        # Queue far more sleeping tasks than could finish quickly, then
        # terminate: join() must return promptly instead of waiting for them.
        p = self.Pool(4)
        result = p.map_async(
            time.sleep, [0.1 for i in range(10000)], chunksize=1
            )
        p.terminate()
        join = TimingWrapper(p.join)
        join()
        # assertLess reports the measured time on failure, which a bare
        # assertTrue on the comparison would not.
        self.assertLess(join.elapsed, 0.2)

    def test_empty_iterable(self):
        # See Issue 12157
        p = self.Pool(1)

        self.assertEqual(p.map(sqr, []), [])
        self.assertEqual(list(p.imap(sqr, [])), [])
        self.assertEqual(list(p.imap_unordered(sqr, [])), [])
        self.assertEqual(p.map_async(sqr, []).get(), [])

        p.close()
        p.join()
1197
def unpickleable_result():
    # Task whose return value is a lambda; _TestPoolWorkerErrors expects
    # fetching such a result to raise MaybeEncodingError.
    return lambda: 42
1200
class _TestPoolWorkerErrors(BaseTestCase):
    # Checks that result-encoding failures are reported to the caller.

    ALLOWED_TYPES = ('processes', )

    def test_unpickleable_result(self):
        from multiprocessing.pool import MaybeEncodingError
        pool = multiprocessing.Pool(2)

        # Make sure we don't lose pool processes because of encoding errors.
        for _ in range(20):
            pending = pool.apply_async(unpickleable_result)
            self.assertRaises(MaybeEncodingError, pending.get)

        pool.close()
        pool.join()
1215
class _TestPoolWorkerLifetime(BaseTestCase):
    # Checks pools whose workers are limited by maxtasksperchild.

    ALLOWED_TYPES = ('processes', )

    def test_pool_worker_lifetime(self):
        p = multiprocessing.Pool(3, maxtasksperchild=10)
        self.assertEqual(3, len(p._pool))
        origworkerpids = [w.pid for w in p._pool]
        # Run many tasks so each worker gets replaced (hopefully)
        results = [p.apply_async(sqr, (i, )) for i in range(100)]
        # Fetch the results and verify we got the right answers,
        # also ensuring all the tasks have completed.
        for j, res in enumerate(results):
            self.assertEqual(res.get(), sqr(j))
        # Refill the pool
        p._repopulate_pool()
        # Wait until all workers are alive
        # (50 polls * DELTA = 5 seconds max startup process time)
        for _ in range(50):
            if all(w.is_alive() for w in p._pool):
                break
            time.sleep(DELTA)
        finalworkerpids = [w.pid for w in p._pool]
        # All pids should be assigned.  See issue #7805.
        self.assertNotIn(None, origworkerpids)
        self.assertNotIn(None, finalworkerpids)
        # Finally, check that the worker pids have changed
        self.assertNotEqual(sorted(origworkerpids), sorted(finalworkerpids))
        p.close()
        p.join()

    def test_pool_worker_lifetime_early_close(self):
        # Issue #10332: closing a pool whose workers have limited lifetimes
        # before all the tasks completed would make join() hang.
        p = multiprocessing.Pool(3, maxtasksperchild=1)
        results = [p.apply_async(sqr, (i, 0.3)) for i in range(6)]
        p.close()
        p.join()
        # check the results
        for j, res in enumerate(results):
            self.assertEqual(res.get(), sqr(j))
1260
1261
Benjamin Petersondfd79492008-06-13 19:13:39 +00001262#
1263# Test that manager has expected number of shared objects left
1264#
1265
class _TestZZZNumberOfObjects(BaseTestCase):
    # Because test cases are sorted alphabetically, this one will get
    # run after all the other tests for the manager.  It tests that
    # there have been no "reference leaks" for the manager's shared
    # objects.
    ALLOWED_TYPES = ('manager',)

    def test_number_of_objects(self):
        EXPECTED_NUMBER = 1                # the pool object is still alive
        multiprocessing.active_children()  # discard dead process objs
        gc.collect()                       # do garbage collection
        refs = self.manager._number_of_objects()
        # Take the debug snapshot right after the count so that what gets
        # printed on a mismatch matches the number that is asserted on.
        debug_info = self.manager._debug_info()
        if refs != EXPECTED_NUMBER:
            # Print the snapshot once; the original fetched the debug info a
            # second time and printed both copies.
            print(debug_info)

        self.assertEqual(refs, EXPECTED_NUMBER)
1284
1285#
1286# Test of creating a customized manager class
1287#
1288
1289from multiprocessing.managers import BaseManager, BaseProxy, RemoteError
1290
class FooBar(object):
    # Plain class served through MyManager: one public method that returns,
    # one public method that raises, and one underscore-prefixed method.

    def f(self):
        return 'f()'

    def g(self):
        raise ValueError

    def _h(self):
        return '_h()'
1298
def baz():
    # Generator yielding the squares of 0..9; registered with MyManager
    # behind an IteratorProxy.
    for n in xrange(10):
        square = n * n
        yield square
1302
class IteratorProxy(BaseProxy):
    # Proxy that makes a remote iterator usable in a local for-loop.

    # Methods of the referent that may be called through this proxy.
    _exposed_ = ('next', '__next__')

    def __iter__(self):
        # The proxy is its own iterator.
        return self

    def next(self):
        # Forward to the referent's next().
        return self._callmethod('next')

    def __next__(self):
        # Forward to the referent's __next__().
        return self._callmethod('__next__')
1311
class MyManager(BaseManager):
    # Customized manager; its shared types are registered just below.
    pass


# 'Foo' is registered without an explicit exposed list.
MyManager.register('Foo', callable=FooBar)
# 'Bar' serves the same class but names the exposed methods explicitly,
# including the underscore-prefixed one.
MyManager.register('Bar', callable=FooBar, exposed=('f', '_h'))
# 'baz' hands out the generator through IteratorProxy.
MyManager.register('baz', callable=baz, proxytype=IteratorProxy)
1318
1319
class _TestMyManager(BaseTestCase):
    # Checks a BaseManager subclass with its own registered types.

    ALLOWED_TYPES = ('manager',)

    def test_mymanager(self):
        manager = MyManager()
        manager.start()

        foo = manager.Foo()
        bar = manager.Bar()
        squares = manager.baz()

        # Which of the candidate method names does each proxy offer?
        candidates = ('f', 'g', '_h')
        foo_methods = [meth for meth in candidates if hasattr(foo, meth)]
        bar_methods = [meth for meth in candidates if hasattr(bar, meth)]

        self.assertEqual(foo_methods, ['f', 'g'])
        self.assertEqual(bar_methods, ['f', '_h'])

        # foo: f works, g raises, and _h is refused when called by name.
        self.assertEqual(foo.f(), 'f()')
        self.assertRaises(ValueError, foo.g)
        self.assertEqual(foo._callmethod('f'), 'f()')
        self.assertRaises(RemoteError, foo._callmethod, '_h')

        # bar: both exposed methods work, directly and by name.
        self.assertEqual(bar.f(), 'f()')
        self.assertEqual(bar._h(), '_h()')
        self.assertEqual(bar._callmethod('f'), 'f()')
        self.assertEqual(bar._callmethod('_h'), '_h()')

        # The iterator proxy yields the generator's values.
        self.assertEqual(list(squares), [i*i for i in range(10)])

        manager.shutdown()
1351
1352#
1353# Test of connecting to a remote server and using xmlrpclib for serialization
1354#
1355
1356_queue = Queue.Queue()
1357def get_queue():
1358 return _queue
1359
class QueueManager(BaseManager):
    '''manager class used by server process'''


# Registered with a callable, so this manager can create the referent.
QueueManager.register('get_queue', callable=get_queue)
1363
class QueueManager2(BaseManager):
    '''manager class which specifies the same interface as QueueManager'''


# Registered by name only: no callable is supplied on this side.
QueueManager2.register('get_queue')
1367
1368
# Serializer name passed to the managers in the remote-manager tests.
SERIALIZER = 'xmlrpclib'
1370
class _TestRemoteManager(BaseTestCase):
    # Connects to a manager server over a socket, using SERIALIZER instead
    # of pickle.

    ALLOWED_TYPES = ('manager',)

    @classmethod
    def _putter(cls, address, authkey):
        # Child-side worker: connect to the server at `address` and put one
        # tuple on its queue.
        manager = QueueManager2(
            address=address, authkey=authkey, serializer=SERIALIZER
            )
        manager.connect()
        queue = manager.get_queue()
        queue.put(('hello world', None, True, 2.25))

    def test_remote(self):
        authkey = os.urandom(32)

        # Use the test_support name imported at the top of the file;
        # `test.test_support` only resolved because another import happened
        # to bind the `test` package.
        manager = QueueManager(
            address=(test_support.HOST, 0), authkey=authkey, serializer=SERIALIZER
            )
        manager.start()

        p = self.Process(target=self._putter, args=(manager.address, authkey))
        p.daemon = True
        p.start()

        # A second client connecting to the same server.
        manager2 = QueueManager2(
            address=manager.address, authkey=authkey, serializer=SERIALIZER
            )
        manager2.connect()
        queue = manager2.get_queue()

        # Note that xmlrpclib will deserialize object as a list not a tuple
        self.assertEqual(queue.get(), ['hello world', None, True, 2.25])

        # Because we are using xmlrpclib for serialization instead of
        # pickle this will cause a serialization error.
        self.assertRaises(Exception, queue.put, time.sleep)

        # Make queue finalizer run before the server is stopped
        del queue
        manager.shutdown()
1411 manager.shutdown()
1412
class _TestManagerRestart(BaseTestCase):
    # Checks that a manager can be started again on an address that was
    # used by a manager which has just been shut down.

    @classmethod
    def _putter(cls, address, authkey):
        # Child-side worker: connect to the server at `address` and put one
        # string on its queue.
        manager = QueueManager(
            address=address, authkey=authkey, serializer=SERIALIZER)
        manager.connect()
        queue = manager.get_queue()
        queue.put('hello world')

    def test_rapid_restart(self):
        authkey = os.urandom(32)
        # Use the test_support name imported at the top of the file;
        # `test.test_support` only resolved because another import happened
        # to bind the `test` package.
        manager = QueueManager(
            address=(test_support.HOST, 0), authkey=authkey, serializer=SERIALIZER)
        srvr = manager.get_server()
        addr = srvr.address
        # Close the connection.Listener socket which gets opened as a part
        # of manager.get_server(). It's not needed for the test.
        srvr.listener.close()
        manager.start()

        p = self.Process(target=self._putter, args=(manager.address, authkey))
        p.daemon = True
        p.start()
        queue = manager.get_queue()
        self.assertEqual(queue.get(), 'hello world')
        # Drop the proxy before the server goes away.
        del queue
        manager.shutdown()
        # Start a fresh manager on the address recorded above.
        manager = QueueManager(
            address=addr, authkey=authkey, serializer=SERIALIZER)
        manager.start()
        manager.shutdown()
Jesse Noller459a6482009-03-30 15:50:42 +00001445
Benjamin Petersondfd79492008-06-13 19:13:39 +00001446#
1447#
1448#
1449
1450SENTINEL = latin('')
1451
class _TestConnection(BaseTestCase):
    # Exercises Pipe() connections: object and byte transfer, polling,
    # one-way pipes and passing file descriptors to a child process.

    ALLOWED_TYPES = ('processes', 'threads')

    @classmethod
    def _echo(cls, conn):
        # Child-side worker: send back every byte message received until
        # SENTINEL arrives, then close the connection.
        for msg in iter(conn.recv_bytes, SENTINEL):
            conn.send_bytes(msg)
        conn.close()

    def test_connection(self):
        conn, child_conn = self.Pipe()

        p = self.Process(target=self._echo, args=(child_conn,))
        p.daemon = True
        p.start()

        seq = [1, 2.25, None]
        msg = latin('hello world')
        longmsg = msg * 10
        arr = array.array('i', range(4))

        if self.TYPE == 'processes':
            self.assertEqual(type(conn.fileno()), int)

        # Object round trip.
        self.assertEqual(conn.send(seq), None)
        self.assertEqual(conn.recv(), seq)

        # Bytes round trip.
        self.assertEqual(conn.send_bytes(msg), None)
        self.assertEqual(conn.recv_bytes(), msg)

        if self.TYPE == 'processes':
            # Receive into the start of a larger buffer.  (The local is not
            # called `buffer`, to avoid shadowing the builtin.)
            buf = array.array('i', [0]*10)
            expected = list(arr) + [0] * (10 - len(arr))
            self.assertEqual(conn.send_bytes(arr), None)
            self.assertEqual(conn.recv_bytes_into(buf),
                             len(arr) * buf.itemsize)
            self.assertEqual(list(buf), expected)

            # Receive at an offset of three items.
            buf = array.array('i', [0]*10)
            expected = [0] * 3 + list(arr) + [0] * (10 - 3 - len(arr))
            self.assertEqual(conn.send_bytes(arr), None)
            self.assertEqual(conn.recv_bytes_into(buf, 3 * buf.itemsize),
                             len(arr) * buf.itemsize)
            self.assertEqual(list(buf), expected)

            # A message longer than the buffer must raise BufferTooShort,
            # carrying the complete message in its args.
            buf = bytearray(latin(' ' * 40))
            self.assertEqual(conn.send_bytes(longmsg), None)
            try:
                res = conn.recv_bytes_into(buf)
            except multiprocessing.BufferTooShort as e:
                self.assertEqual(e.args, (longmsg,))
            else:
                self.fail('expected BufferTooShort, got %s' % res)

        poll = TimingWrapper(conn.poll)

        # Nothing pending: poll() returns at once, poll(t) waits for t.
        self.assertEqual(poll(), False)
        self.assertTimingAlmostEqual(poll.elapsed, 0)

        self.assertEqual(poll(TIMEOUT1), False)
        self.assertTimingAlmostEqual(poll.elapsed, TIMEOUT1)

        conn.send(None)
        # Give the echo a moment to arrive before polling.
        time.sleep(.1)

        self.assertEqual(poll(TIMEOUT1), True)
        self.assertTimingAlmostEqual(poll.elapsed, 0)

        self.assertEqual(conn.recv(), None)

        really_big_msg = latin('X') * (1024 * 1024 * 16)   # 16Mb
        conn.send_bytes(really_big_msg)
        self.assertEqual(conn.recv_bytes(), really_big_msg)

        conn.send_bytes(SENTINEL)                          # tell child to quit
        child_conn.close()

        if self.TYPE == 'processes':
            self.assertEqual(conn.readable, True)
            self.assertEqual(conn.writable, True)
            self.assertRaises(EOFError, conn.recv)
            self.assertRaises(EOFError, conn.recv_bytes)

        p.join()

    def test_duplex_false(self):
        # With duplex=False the first end only reads and the second only
        # writes.
        reader, writer = self.Pipe(duplex=False)
        self.assertEqual(writer.send(1), None)
        self.assertEqual(reader.recv(), 1)
        if self.TYPE == 'processes':
            self.assertEqual(reader.readable, True)
            self.assertEqual(reader.writable, False)
            self.assertEqual(writer.readable, False)
            self.assertEqual(writer.writable, True)
            self.assertRaises(IOError, reader.send, 2)
            self.assertRaises(IOError, writer.recv)
            self.assertRaises(IOError, writer.poll)

    def test_spawn_close(self):
        # We test that a pipe connection can be closed by parent
        # process immediately after child is spawned.  On Windows this
        # would have sometimes failed on old versions because
        # child_conn would be closed before the child got a chance to
        # duplicate it.
        conn, child_conn = self.Pipe()

        p = self.Process(target=self._echo, args=(child_conn,))
        p.daemon = True
        p.start()
        child_conn.close()    # this might complete before child initializes

        msg = latin('hello')
        conn.send_bytes(msg)
        self.assertEqual(conn.recv_bytes(), msg)

        conn.send_bytes(SENTINEL)
        conn.close()
        p.join()

    def test_sendbytes(self):
        if self.TYPE != 'processes':
            self.skipTest('test not appropriate for {}'.format(self.TYPE))

        msg = latin('abcdefghijklmnopqrstuvwxyz')
        a, b = self.Pipe()

        # Whole message.
        a.send_bytes(msg)
        self.assertEqual(b.recv_bytes(), msg)

        # From an offset to the end.
        a.send_bytes(msg, 5)
        self.assertEqual(b.recv_bytes(), msg[5:])

        # Offset plus size.
        a.send_bytes(msg, 7, 8)
        self.assertEqual(b.recv_bytes(), msg[7:7+8])

        # An offset equal to the length gives an empty message.
        a.send_bytes(msg, 26)
        self.assertEqual(b.recv_bytes(), latin(''))

        a.send_bytes(msg, 26, 0)
        self.assertEqual(b.recv_bytes(), latin(''))

        # Offsets or sizes reaching outside the message are rejected.
        self.assertRaises(ValueError, a.send_bytes, msg, 27)

        self.assertRaises(ValueError, a.send_bytes, msg, 22, 5)

        self.assertRaises(ValueError, a.send_bytes, msg, 26, 1)

        self.assertRaises(ValueError, a.send_bytes, msg, -1)

        self.assertRaises(ValueError, a.send_bytes, msg, 4, -1)

    @classmethod
    def _is_fd_assigned(cls, fd):
        # Return True if fd can be fstat()ed and False if fstat() fails with
        # EBADF; any other OSError is propagated.
        try:
            os.fstat(fd)
        except OSError as e:
            if e.errno == errno.EBADF:
                return False
            raise
        else:
            return True

    @classmethod
    def _writefd(cls, conn, data, create_dummy_fds=False):
        # Child-side worker: receive a handle over conn, write data to it
        # and close it.
        if create_dummy_fds:
            # Occupy every unassigned descriptor below 256 with a duplicate
            # of the connection's descriptor.
            for i in range(0, 256):
                if not cls._is_fd_assigned(i):
                    os.dup2(conn.fileno(), i)
        fd = reduction.recv_handle(conn)
        if msvcrt:
            # With msvcrt the received value is an OS handle; turn it into a
            # C runtime file descriptor before writing.
            fd = msvcrt.open_osfhandle(fd, os.O_WRONLY)
        os.write(fd, data)
        os.close(fd)

    @unittest.skipUnless(HAS_REDUCTION, "test needs multiprocessing.reduction")
    def test_fd_transfer(self):
        if self.TYPE != 'processes':
            self.skipTest("only makes sense with processes")
        conn, child_conn = self.Pipe(duplex=True)

        p = self.Process(target=self._writefd, args=(child_conn, b"foo"))
        p.daemon = True
        p.start()
        # Do not leave the scratch file behind, whatever the outcome.
        self.addCleanup(test_support.unlink, test_support.TESTFN)
        with open(test_support.TESTFN, "wb") as f:
            fd = f.fileno()
            if msvcrt:
                fd = msvcrt.get_osfhandle(fd)
            reduction.send_handle(conn, fd, p.pid)
        p.join()
        # The child wrote through the transferred handle.
        with open(test_support.TESTFN, "rb") as f:
            self.assertEqual(f.read(), b"foo")

    @unittest.skipUnless(HAS_REDUCTION, "test needs multiprocessing.reduction")
    @unittest.skipIf(sys.platform == "win32",
                     "test semantics don't make sense on Windows")
    @unittest.skipIf(MAXFD <= 256,
                     "largest assignable fd number is too small")
    @unittest.skipUnless(hasattr(os, "dup2"),
                         "test needs os.dup2()")
    def test_large_fd_transfer(self):
        # With fd > 256 (issue #11657)
        if self.TYPE != 'processes':
            self.skipTest("only makes sense with processes")
        conn, child_conn = self.Pipe(duplex=True)

        p = self.Process(target=self._writefd, args=(child_conn, b"bar", True))
        p.daemon = True
        p.start()
        # Do not leave the scratch file behind, whatever the outcome.
        self.addCleanup(test_support.unlink, test_support.TESTFN)
        with open(test_support.TESTFN, "wb") as f:
            fd = f.fileno()
            # Find a free descriptor number of at least 256.
            for newfd in range(256, MAXFD):
                if not self._is_fd_assigned(newfd):
                    break
            else:
                self.fail("could not find an unassigned large file descriptor")
            os.dup2(fd, newfd)
            try:
                reduction.send_handle(conn, newfd, p.pid)
            finally:
                os.close(newfd)
        p.join()
        with open(test_support.TESTFN, "rb") as f:
            self.assertEqual(f.read(), b"bar")

    @classmethod
    def _send_data_without_fd(cls, conn):
        # Child-side worker: write a single raw byte to the connection's
        # descriptor, with no handle attached.
        os.write(conn.fileno(), b"\0")

    @unittest.skipUnless(HAS_REDUCTION, "test needs multiprocessing.reduction")
    @unittest.skipIf(sys.platform == "win32", "doesn't make sense on Windows")
    def test_missing_fd_transfer(self):
        # Check that exception is raised when received data is not
        # accompanied by a file descriptor in ancillary data.
        if self.TYPE != 'processes':
            self.skipTest("only makes sense with processes")
        conn, child_conn = self.Pipe(duplex=True)

        p = self.Process(target=self._send_data_without_fd, args=(child_conn,))
        p.daemon = True
        p.start()
        self.assertRaises(RuntimeError, reduction.recv_handle, conn)
        p.join()
Antoine Pitroua1a8da82011-08-23 19:54:20 +02001695
class _TestListenerClient(BaseTestCase):
    # Checks Listener/Client connections between a parent and a child.

    ALLOWED_TYPES = ('processes', 'threads')

    @classmethod
    def _test(cls, address):
        # Child-side worker: connect to `address`, send one message, close.
        conn = cls.connection.Client(address)
        conn.send('hello')
        conn.close()

    def test_listener_client(self):
        # One listener/client round trip per available address family.
        for family in self.connection.families:
            l = self.connection.Listener(family=family)
            p = self.Process(target=self._test, args=(l.address,))
            p.daemon = True
            p.start()
            conn = l.accept()
            self.assertEqual(conn.recv(), 'hello')
            # Close the accepted connection explicitly, as test_issue14725
            # does, instead of leaving it to garbage collection.
            conn.close()
            p.join()
            l.close()

    def test_issue14725(self):
        l = self.connection.Listener()
        p = self.Process(target=self._test, args=(l.address,))
        p.daemon = True
        p.start()
        time.sleep(1)
        # On Windows the client process should by now have connected,
        # written data and closed the pipe handle.  This causes
        # ConnectNamedPipe() to fail with ERROR_NO_DATA.  See Issue
        # 14725.
        conn = l.accept()
        self.assertEqual(conn.recv(), 'hello')
        conn.close()
        p.join()
        l.close()
1732
Benjamin Petersondfd79492008-06-13 19:13:39 +00001733#
1734# Test of sending connection and socket objects between processes
1735#
Benjamin Petersonda3a1b12008-06-16 20:52:48 +00001736"""
Benjamin Petersondfd79492008-06-13 19:13:39 +00001737class _TestPicklingConnections(BaseTestCase):
1738
1739 ALLOWED_TYPES = ('processes',)
1740
1741 def _listener(self, conn, families):
1742 for fam in families:
1743 l = self.connection.Listener(family=fam)
1744 conn.send(l.address)
1745 new_conn = l.accept()
1746 conn.send(new_conn)
1747
1748 if self.TYPE == 'processes':
1749 l = socket.socket()
1750 l.bind(('localhost', 0))
1751 conn.send(l.getsockname())
1752 l.listen(1)
1753 new_conn, addr = l.accept()
1754 conn.send(new_conn)
1755
1756 conn.recv()
1757
1758 def _remote(self, conn):
1759 for (address, msg) in iter(conn.recv, None):
1760 client = self.connection.Client(address)
1761 client.send(msg.upper())
1762 client.close()
1763
1764 if self.TYPE == 'processes':
1765 address, msg = conn.recv()
1766 client = socket.socket()
1767 client.connect(address)
1768 client.sendall(msg.upper())
1769 client.close()
1770
1771 conn.close()
1772
1773 def test_pickling(self):
1774 try:
1775 multiprocessing.allow_connection_pickling()
1776 except ImportError:
1777 return
1778
1779 families = self.connection.families
1780
1781 lconn, lconn0 = self.Pipe()
1782 lp = self.Process(target=self._listener, args=(lconn0, families))
Jesus Cea6f6016b2011-09-09 20:26:57 +02001783 lp.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +00001784 lp.start()
1785 lconn0.close()
1786
1787 rconn, rconn0 = self.Pipe()
1788 rp = self.Process(target=self._remote, args=(rconn0,))
Jesus Cea6f6016b2011-09-09 20:26:57 +02001789 rp.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +00001790 rp.start()
1791 rconn0.close()
1792
1793 for fam in families:
1794 msg = ('This connection uses family %s' % fam).encode('ascii')
1795 address = lconn.recv()
1796 rconn.send((address, msg))
1797 new_conn = lconn.recv()
1798 self.assertEqual(new_conn.recv(), msg.upper())
1799
1800 rconn.send(None)
1801
1802 if self.TYPE == 'processes':
1803 msg = latin('This connection uses a normal socket')
1804 address = lconn.recv()
1805 rconn.send((address, msg))
1806 if hasattr(socket, 'fromfd'):
1807 new_conn = lconn.recv()
1808 self.assertEqual(new_conn.recv(100), msg.upper())
1809 else:
1810 # XXX On Windows with Py2.6 need to backport fromfd()
1811 discard = lconn.recv_bytes()
1812
1813 lconn.send(None)
1814
1815 rconn.close()
1816 lconn.close()
1817
1818 lp.join()
1819 rp.join()
Benjamin Petersonda3a1b12008-06-16 20:52:48 +00001820"""
Benjamin Petersondfd79492008-06-13 19:13:39 +00001821#
1822#
1823#
1824
class _TestHeap(BaseTestCase):
    # Stress tests for the heap object behind multiprocessing.heap.

    ALLOWED_TYPES = ('processes',)

    def test_heap(self):
        # Churn through many randomly sized blocks, then check that the
        # segments recorded by the heap follow one another without gaps.
        iterations = 5000
        maxblocks = 50
        blocks = []

        # create and destroy lots of blocks of different sizes
        for _ in xrange(iterations):
            size = int(random.lognormvariate(0, 1) * 1000)
            b = multiprocessing.heap.BufferWrapper(size)
            blocks.append(b)
            if len(blocks) > maxblocks:
                victim = random.randrange(maxblocks)
                del blocks[victim]

        # get the heap object
        heap = multiprocessing.heap.BufferWrapper._heap

        # verify the state of the heap while holding its lock; the lock is
        # released again by the cleanup hook once the test is over
        segments = []
        occupied = 0
        heap._lock.acquire()
        self.addCleanup(heap._lock.release)
        for free_list in heap._len_to_seq.values():
            for arena, start, stop in free_list:
                segments.append((heap._arenas.index(arena), start, stop,
                                 stop - start, 'free'))
        for arena, start, stop in heap._allocated_blocks:
            segments.append((heap._arenas.index(arena), start, stop,
                             stop - start, 'occupied'))
            occupied += (stop - start)

        segments.sort()

        # Each segment must either end where the next one begins, or the
        # next one must open a different arena at offset 0.
        for current, following in zip(segments, segments[1:]):
            (arena, start, stop) = current[:3]
            (narena, nstart, nstop) = following[:3]
            self.assertTrue((stop == nstart) or
                            (arena != narena and nstart == 0))

    def test_free_from_gc(self):
        # Check that freeing of blocks by the garbage collector doesn't
        # deadlock (issue #12352).
        # The GC is switched on if necessary and given a low collection
        # threshold so that collections happen often, which raises the
        # probability of hitting the deadlock.
        if not gc.isenabled():
            gc.enable()
            self.addCleanup(gc.disable)
        saved_thresholds = gc.get_threshold()
        self.addCleanup(gc.set_threshold, *saved_thresholds)
        gc.set_threshold(10)

        # Allocate many pairs of blocks that reference each other, so that
        # they can only be reclaimed by the cyclic garbage collector.
        for _ in range(5000):
            first = multiprocessing.heap.BufferWrapper(1)
            second = multiprocessing.heap.BufferWrapper(1)
            # circular references
            first.buddy = second
            second.buddy = first
1889
Benjamin Petersondfd79492008-06-13 19:13:39 +00001890#
1891#
1892#
1893
class _Foo(Structure):
    # Two-field structure used by the shared-ctypes tests in this file:
    # ``x`` is declared as c_int and ``y`` as c_double.
    # NOTE(review): Structure/c_int/c_double are imported outside this
    # section, presumably from ctypes -- confirm.
    _fields_ = [
        ('x', c_int),
        ('y', c_double)
        ]
1899
class _TestSharedCTypes(BaseTestCase):
    # Shared ctypes values, arrays and structures modified by a child
    # process must show the new contents in the parent.

    ALLOWED_TYPES = ('processes',)

    def setUp(self):
        # Nothing in this class can run without sharedctypes.
        if HAS_SHAREDCTYPES:
            return
        self.skipTest("requires multiprocessing.sharedctypes")

    @classmethod
    def _double(cls, x, y, foo, arr, string):
        # Child-side worker: double every shared object in place.
        for shared in (x, y):
            shared.value *= 2
        foo.x *= 2
        foo.y *= 2
        string.value *= 2
        for index in range(len(arr)):
            arr[index] *= 2

    def test_sharedctypes(self, lock=False):
        # ``lock`` is forwarded to every shared object; test_synchronize
        # re-runs this method with lock=True.
        x = Value('i', 7, lock=lock)
        y = Value(c_double, 1.0/3.0, lock=lock)
        foo = Value(_Foo, 3, 2, lock=lock)
        arr = self.Array('d', range(10), lock=lock)
        string = self.Array('c', 20, lock=lock)
        string.value = latin('hello')

        shared = (x, y, foo, arr, string)
        worker = self.Process(target=self._double, args=shared)
        worker.daemon = True
        worker.start()
        worker.join()

        self.assertEqual(x.value, 14)
        self.assertAlmostEqual(y.value, 2.0/3.0)
        self.assertEqual(foo.x, 6)
        self.assertAlmostEqual(foo.y, 4.0)
        for index in range(10):
            self.assertAlmostEqual(arr[index], index*2)
        self.assertEqual(string.value, latin('hellohello'))

    def test_synchronize(self):
        self.test_sharedctypes(lock=True)

    def test_copy(self):
        # Changing the original after copy() must not affect the copy.
        original = _Foo(2, 5.0)
        duplicate = copy(original)
        original.x = 0
        original.y = 0
        self.assertEqual(duplicate.x, 2)
        self.assertAlmostEqual(duplicate.y, 5.0)
1949
1950#
1951#
1952#
1953
class _TestFinalize(BaseTestCase):
    # Checks which util.Finalize callbacks run, and in which order, by
    # having a child process report each callback over a pipe.

    ALLOWED_TYPES = ('processes',)

    @classmethod
    def _test_finalize(cls, conn):
        # Runs in the child.  Every finalizer sends its tag through
        # ``conn``; the statement order and the lifetime of the locals
        # below are what the parent's expected list depends on.
        class Foo(object):
            pass

        a = Foo()
        util.Finalize(a, conn.send, args=('a',))
        del a           # triggers callback for a

        b = Foo()
        close_b = util.Finalize(b, conn.send, args=('b',))
        close_b()       # triggers callback for b
        close_b()       # does nothing because callback has already been called
        del b           # does nothing because callback has already been called

        # Registered without an exitpriority; the parent does not expect
        # to receive 'c'.
        c = Foo()
        util.Finalize(c, conn.send, args=('c',))

        d10 = Foo()
        util.Finalize(d10, conn.send, args=('d10',), exitpriority=1)

        # Three finalizers sharing exitpriority 0; the parent expects them
        # back as d03, d02, d01.
        d01 = Foo()
        util.Finalize(d01, conn.send, args=('d01',), exitpriority=0)
        d02 = Foo()
        util.Finalize(d02, conn.send, args=('d02',), exitpriority=0)
        d03 = Foo()
        util.Finalize(d03, conn.send, args=('d03',), exitpriority=0)

        util.Finalize(None, conn.send, args=('e',), exitpriority=-10)

        # 'STOP' is the sentinel the parent uses to stop reading.
        util.Finalize(None, conn.send, args=('STOP',), exitpriority=-100)

        # call multiprocessing's cleanup function then exit process without
        # garbage collecting locals
        util._exit_function()
        conn.close()
        os._exit(0)

    def test_finalize(self):
        conn, child_conn = self.Pipe()

        p = self.Process(target=self._test_finalize, args=(child_conn,))
        p.daemon = True
        p.start()
        p.join()

        # Read tags until the 'STOP' sentinel arrives.
        result = [obj for obj in iter(conn.recv, 'STOP')]
        self.assertEqual(result, ['a', 'b', 'd10', 'd03', 'd02', 'd01', 'e'])
2006
2007#
2008# Test that from ... import * works for each module
2009#
2010
class _TestImportStar(BaseTestCase):
    # Every name a multiprocessing module lists in __all__ must exist.

    ALLOWED_TYPES = ('processes',)

    def test_import(self):
        modules = [
            'multiprocessing', 'multiprocessing.connection',
            'multiprocessing.heap', 'multiprocessing.managers',
            'multiprocessing.pool', 'multiprocessing.process',
            'multiprocessing.synchronize', 'multiprocessing.util'
            ]

        # Modules that are only checked when their prerequisites exist.
        optional = [
            (HAS_REDUCTION, 'multiprocessing.reduction'),
            # This module requires _ctypes
            (c_int is not None, 'multiprocessing.sharedctypes'),
            ]
        modules.extend(name for wanted, name in optional if wanted)

        for name in modules:
            __import__(name)
            mod = sys.modules[name]
            exported = getattr(mod, '__all__', ())

            for attr in exported:
                message = '%r does not have attribute %r' % (mod, attr)
                self.assertTrue(hasattr(mod, attr), message)
2039
2040#
2041# Quick test that logging works -- does not test logging output
2042#
2043
class _TestLogging(BaseTestCase):
    # Quick checks that multiprocessing's logger can be configured and
    # that its effective level is what a child process sees.

    ALLOWED_TYPES = ('processes',)

    def test_enable_logging(self):
        logger = multiprocessing.get_logger()
        logger.setLevel(util.SUBWARNING)
        try:
            self.assertTrue(logger is not None)
            logger.debug('this will not be printed')
            logger.info('nor will this')
        finally:
            # Put the level back even if something above failed, so later
            # tests are not affected.
            logger.setLevel(LOG_LEVEL)

    @classmethod
    def _test_level(cls, conn):
        # Child-side worker: report the logger's effective level.
        logger = multiprocessing.get_logger()
        conn.send(logger.getEffectiveLevel())

    def _effective_level_in_child(self, reader, writer):
        # Start a child running _test_level, return the level it reports
        # and always join the child so it is not left behind.
        p = self.Process(target=self._test_level, args=(writer,))
        p.daemon = True
        p.start()
        try:
            return reader.recv()
        finally:
            p.join()

    def test_level(self):
        LEVEL1 = 32
        LEVEL2 = 37

        logger = multiprocessing.get_logger()
        root_logger = logging.getLogger()
        root_level = root_logger.level

        reader, writer = multiprocessing.Pipe(duplex=False)
        try:
            # A level set directly on the multiprocessing logger.
            logger.setLevel(LEVEL1)
            self.assertEqual(
                LEVEL1, self._effective_level_in_child(reader, writer))

            # With NOTSET on the multiprocessing logger, the child is
            # expected to report the root logger's level instead.
            logger.setLevel(logging.NOTSET)
            root_logger.setLevel(LEVEL2)
            self.assertEqual(
                LEVEL2, self._effective_level_in_child(reader, writer))
        finally:
            # Restore global logging state and release the pipe even when
            # an assertion failed.
            root_logger.setLevel(root_level)
            logger.setLevel(level=LOG_LEVEL)
            reader.close()
            writer.close()
2086
Jesse Noller814d02d2009-11-21 14:38:23 +00002087
Jesse Noller9a03f2f2009-11-24 14:17:29 +00002088# class _TestLoggingProcessName(BaseTestCase):
2089#
2090# def handle(self, record):
2091# assert record.processName == multiprocessing.current_process().name
2092# self.__handled = True
2093#
2094# def test_logging(self):
2095# handler = logging.Handler()
2096# handler.handle = self.handle
2097# self.__handled = False
2098# # Bypass getLogger() and side-effects
2099# logger = logging.getLoggerClass()(
2100# 'multiprocessing.test.TestLoggingProcessName')
2101# logger.addHandler(handler)
2102# logger.propagate = False
2103#
2104# logger.warn('foo')
2105# assert self.__handled
Jesse Noller814d02d2009-11-21 14:38:23 +00002106
Benjamin Petersondfd79492008-06-13 19:13:39 +00002107#
Richard Oudkerkba482642013-02-26 12:37:07 +00002108# Check that Process.join() retries if os.waitpid() fails with EINTR
2109#
2110
class _TestPollEintr(BaseTestCase):
    # Process.join() must survive a signal arriving while it waits.

    ALLOWED_TYPES = ('processes',)

    @classmethod
    def _killer(cls, pid):
        # Give the target time to block in join(), then signal it.
        time.sleep(0.5)
        os.kill(pid, signal.SIGUSR1)

    @unittest.skipUnless(hasattr(signal, 'SIGUSR1'), 'requires SIGUSR1')
    def test_poll_eintr(self):
        state = {'signalled': False}

        def record(*args):
            state['signalled'] = True

        pid = os.getpid()
        previous_handler = signal.signal(signal.SIGUSR1, record)
        try:
            killer = self.Process(target=self._killer, args=(pid,))
            killer.start()
            sleeper = self.Process(target=time.sleep, args=(1,))
            sleeper.start()
            # The killer's signal is sent while this join is pending.
            sleeper.join()
            self.assertTrue(state['signalled'])
            self.assertEqual(sleeper.exitcode, 0)
            killer.join()
        finally:
            signal.signal(signal.SIGUSR1, previous_handler)
2138
2139#
Jesse Noller9a5b2ad2009-01-19 15:12:22 +00002140# Test to verify handle verification, see issue 3321
2141#
2142
class TestInvalidHandle(unittest.TestCase):
    # Handle verification, see issue 3321.

    @unittest.skipIf(WIN32, "skipped on Windows")
    def test_invalid_handles(self):
        # A connection built on a bogus handle must fail when polled, and
        # a negative handle must be rejected at construction time.
        bogus = _multiprocessing.Connection(44977608)
        with self.assertRaises(IOError):
            bogus.poll()
        with self.assertRaises(IOError):
            _multiprocessing.Connection(-1)
Florent Xicluna36b9fbb2010-03-24 19:33:25 +00002150
Jesse Noller9a5b2ad2009-01-19 15:12:22 +00002151#
Benjamin Petersondfd79492008-06-13 19:13:39 +00002152# Functions used to create test cases from the base ones in this module
2153#
2154
def get_attributes(Source, names):
    # Return a dict mapping each entry of ``names`` to the attribute of
    # that name on ``Source``.  Attributes whose type is exactly the
    # plain-function type are wrapped in staticmethod.
    function_type = type(get_attributes)
    attributes = {}
    for name in names:
        value = getattr(Source, name)
        if type(value) == function_type:
            value = staticmethod(value)
        attributes[name] = value
    return attributes
2163
def create_test_cases(Mixin, type):
    # For every module global whose name starts with '_Test' and whose
    # ALLOWED_TYPES contains ``type``, build a concrete TestCase subclass
    # combining that base, unittest.TestCase and ``Mixin``.  Returns a
    # dict mapping the generated class names to the classes.
    cases = {}
    prefix = 'With' + type.capitalize()

    for name, base in globals().items():
        if not name.startswith('_Test'):
            continue
        if type not in base.ALLOWED_TYPES:
            continue

        class Temp(base, unittest.TestCase, Mixin):
            pass

        # Drop the leading underscore: '_TestFoo' -> 'With<Type>TestFoo'.
        Temp.__name__ = prefix + name[1:]
        Temp.__module__ = Mixin.__module__
        cases[Temp.__name__] = Temp
    return cases
2180
2181#
2182# Create test cases
2183#
2184
class ProcessesMixin(object):
    # Mixin for the 'processes' flavour of the tests: TYPE is the tag
    # matched against each base class's ALLOWED_TYPES.
    TYPE = 'processes'
    Process = multiprocessing.Process
    # Copy the named multiprocessing attributes into the class namespace;
    # get_attributes() wraps plain functions in staticmethod.
    locals().update(get_attributes(multiprocessing, (
        'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
        'Condition', 'Event', 'Value', 'Array', 'RawValue',
        'RawArray', 'current_process', 'active_children', 'Pipe',
        'connection', 'JoinableQueue', 'Pool'
        )))

# Generate the concrete test classes and publish them as module globals.
testcases_processes = create_test_cases(ProcessesMixin, type='processes')
globals().update(testcases_processes)
2197
2198
class ManagerMixin(object):
    # Mixin for the 'manager' flavour of the tests.
    TYPE = 'manager'
    Process = multiprocessing.Process
    # Allocate the SyncManager without running __init__; test_main()
    # calls manager.__init__() and manager.start() before the suite runs.
    manager = object.__new__(multiprocessing.managers.SyncManager)
    # Copy the named attributes of that manager into the class namespace.
    locals().update(get_attributes(manager, (
        'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
        'Condition', 'Event', 'Value', 'Array', 'list', 'dict',
        'Namespace', 'JoinableQueue', 'Pool'
        )))

# Generate the concrete test classes and publish them as module globals.
testcases_manager = create_test_cases(ManagerMixin, type='manager')
globals().update(testcases_manager)
2211
2212
class ThreadsMixin(object):
    # Mixin for the 'threads' flavour of the tests, backed by
    # multiprocessing.dummy.
    TYPE = 'threads'
    Process = multiprocessing.dummy.Process
    # Copy the named multiprocessing.dummy attributes into the class
    # namespace; get_attributes() wraps plain functions in staticmethod.
    locals().update(get_attributes(multiprocessing.dummy, (
        'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
        'Condition', 'Event', 'Value', 'Array', 'current_process',
        'active_children', 'Pipe', 'connection', 'dict', 'list',
        'Namespace', 'JoinableQueue', 'Pool'
        )))

# Generate the concrete test classes and publish them as module globals.
testcases_threads = create_test_cases(ThreadsMixin, type='threads')
globals().update(testcases_threads)
2225
class OtherTest(unittest.TestCase):
    # TODO: add more tests for deliver/answer challenge.

    def test_deliver_challenge_auth_failure(self):
        # A peer that answers the challenge with garbage must be rejected.
        class _FakeConnection(object):
            def recv_bytes(self, size):
                return b'something bogus'

            def send_bytes(self, data):
                pass

        with self.assertRaises(multiprocessing.AuthenticationError):
            multiprocessing.connection.deliver_challenge(
                _FakeConnection(), b'abc')

    def test_answer_challenge_auth_failure(self):
        # The fake peer sends a challenge first and garbage second, so
        # answering it must end in an authentication error.
        class _FakeConnection(object):
            def __init__(self):
                self.count = 0

            def recv_bytes(self, size):
                self.count += 1
                if self.count == 1:
                    return multiprocessing.connection.CHALLENGE
                if self.count == 2:
                    return b'something bogus'
                return b''

            def send_bytes(self, data):
                pass

        with self.assertRaises(multiprocessing.AuthenticationError):
            multiprocessing.connection.answer_challenge(
                _FakeConnection(), b'abc')
2254
Jesse Noller7152f6d2009-04-02 05:17:26 +00002255#
2256# Test Manager.start()/Pool.__init__() initializer feature - see issue 5585
2257#
2258
def initializer(ns):
    # Bump the ``test`` attribute of ``ns`` by one.
    current = ns.test
    current += 1
    ns.test = current
2261
class TestInitializers(unittest.TestCase):
    # Manager.start() and Pool() accept an initializer callable (see
    # issue 5585).  Each test passes the module-level initializer() and
    # checks its effect on a namespace owned by a separate manager.

    def setUp(self):
        self.mgr = multiprocessing.Manager()
        self.ns = self.mgr.Namespace()
        self.ns.test = 0

    def tearDown(self):
        self.mgr.shutdown()

    def test_manager_initializer(self):
        m = multiprocessing.managers.SyncManager()
        # A non-callable initializer must be rejected.
        self.assertRaises(TypeError, m.start, 1)
        m.start(initializer, (self.ns,))
        try:
            self.assertEqual(self.ns.test, 1)
        finally:
            # Shut the second manager down even when the assertion fails,
            # and before tearDown() shuts down the manager owning ``ns``.
            m.shutdown()

    def test_pool_initializer(self):
        # A non-callable initializer must be rejected.
        self.assertRaises(TypeError, multiprocessing.Pool, initializer=1)
        p = multiprocessing.Pool(1, initializer, (self.ns,))
        p.close()
        p.join()
        self.assertEqual(self.ns.test, 1)
2284
Jesse Noller1b90efb2009-06-30 17:11:52 +00002285#
2286# Issue 5155, 5313, 5331: Test process in processes
2287# Verifies os.close(sys.stdin.fileno) vs. sys.stdin.close() behavior
2288#
2289
def _this_sub_process(q):
    # Attempt a single non-blocking get on ``q``; an empty queue is fine.
    try:
        q.get(block=False)
    except Queue.Empty:
        pass
2295
def _test_process(q):
    # Runs inside a child process: create a fresh queue and hand it to a
    # daemonic grandchild running _this_sub_process, then wait for it.
    # The ``q`` argument itself is not used.
    inner_queue = multiprocessing.Queue()
    grandchild = multiprocessing.Process(target=_this_sub_process,
                                         args=(inner_queue,))
    grandchild.daemon = True
    grandchild.start()
    grandchild.join()
2302
def _afunc(x):
    # Multiply ``x`` by itself.
    product = x * x
    return product
2305
def pool_in_process():
    # Runs inside a child process: create a four-worker pool and map
    # _afunc over a short list.  The results are not inspected; the
    # caller only checks that this completes.
    pool = multiprocessing.Pool(processes=4)
    try:
        pool.map(_afunc, [1, 2, 3, 4, 5, 6, 7])
    finally:
        # Release the workers instead of leaving them to be cleaned up
        # when the process exits.
        pool.close()
        pool.join()
2309
class _file_like(object):
    # Write buffer in front of ``delegate``: write() collects chunks in a
    # list that is reset whenever the current pid changes, and flush()
    # passes the joined chunks to the delegate.

    def __init__(self, delegate):
        self._pid = None
        self._delegate = delegate

    @property
    def cache(self):
        current_pid = os.getpid()
        # There are no race conditions since fork keeps only the running thread
        if self._pid != current_pid:
            self._pid, self._cache = current_pid, []
        return self._cache

    def write(self, data):
        self.cache.append(data)

    def flush(self):
        pending = ''.join(self.cache)
        self._delegate.write(pending)
        self._cache = []
2330
class TestStdinBadfiledescriptor(unittest.TestCase):
    # Issues 5155, 5313, 5331: multiprocessing objects must be usable
    # from inside a child process.

    def test_queue_in_process(self):
        queue = multiprocessing.Queue()
        proc = multiprocessing.Process(target=_test_process, args=(queue,))
        proc.start()
        proc.join()

    def test_pool_in_process(self):
        p = multiprocessing.Process(target=pool_in_process)
        p.start()
        p.join()

    def test_flushing(self):
        sio = StringIO()
        flike = _file_like(sio)
        flike.write('foo')
        # NOTE(review): this Process is created but never started; it is
        # kept so the test's side effects stay as they were -- confirm
        # whether it was meant to be started.
        proc = multiprocessing.Process(target=lambda: flike.flush())
        flike.flush()
        # A real unittest assertion: a bare ``assert`` is removed when the
        # interpreter runs with -O and would make this test vacuous.
        self.assertEqual(sio.getvalue(), 'foo')
2351
Richard Oudkerke4b99382012-07-27 14:05:46 +01002352#
2353# Test interaction with socket timeouts - see Issue #6056
2354#
2355
class TestTimeouts(unittest.TestCase):
    # Interaction with socket default timeouts, see Issue #6056.

    @classmethod
    def _test_timeout(cls, child, address):
        # Child side: wait well past the parent's socket timeout, answer
        # on the pipe, then connect to the listener and answer there too.
        time.sleep(1)
        child.send(123)
        child.close()
        client = multiprocessing.connection.Client(address)
        client.send(456)
        client.close()

    def test_timeout(self):
        saved_timeout = socket.getdefaulttimeout()
        try:
            # A short default socket timeout must not break the pipe or
            # the listener below.
            socket.setdefaulttimeout(0.1)
            parent, child = multiprocessing.Pipe(duplex=True)
            listener = multiprocessing.connection.Listener(family='AF_INET')
            worker = multiprocessing.Process(target=self._test_timeout,
                                             args=(child, listener.address))
            worker.start()
            child.close()
            from_pipe = parent.recv()
            self.assertEqual(from_pipe, 123)
            parent.close()
            accepted = listener.accept()
            from_socket = accepted.recv()
            self.assertEqual(from_socket, 456)
            accepted.close()
            listener.close()
            worker.join(10)
        finally:
            socket.setdefaulttimeout(saved_timeout)
2385
Richard Oudkerkfaee75c2012-08-14 11:41:19 +01002386#
2387# Test what happens with no "if __name__ == '__main__'"
2388#
2389
class TestNoForkBomb(unittest.TestCase):
    # What happens with no "if __name__ == '__main__'" guard.

    def test_noforkbomb(self):
        script = os.path.join(os.path.dirname(__file__), 'mp_fork_bomb.py')
        if not WIN32:
            # Elsewhere the script must succeed, print 123 and keep
            # stderr empty.
            rc, out, err = test.script_helper.assert_python_ok(script)
            self.assertEqual('123', out.decode('ascii').rstrip())
            self.assertEqual('', err.decode('ascii'))
            return
        # On Windows the script must fail with a RuntimeError and print
        # nothing on stdout.
        rc, out, err = test.script_helper.assert_python_failure(script)
        self.assertEqual('', out.decode('ascii'))
        self.assertIn('RuntimeError', err.decode('ascii'))
2401
2402#
Kristján Valur Jónsson8927e8f2013-03-19 15:07:35 -07002403# Issue 12098: check sys.flags of child matches that for parent
2404#
2405
class TestFlags(unittest.TestCase):
    # Issue 12098: sys.flags of a child must match those of its parent.

    @classmethod
    def run_in_grandchild(cls, conn):
        conn.send(tuple(sys.flags))

    @classmethod
    def run_in_child(cls):
        # Spawn a grandchild, collect its flags and print them together
        # with this process's own flags as one JSON document.
        import json
        reader, writer = multiprocessing.Pipe(duplex=False)
        grandchild = multiprocessing.Process(target=cls.run_in_grandchild,
                                             args=(writer,))
        grandchild.start()
        grandchild_flags = reader.recv()
        grandchild.join()
        reader.close()
        writer.close()
        own_flags = tuple(sys.flags)
        print(json.dumps((own_flags, grandchild_flags)))

    def test_flags(self):
        import json
        import subprocess
        # start child process using unusual flags
        prog = ('from test.test_multiprocessing import TestFlags; '
                'TestFlags.run_in_child()')
        command = [sys.executable, '-E', '-B', '-O', '-c', prog]
        data = subprocess.check_output(command)
        child_flags, grandchild_flags = json.loads(data.decode('ascii'))
        self.assertEqual(child_flags, grandchild_flags)
Richard Oudkerk7bdd93c2013-04-17 19:15:52 +01002433
2434#
2435# Issue #17555: ForkAwareThreadLock
2436#
2437
class TestForkAwareThreadLock(unittest.TestCase):
    # We recursively start processes.  Issue #17555 meant that the
    # after fork registry would get duplicate entries for the same
    # lock.  The size of the registry at generation n was ~2**n.

    @classmethod
    def child(cls, n, conn):
        if n <= 1:
            # Last generation: report how large the registry has become.
            conn.send(len(util._afterfork_registry))
        else:
            descendant = multiprocessing.Process(target=cls.child,
                                                 args=(n-1, conn))
            descendant.start()
            descendant.join()
        conn.close()

    def test_lock(self):
        reader, writer = multiprocessing.Pipe(False)
        # NOTE(review): the lock is bound to a local but never used
        # directly; presumably it must stay referenced for the registry
        # sizes below to include it -- confirm.
        lock = util.ForkAwareThreadLock()
        old_size = len(util._afterfork_registry)
        first = multiprocessing.Process(target=self.child, args=(5, writer))
        first.start()
        new_size = reader.recv()
        first.join()
        self.assertLessEqual(new_size, old_size)
2462
Kristján Valur Jónsson8927e8f2013-03-19 15:07:35 -07002463#
Richard Oudkerk41072db2013-07-01 18:45:28 +01002464# Issue #17097: EINTR should be ignored by recv(), send(), accept() etc
2465#
2466
class TestIgnoreEINTR(unittest.TestCase):
    # Issue #17097: EINTR should be ignored by recv(), send(), accept() etc

    @classmethod
    def _test_ignore(cls, conn):
        # Child side: install a do-nothing SIGUSR1 handler, then echo one
        # object back and send a large byte string.
        signal.signal(signal.SIGUSR1, lambda signum, frame: None)
        conn.send('ready')
        echoed = conn.recv()
        conn.send(echoed)
        conn.send_bytes(b'x'*(1024*1024))   # sending 1 MB should block

    @unittest.skipUnless(hasattr(signal, 'SIGUSR1'), 'requires SIGUSR1')
    def test_ignore(self):
        conn, child_conn = multiprocessing.Pipe()
        try:
            worker = multiprocessing.Process(target=self._test_ignore,
                                             args=(child_conn,))
            worker.daemon = True
            worker.start()
            child_conn.close()
            self.assertEqual(conn.recv(), 'ready')
            # Signal the child before sending it anything; the echo must
            # still come back.
            time.sleep(0.1)
            os.kill(worker.pid, signal.SIGUSR1)
            time.sleep(0.1)
            conn.send(1234)
            self.assertEqual(conn.recv(), 1234)
            # Signal the child again before reading its large message;
            # the message must still arrive intact.
            time.sleep(0.1)
            os.kill(worker.pid, signal.SIGUSR1)
            expected = b'x'*(1024*1024)
            self.assertEqual(conn.recv_bytes(), expected)
            time.sleep(0.1)
            worker.join()
        finally:
            conn.close()

    @classmethod
    def _test_ignore_listener(cls, conn):
        # Child side: install a do-nothing SIGUSR1 handler, publish a
        # listener's address, then greet the first client that connects.
        signal.signal(signal.SIGUSR1, lambda signum, frame: None)
        listener = multiprocessing.connection.Listener()
        conn.send(listener.address)
        accepted = listener.accept()
        accepted.send('welcome')

    @unittest.skipUnless(hasattr(signal, 'SIGUSR1'), 'requires SIGUSR1')
    def test_ignore_listener(self):
        conn, child_conn = multiprocessing.Pipe()
        try:
            worker = multiprocessing.Process(
                target=self._test_ignore_listener, args=(child_conn,))
            worker.daemon = True
            worker.start()
            child_conn.close()
            address = conn.recv()
            # Signal the child before connecting to it.
            time.sleep(0.1)
            os.kill(worker.pid, signal.SIGUSR1)
            time.sleep(0.1)
            client = multiprocessing.connection.Client(address)
            self.assertEqual(client.recv(), 'welcome')
            worker.join()
        finally:
            conn.close()
2530
2531#
Richard Oudkerkfaee75c2012-08-14 11:41:19 +01002532#
2533#
2534
# Stand-alone TestCase classes; test_main() appends these to the generated
# test classes when it builds the suite.
testcases_other = [OtherTest, TestInvalidHandle, TestInitializers,
                   TestStdinBadfiledescriptor, TestTimeouts, TestNoForkBomb,
                   TestFlags, TestForkAwareThreadLock, TestIgnoreEINTR]
Neal Norwitz0c519b32008-08-25 01:50:24 +00002538
Benjamin Petersondfd79492008-06-13 19:13:39 +00002539#
2540#
2541#
2542
def test_main(run=None):
    # Build and run the whole suite.  ``run`` is a callable taking the
    # assembled TestSuite; when omitted, test_support.run_unittest is used.
    # The pools and the manager shared by the mixin classes are created
    # here and are torn down again even when ``run`` raises.
    if sys.platform.startswith("linux"):
        try:
            # Probe only: the lock itself is not used.
            lock = multiprocessing.RLock()
        except OSError:
            raise unittest.SkipTest("OSError raises on RLock creation, see issue 3111!")

    check_enough_semaphores()

    if run is None:
        from test.test_support import run_unittest as run

    util.get_temp_dir()     # creates temp directory for use by all processes

    multiprocessing.get_logger().setLevel(LOG_LEVEL)

    ProcessesMixin.pool = multiprocessing.Pool(4)
    ThreadsMixin.pool = multiprocessing.dummy.Pool(4)
    ManagerMixin.manager.__init__()
    ManagerMixin.manager.start()
    ManagerMixin.pool = ManagerMixin.manager.Pool(4)

    try:
        testcases = (
            sorted(testcases_processes.values(), key=lambda tc:tc.__name__) +
            sorted(testcases_threads.values(), key=lambda tc:tc.__name__) +
            sorted(testcases_manager.values(), key=lambda tc:tc.__name__) +
            testcases_other
            )

        loadTestsFromTestCase = unittest.defaultTestLoader.loadTestsFromTestCase
        suite = unittest.TestSuite(loadTestsFromTestCase(tc) for tc in testcases)
        # (ncoghlan): Whether or not sys.exc_clear is executed by the threading
        # module during these tests is at least platform dependent and possibly
        # non-deterministic on any given platform. So we don't mind if the listed
        # warnings aren't actually raised.
        with test_support.check_py3k_warnings(
                (".+__(get|set)slice__ has been removed", DeprecationWarning),
                (r"sys.exc_clear\(\) not supported", DeprecationWarning),
                quiet=True):
            run(suite)
    finally:
        # Runs on failure too, so worker processes and the manager's
        # server process are not left behind.
        ThreadsMixin.pool.terminate()
        ProcessesMixin.pool.terminate()
        ManagerMixin.pool.terminate()
        ManagerMixin.manager.shutdown()

        del ProcessesMixin.pool, ThreadsMixin.pool, ManagerMixin.pool
Benjamin Petersondfd79492008-06-13 19:13:39 +00002590
def main():
    # Script entry point: run everything through a verbose text runner.
    runner = unittest.TextTestRunner(verbosity=2)
    test_main(runner.run)


if __name__ == '__main__':
    main()