blob: ec4507ef74ef900edccea5b6f0f2e50282695ed3 [file] [log] [blame]
Benjamin Petersondfd79492008-06-13 19:13:39 +00001#
2# Unit tests for the multiprocessing package
3#
4
5import unittest
Benjamin Petersondfd79492008-06-13 19:13:39 +00006import Queue
7import time
8import sys
9import os
10import gc
11import signal
12import array
Benjamin Petersondfd79492008-06-13 19:13:39 +000013import socket
14import random
15import logging
Antoine Pitroua1a8da82011-08-23 19:54:20 +020016import errno
Richard Oudkerkfaee75c2012-08-14 11:41:19 +010017import test.script_helper
Mark Dickinsonc4920e82009-11-20 19:30:22 +000018from test import test_support
Jesse Noller1b90efb2009-06-30 17:11:52 +000019from StringIO import StringIO
R. David Murray3db8a342009-03-30 23:05:48 +000020_multiprocessing = test_support.import_module('_multiprocessing')
Ezio Melottic2077b02011-03-16 12:34:31 +020021# import threading after _multiprocessing to raise a more relevant error
Victor Stinner613b4cf2010-04-27 21:56:26 +000022# message: "No module named _multiprocessing". _multiprocessing is not compiled
23# without thread support.
24import threading
R. David Murray3db8a342009-03-30 23:05:48 +000025
Jesse Noller37040cd2008-09-30 00:15:45 +000026# Work around broken sem_open implementations
R. David Murray3db8a342009-03-30 23:05:48 +000027test_support.import_module('multiprocessing.synchronize')
Jesse Noller37040cd2008-09-30 00:15:45 +000028
Benjamin Petersondfd79492008-06-13 19:13:39 +000029import multiprocessing.dummy
30import multiprocessing.connection
31import multiprocessing.managers
32import multiprocessing.heap
Benjamin Petersondfd79492008-06-13 19:13:39 +000033import multiprocessing.pool
Benjamin Petersondfd79492008-06-13 19:13:39 +000034
Charles-François Natalif8413b22011-09-21 18:44:49 +020035from multiprocessing import util
36
37try:
38 from multiprocessing import reduction
39 HAS_REDUCTION = True
40except ImportError:
41 HAS_REDUCTION = False
Benjamin Petersondfd79492008-06-13 19:13:39 +000042
Brian Curtina06e9b82010-10-07 02:27:41 +000043try:
44 from multiprocessing.sharedctypes import Value, copy
45 HAS_SHAREDCTYPES = True
46except ImportError:
47 HAS_SHAREDCTYPES = False
48
Antoine Pitroua1a8da82011-08-23 19:54:20 +020049try:
50 import msvcrt
51except ImportError:
52 msvcrt = None
53
Benjamin Petersondfd79492008-06-13 19:13:39 +000054#
55#
56#
57
Benjamin Petersone79edf52008-07-13 18:34:58 +000058latin = str
Benjamin Petersondfd79492008-06-13 19:13:39 +000059
Benjamin Petersondfd79492008-06-13 19:13:39 +000060#
61# Constants
62#
63
64LOG_LEVEL = util.SUBWARNING
Jesse Noller654ade32010-01-27 03:05:57 +000065#LOG_LEVEL = logging.DEBUG
Benjamin Petersondfd79492008-06-13 19:13:39 +000066
67DELTA = 0.1
68CHECK_TIMINGS = False # making true makes tests take a lot longer
69 # and can sometimes cause some non-serious
70 # failures because some calls block a bit
71 # longer than expected
72if CHECK_TIMINGS:
73 TIMEOUT1, TIMEOUT2, TIMEOUT3 = 0.82, 0.35, 1.4
74else:
75 TIMEOUT1, TIMEOUT2, TIMEOUT3 = 0.1, 0.1, 0.1
76
77HAVE_GETVALUE = not getattr(_multiprocessing,
78 'HAVE_BROKEN_SEM_GETVALUE', False)
79
Jesse Noller9a5b2ad2009-01-19 15:12:22 +000080WIN32 = (sys.platform == "win32")
81
Antoine Pitroua1a8da82011-08-23 19:54:20 +020082try:
83 MAXFD = os.sysconf("SC_OPEN_MAX")
84except:
85 MAXFD = 256
86
Benjamin Petersondfd79492008-06-13 19:13:39 +000087#
Florent Xicluna36b9fbb2010-03-24 19:33:25 +000088# Some tests require ctypes
89#
90
91try:
Nick Coghlan13623662010-04-10 14:24:36 +000092 from ctypes import Structure, c_int, c_double
Florent Xicluna36b9fbb2010-03-24 19:33:25 +000093except ImportError:
94 Structure = object
95 c_int = c_double = None
96
Charles-François Natali6392d7f2011-11-22 18:35:18 +010097
98def check_enough_semaphores():
99 """Check that the system supports enough semaphores to run the test."""
100 # minimum number of semaphores available according to POSIX
101 nsems_min = 256
102 try:
103 nsems = os.sysconf("SC_SEM_NSEMS_MAX")
104 except (AttributeError, ValueError):
105 # sysconf not available or setting not available
106 return
107 if nsems == -1 or nsems >= nsems_min:
108 return
109 raise unittest.SkipTest("The OS doesn't support enough semaphores "
110 "to run the test (required: %d)." % nsems_min)
111
112
Florent Xicluna36b9fbb2010-03-24 19:33:25 +0000113#
Benjamin Petersondfd79492008-06-13 19:13:39 +0000114# Creates a wrapper for a function which records the time it takes to finish
115#
116
117class TimingWrapper(object):
118
119 def __init__(self, func):
120 self.func = func
121 self.elapsed = None
122
123 def __call__(self, *args, **kwds):
124 t = time.time()
125 try:
126 return self.func(*args, **kwds)
127 finally:
128 self.elapsed = time.time() - t
129
130#
131# Base class for test cases
132#
133
134class BaseTestCase(object):
135
136 ALLOWED_TYPES = ('processes', 'manager', 'threads')
137
138 def assertTimingAlmostEqual(self, a, b):
139 if CHECK_TIMINGS:
140 self.assertAlmostEqual(a, b, 1)
141
142 def assertReturnsIfImplemented(self, value, func, *args):
143 try:
144 res = func(*args)
145 except NotImplementedError:
146 pass
147 else:
148 return self.assertEqual(value, res)
149
Antoine Pitrou4eb2b282010-11-02 23:51:30 +0000150 # For the sanity of Windows users, rather than crashing or freezing in
151 # multiple ways.
152 def __reduce__(self, *args):
153 raise NotImplementedError("shouldn't try to pickle a test case")
154
155 __reduce_ex__ = __reduce__
156
Benjamin Petersondfd79492008-06-13 19:13:39 +0000157#
158# Return the value of a semaphore
159#
160
161def get_value(self):
162 try:
163 return self.get_value()
164 except AttributeError:
165 try:
166 return self._Semaphore__value
167 except AttributeError:
168 try:
169 return self._value
170 except AttributeError:
171 raise NotImplementedError
172
173#
174# Testcases
175#
176
177class _TestProcess(BaseTestCase):
178
179 ALLOWED_TYPES = ('processes', 'threads')
180
181 def test_current(self):
182 if self.TYPE == 'threads':
Zachary Ware1f702212013-12-10 14:09:20 -0600183 self.skipTest('test not appropriate for {}'.format(self.TYPE))
Benjamin Petersondfd79492008-06-13 19:13:39 +0000184
185 current = self.current_process()
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000186 authkey = current.authkey
Benjamin Petersondfd79492008-06-13 19:13:39 +0000187
188 self.assertTrue(current.is_alive())
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000189 self.assertTrue(not current.daemon)
Ezio Melottib0f5adc2010-01-24 16:58:36 +0000190 self.assertIsInstance(authkey, bytes)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000191 self.assertTrue(len(authkey) > 0)
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000192 self.assertEqual(current.ident, os.getpid())
193 self.assertEqual(current.exitcode, None)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000194
Antoine Pitrou4eb2b282010-11-02 23:51:30 +0000195 @classmethod
196 def _test(cls, q, *args, **kwds):
197 current = cls.current_process()
Benjamin Petersondfd79492008-06-13 19:13:39 +0000198 q.put(args)
199 q.put(kwds)
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000200 q.put(current.name)
Antoine Pitrou4eb2b282010-11-02 23:51:30 +0000201 if cls.TYPE != 'threads':
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000202 q.put(bytes(current.authkey))
Benjamin Petersondfd79492008-06-13 19:13:39 +0000203 q.put(current.pid)
204
205 def test_process(self):
206 q = self.Queue(1)
207 e = self.Event()
208 args = (q, 1, 2)
209 kwargs = {'hello':23, 'bye':2.54}
210 name = 'SomeProcess'
211 p = self.Process(
212 target=self._test, args=args, kwargs=kwargs, name=name
213 )
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000214 p.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +0000215 current = self.current_process()
216
217 if self.TYPE != 'threads':
Ezio Melotti2623a372010-11-21 13:34:58 +0000218 self.assertEqual(p.authkey, current.authkey)
219 self.assertEqual(p.is_alive(), False)
220 self.assertEqual(p.daemon, True)
Ezio Melottiaa980582010-01-23 23:04:36 +0000221 self.assertNotIn(p, self.active_children())
Benjamin Petersondfd79492008-06-13 19:13:39 +0000222 self.assertTrue(type(self.active_children()) is list)
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000223 self.assertEqual(p.exitcode, None)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000224
225 p.start()
226
Ezio Melotti2623a372010-11-21 13:34:58 +0000227 self.assertEqual(p.exitcode, None)
228 self.assertEqual(p.is_alive(), True)
Ezio Melottiaa980582010-01-23 23:04:36 +0000229 self.assertIn(p, self.active_children())
Benjamin Petersondfd79492008-06-13 19:13:39 +0000230
Ezio Melotti2623a372010-11-21 13:34:58 +0000231 self.assertEqual(q.get(), args[1:])
232 self.assertEqual(q.get(), kwargs)
233 self.assertEqual(q.get(), p.name)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000234 if self.TYPE != 'threads':
Ezio Melotti2623a372010-11-21 13:34:58 +0000235 self.assertEqual(q.get(), current.authkey)
236 self.assertEqual(q.get(), p.pid)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000237
238 p.join()
239
Ezio Melotti2623a372010-11-21 13:34:58 +0000240 self.assertEqual(p.exitcode, 0)
241 self.assertEqual(p.is_alive(), False)
Ezio Melottiaa980582010-01-23 23:04:36 +0000242 self.assertNotIn(p, self.active_children())
Benjamin Petersondfd79492008-06-13 19:13:39 +0000243
Antoine Pitrou4eb2b282010-11-02 23:51:30 +0000244 @classmethod
245 def _test_terminate(cls):
Benjamin Petersondfd79492008-06-13 19:13:39 +0000246 time.sleep(1000)
247
248 def test_terminate(self):
249 if self.TYPE == 'threads':
Zachary Ware1f702212013-12-10 14:09:20 -0600250 self.skipTest('test not appropriate for {}'.format(self.TYPE))
Benjamin Petersondfd79492008-06-13 19:13:39 +0000251
252 p = self.Process(target=self._test_terminate)
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000253 p.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +0000254 p.start()
255
256 self.assertEqual(p.is_alive(), True)
Ezio Melottiaa980582010-01-23 23:04:36 +0000257 self.assertIn(p, self.active_children())
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000258 self.assertEqual(p.exitcode, None)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000259
260 p.terminate()
261
262 join = TimingWrapper(p.join)
263 self.assertEqual(join(), None)
264 self.assertTimingAlmostEqual(join.elapsed, 0.0)
265
266 self.assertEqual(p.is_alive(), False)
Ezio Melottiaa980582010-01-23 23:04:36 +0000267 self.assertNotIn(p, self.active_children())
Benjamin Petersondfd79492008-06-13 19:13:39 +0000268
269 p.join()
270
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000271 # XXX sometimes get p.exitcode == 0 on Windows ...
272 #self.assertEqual(p.exitcode, -signal.SIGTERM)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000273
274 def test_cpu_count(self):
275 try:
276 cpus = multiprocessing.cpu_count()
277 except NotImplementedError:
278 cpus = 1
279 self.assertTrue(type(cpus) is int)
280 self.assertTrue(cpus >= 1)
281
282 def test_active_children(self):
283 self.assertEqual(type(self.active_children()), list)
284
285 p = self.Process(target=time.sleep, args=(DELTA,))
Ezio Melottiaa980582010-01-23 23:04:36 +0000286 self.assertNotIn(p, self.active_children())
Benjamin Petersondfd79492008-06-13 19:13:39 +0000287
Jesus Cea6f6016b2011-09-09 20:26:57 +0200288 p.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +0000289 p.start()
Ezio Melottiaa980582010-01-23 23:04:36 +0000290 self.assertIn(p, self.active_children())
Benjamin Petersondfd79492008-06-13 19:13:39 +0000291
292 p.join()
Ezio Melottiaa980582010-01-23 23:04:36 +0000293 self.assertNotIn(p, self.active_children())
Benjamin Petersondfd79492008-06-13 19:13:39 +0000294
Antoine Pitrou4eb2b282010-11-02 23:51:30 +0000295 @classmethod
296 def _test_recursion(cls, wconn, id):
Benjamin Petersondfd79492008-06-13 19:13:39 +0000297 from multiprocessing import forking
298 wconn.send(id)
299 if len(id) < 2:
300 for i in range(2):
Antoine Pitrou4eb2b282010-11-02 23:51:30 +0000301 p = cls.Process(
302 target=cls._test_recursion, args=(wconn, id+[i])
Benjamin Petersondfd79492008-06-13 19:13:39 +0000303 )
304 p.start()
305 p.join()
306
307 def test_recursion(self):
308 rconn, wconn = self.Pipe(duplex=False)
309 self._test_recursion(wconn, [])
310
311 time.sleep(DELTA)
312 result = []
313 while rconn.poll():
314 result.append(rconn.recv())
315
316 expected = [
317 [],
318 [0],
319 [0, 0],
320 [0, 1],
321 [1],
322 [1, 0],
323 [1, 1]
324 ]
325 self.assertEqual(result, expected)
326
Richard Oudkerk2182e052012-06-06 19:01:14 +0100327 @classmethod
328 def _test_sys_exit(cls, reason, testfn):
329 sys.stderr = open(testfn, 'w')
330 sys.exit(reason)
331
332 def test_sys_exit(self):
333 # See Issue 13854
334 if self.TYPE == 'threads':
Zachary Ware1f702212013-12-10 14:09:20 -0600335 self.skipTest('test not appropriate for {}'.format(self.TYPE))
Richard Oudkerk2182e052012-06-06 19:01:14 +0100336
337 testfn = test_support.TESTFN
338 self.addCleanup(test_support.unlink, testfn)
339
Richard Oudkerk3f8376e2013-11-17 17:24:11 +0000340 for reason, code in (([1, 2, 3], 1), ('ignore this', 1)):
Richard Oudkerk2182e052012-06-06 19:01:14 +0100341 p = self.Process(target=self._test_sys_exit, args=(reason, testfn))
342 p.daemon = True
343 p.start()
344 p.join(5)
345 self.assertEqual(p.exitcode, code)
346
347 with open(testfn, 'r') as f:
348 self.assertEqual(f.read().rstrip(), str(reason))
349
350 for reason in (True, False, 8):
351 p = self.Process(target=sys.exit, args=(reason,))
352 p.daemon = True
353 p.start()
354 p.join(5)
355 self.assertEqual(p.exitcode, reason)
356
Benjamin Petersondfd79492008-06-13 19:13:39 +0000357#
358#
359#
360
361class _UpperCaser(multiprocessing.Process):
362
363 def __init__(self):
364 multiprocessing.Process.__init__(self)
365 self.child_conn, self.parent_conn = multiprocessing.Pipe()
366
367 def run(self):
368 self.parent_conn.close()
369 for s in iter(self.child_conn.recv, None):
370 self.child_conn.send(s.upper())
371 self.child_conn.close()
372
373 def submit(self, s):
374 assert type(s) is str
375 self.parent_conn.send(s)
376 return self.parent_conn.recv()
377
378 def stop(self):
379 self.parent_conn.send(None)
380 self.parent_conn.close()
381 self.child_conn.close()
382
383class _TestSubclassingProcess(BaseTestCase):
384
385 ALLOWED_TYPES = ('processes',)
386
387 def test_subclassing(self):
388 uppercaser = _UpperCaser()
Jesus Cea6f6016b2011-09-09 20:26:57 +0200389 uppercaser.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +0000390 uppercaser.start()
391 self.assertEqual(uppercaser.submit('hello'), 'HELLO')
392 self.assertEqual(uppercaser.submit('world'), 'WORLD')
393 uppercaser.stop()
394 uppercaser.join()
395
396#
397#
398#
399
400def queue_empty(q):
401 if hasattr(q, 'empty'):
402 return q.empty()
403 else:
404 return q.qsize() == 0
405
406def queue_full(q, maxsize):
407 if hasattr(q, 'full'):
408 return q.full()
409 else:
410 return q.qsize() == maxsize
411
412
413class _TestQueue(BaseTestCase):
414
415
Antoine Pitrou4eb2b282010-11-02 23:51:30 +0000416 @classmethod
417 def _test_put(cls, queue, child_can_start, parent_can_continue):
Benjamin Petersondfd79492008-06-13 19:13:39 +0000418 child_can_start.wait()
419 for i in range(6):
420 queue.get()
421 parent_can_continue.set()
422
423 def test_put(self):
424 MAXSIZE = 6
425 queue = self.Queue(maxsize=MAXSIZE)
426 child_can_start = self.Event()
427 parent_can_continue = self.Event()
428
429 proc = self.Process(
430 target=self._test_put,
431 args=(queue, child_can_start, parent_can_continue)
432 )
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000433 proc.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +0000434 proc.start()
435
436 self.assertEqual(queue_empty(queue), True)
437 self.assertEqual(queue_full(queue, MAXSIZE), False)
438
439 queue.put(1)
440 queue.put(2, True)
441 queue.put(3, True, None)
442 queue.put(4, False)
443 queue.put(5, False, None)
444 queue.put_nowait(6)
445
446 # the values may be in buffer but not yet in pipe so sleep a bit
447 time.sleep(DELTA)
448
449 self.assertEqual(queue_empty(queue), False)
450 self.assertEqual(queue_full(queue, MAXSIZE), True)
451
452 put = TimingWrapper(queue.put)
453 put_nowait = TimingWrapper(queue.put_nowait)
454
455 self.assertRaises(Queue.Full, put, 7, False)
456 self.assertTimingAlmostEqual(put.elapsed, 0)
457
458 self.assertRaises(Queue.Full, put, 7, False, None)
459 self.assertTimingAlmostEqual(put.elapsed, 0)
460
461 self.assertRaises(Queue.Full, put_nowait, 7)
462 self.assertTimingAlmostEqual(put_nowait.elapsed, 0)
463
464 self.assertRaises(Queue.Full, put, 7, True, TIMEOUT1)
465 self.assertTimingAlmostEqual(put.elapsed, TIMEOUT1)
466
467 self.assertRaises(Queue.Full, put, 7, False, TIMEOUT2)
468 self.assertTimingAlmostEqual(put.elapsed, 0)
469
470 self.assertRaises(Queue.Full, put, 7, True, timeout=TIMEOUT3)
471 self.assertTimingAlmostEqual(put.elapsed, TIMEOUT3)
472
473 child_can_start.set()
474 parent_can_continue.wait()
475
476 self.assertEqual(queue_empty(queue), True)
477 self.assertEqual(queue_full(queue, MAXSIZE), False)
478
479 proc.join()
480
Antoine Pitrou4eb2b282010-11-02 23:51:30 +0000481 @classmethod
482 def _test_get(cls, queue, child_can_start, parent_can_continue):
Benjamin Petersondfd79492008-06-13 19:13:39 +0000483 child_can_start.wait()
Benjamin Petersonda3a1b12008-06-16 20:52:48 +0000484 #queue.put(1)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000485 queue.put(2)
486 queue.put(3)
487 queue.put(4)
488 queue.put(5)
489 parent_can_continue.set()
490
491 def test_get(self):
492 queue = self.Queue()
493 child_can_start = self.Event()
494 parent_can_continue = self.Event()
495
496 proc = self.Process(
497 target=self._test_get,
498 args=(queue, child_can_start, parent_can_continue)
499 )
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000500 proc.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +0000501 proc.start()
502
503 self.assertEqual(queue_empty(queue), True)
504
505 child_can_start.set()
506 parent_can_continue.wait()
507
508 time.sleep(DELTA)
509 self.assertEqual(queue_empty(queue), False)
510
Benjamin Petersonda3a1b12008-06-16 20:52:48 +0000511 # Hangs unexpectedly, remove for now
512 #self.assertEqual(queue.get(), 1)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000513 self.assertEqual(queue.get(True, None), 2)
514 self.assertEqual(queue.get(True), 3)
515 self.assertEqual(queue.get(timeout=1), 4)
516 self.assertEqual(queue.get_nowait(), 5)
517
518 self.assertEqual(queue_empty(queue), True)
519
520 get = TimingWrapper(queue.get)
521 get_nowait = TimingWrapper(queue.get_nowait)
522
523 self.assertRaises(Queue.Empty, get, False)
524 self.assertTimingAlmostEqual(get.elapsed, 0)
525
526 self.assertRaises(Queue.Empty, get, False, None)
527 self.assertTimingAlmostEqual(get.elapsed, 0)
528
529 self.assertRaises(Queue.Empty, get_nowait)
530 self.assertTimingAlmostEqual(get_nowait.elapsed, 0)
531
532 self.assertRaises(Queue.Empty, get, True, TIMEOUT1)
533 self.assertTimingAlmostEqual(get.elapsed, TIMEOUT1)
534
535 self.assertRaises(Queue.Empty, get, False, TIMEOUT2)
536 self.assertTimingAlmostEqual(get.elapsed, 0)
537
538 self.assertRaises(Queue.Empty, get, timeout=TIMEOUT3)
539 self.assertTimingAlmostEqual(get.elapsed, TIMEOUT3)
540
541 proc.join()
542
Antoine Pitrou4eb2b282010-11-02 23:51:30 +0000543 @classmethod
544 def _test_fork(cls, queue):
Benjamin Petersondfd79492008-06-13 19:13:39 +0000545 for i in range(10, 20):
546 queue.put(i)
547 # note that at this point the items may only be buffered, so the
548 # process cannot shutdown until the feeder thread has finished
549 # pushing items onto the pipe.
550
551 def test_fork(self):
552 # Old versions of Queue would fail to create a new feeder
553 # thread for a forked process if the original process had its
554 # own feeder thread. This test checks that this no longer
555 # happens.
556
557 queue = self.Queue()
558
559 # put items on queue so that main process starts a feeder thread
560 for i in range(10):
561 queue.put(i)
562
563 # wait to make sure thread starts before we fork a new process
564 time.sleep(DELTA)
565
566 # fork process
567 p = self.Process(target=self._test_fork, args=(queue,))
Jesus Cea6f6016b2011-09-09 20:26:57 +0200568 p.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +0000569 p.start()
570
571 # check that all expected items are in the queue
572 for i in range(20):
573 self.assertEqual(queue.get(), i)
574 self.assertRaises(Queue.Empty, queue.get, False)
575
576 p.join()
577
578 def test_qsize(self):
579 q = self.Queue()
580 try:
581 self.assertEqual(q.qsize(), 0)
582 except NotImplementedError:
Zachary Ware1f702212013-12-10 14:09:20 -0600583 self.skipTest('qsize method not implemented')
Benjamin Petersondfd79492008-06-13 19:13:39 +0000584 q.put(1)
585 self.assertEqual(q.qsize(), 1)
586 q.put(5)
587 self.assertEqual(q.qsize(), 2)
588 q.get()
589 self.assertEqual(q.qsize(), 1)
590 q.get()
591 self.assertEqual(q.qsize(), 0)
592
Antoine Pitrou4eb2b282010-11-02 23:51:30 +0000593 @classmethod
594 def _test_task_done(cls, q):
Benjamin Petersondfd79492008-06-13 19:13:39 +0000595 for obj in iter(q.get, None):
596 time.sleep(DELTA)
597 q.task_done()
598
599 def test_task_done(self):
600 queue = self.JoinableQueue()
601
602 if sys.version_info < (2, 5) and not hasattr(queue, 'task_done'):
Florent Xicluna36b9fbb2010-03-24 19:33:25 +0000603 self.skipTest("requires 'queue.task_done()' method")
Benjamin Petersondfd79492008-06-13 19:13:39 +0000604
605 workers = [self.Process(target=self._test_task_done, args=(queue,))
606 for i in xrange(4)]
607
608 for p in workers:
Jesus Cea6f6016b2011-09-09 20:26:57 +0200609 p.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +0000610 p.start()
611
612 for i in xrange(10):
613 queue.put(i)
614
615 queue.join()
616
617 for p in workers:
618 queue.put(None)
619
620 for p in workers:
621 p.join()
622
623#
624#
625#
626
627class _TestLock(BaseTestCase):
628
629 def test_lock(self):
630 lock = self.Lock()
631 self.assertEqual(lock.acquire(), True)
632 self.assertEqual(lock.acquire(False), False)
633 self.assertEqual(lock.release(), None)
634 self.assertRaises((ValueError, threading.ThreadError), lock.release)
635
636 def test_rlock(self):
637 lock = self.RLock()
638 self.assertEqual(lock.acquire(), True)
639 self.assertEqual(lock.acquire(), True)
640 self.assertEqual(lock.acquire(), True)
641 self.assertEqual(lock.release(), None)
642 self.assertEqual(lock.release(), None)
643 self.assertEqual(lock.release(), None)
644 self.assertRaises((AssertionError, RuntimeError), lock.release)
645
Jesse Noller82eb5902009-03-30 23:29:31 +0000646 def test_lock_context(self):
647 with self.Lock():
648 pass
649
Benjamin Petersondfd79492008-06-13 19:13:39 +0000650
651class _TestSemaphore(BaseTestCase):
652
653 def _test_semaphore(self, sem):
654 self.assertReturnsIfImplemented(2, get_value, sem)
655 self.assertEqual(sem.acquire(), True)
656 self.assertReturnsIfImplemented(1, get_value, sem)
657 self.assertEqual(sem.acquire(), True)
658 self.assertReturnsIfImplemented(0, get_value, sem)
659 self.assertEqual(sem.acquire(False), False)
660 self.assertReturnsIfImplemented(0, get_value, sem)
661 self.assertEqual(sem.release(), None)
662 self.assertReturnsIfImplemented(1, get_value, sem)
663 self.assertEqual(sem.release(), None)
664 self.assertReturnsIfImplemented(2, get_value, sem)
665
666 def test_semaphore(self):
667 sem = self.Semaphore(2)
668 self._test_semaphore(sem)
669 self.assertEqual(sem.release(), None)
670 self.assertReturnsIfImplemented(3, get_value, sem)
671 self.assertEqual(sem.release(), None)
672 self.assertReturnsIfImplemented(4, get_value, sem)
673
674 def test_bounded_semaphore(self):
675 sem = self.BoundedSemaphore(2)
676 self._test_semaphore(sem)
677 # Currently fails on OS/X
678 #if HAVE_GETVALUE:
679 # self.assertRaises(ValueError, sem.release)
680 # self.assertReturnsIfImplemented(2, get_value, sem)
681
682 def test_timeout(self):
683 if self.TYPE != 'processes':
Zachary Ware1f702212013-12-10 14:09:20 -0600684 self.skipTest('test not appropriate for {}'.format(self.TYPE))
Benjamin Petersondfd79492008-06-13 19:13:39 +0000685
686 sem = self.Semaphore(0)
687 acquire = TimingWrapper(sem.acquire)
688
689 self.assertEqual(acquire(False), False)
690 self.assertTimingAlmostEqual(acquire.elapsed, 0.0)
691
692 self.assertEqual(acquire(False, None), False)
693 self.assertTimingAlmostEqual(acquire.elapsed, 0.0)
694
695 self.assertEqual(acquire(False, TIMEOUT1), False)
696 self.assertTimingAlmostEqual(acquire.elapsed, 0)
697
698 self.assertEqual(acquire(True, TIMEOUT2), False)
699 self.assertTimingAlmostEqual(acquire.elapsed, TIMEOUT2)
700
701 self.assertEqual(acquire(timeout=TIMEOUT3), False)
702 self.assertTimingAlmostEqual(acquire.elapsed, TIMEOUT3)
703
704
705class _TestCondition(BaseTestCase):
706
Antoine Pitrou4eb2b282010-11-02 23:51:30 +0000707 @classmethod
708 def f(cls, cond, sleeping, woken, timeout=None):
Benjamin Petersondfd79492008-06-13 19:13:39 +0000709 cond.acquire()
710 sleeping.release()
711 cond.wait(timeout)
712 woken.release()
713 cond.release()
714
715 def check_invariant(self, cond):
716 # this is only supposed to succeed when there are no sleepers
717 if self.TYPE == 'processes':
718 try:
719 sleepers = (cond._sleeping_count.get_value() -
720 cond._woken_count.get_value())
721 self.assertEqual(sleepers, 0)
722 self.assertEqual(cond._wait_semaphore.get_value(), 0)
723 except NotImplementedError:
724 pass
725
726 def test_notify(self):
727 cond = self.Condition()
728 sleeping = self.Semaphore(0)
729 woken = self.Semaphore(0)
730
731 p = self.Process(target=self.f, args=(cond, sleeping, woken))
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000732 p.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +0000733 p.start()
734
735 p = threading.Thread(target=self.f, args=(cond, sleeping, woken))
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000736 p.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +0000737 p.start()
738
739 # wait for both children to start sleeping
740 sleeping.acquire()
741 sleeping.acquire()
742
743 # check no process/thread has woken up
744 time.sleep(DELTA)
745 self.assertReturnsIfImplemented(0, get_value, woken)
746
747 # wake up one process/thread
748 cond.acquire()
749 cond.notify()
750 cond.release()
751
752 # check one process/thread has woken up
753 time.sleep(DELTA)
754 self.assertReturnsIfImplemented(1, get_value, woken)
755
756 # wake up another
757 cond.acquire()
758 cond.notify()
759 cond.release()
760
761 # check other has woken up
762 time.sleep(DELTA)
763 self.assertReturnsIfImplemented(2, get_value, woken)
764
765 # check state is not mucked up
766 self.check_invariant(cond)
767 p.join()
768
769 def test_notify_all(self):
770 cond = self.Condition()
771 sleeping = self.Semaphore(0)
772 woken = self.Semaphore(0)
773
774 # start some threads/processes which will timeout
775 for i in range(3):
776 p = self.Process(target=self.f,
777 args=(cond, sleeping, woken, TIMEOUT1))
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000778 p.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +0000779 p.start()
780
781 t = threading.Thread(target=self.f,
782 args=(cond, sleeping, woken, TIMEOUT1))
Benjamin Petersona9b22222008-08-18 18:01:43 +0000783 t.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +0000784 t.start()
785
786 # wait for them all to sleep
787 for i in xrange(6):
788 sleeping.acquire()
789
790 # check they have all timed out
791 for i in xrange(6):
792 woken.acquire()
793 self.assertReturnsIfImplemented(0, get_value, woken)
794
795 # check state is not mucked up
796 self.check_invariant(cond)
797
798 # start some more threads/processes
799 for i in range(3):
800 p = self.Process(target=self.f, args=(cond, sleeping, woken))
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000801 p.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +0000802 p.start()
803
804 t = threading.Thread(target=self.f, args=(cond, sleeping, woken))
Benjamin Petersona9b22222008-08-18 18:01:43 +0000805 t.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +0000806 t.start()
807
808 # wait for them to all sleep
809 for i in xrange(6):
810 sleeping.acquire()
811
812 # check no process/thread has woken up
813 time.sleep(DELTA)
814 self.assertReturnsIfImplemented(0, get_value, woken)
815
816 # wake them all up
817 cond.acquire()
818 cond.notify_all()
819 cond.release()
820
821 # check they have all woken
822 time.sleep(DELTA)
823 self.assertReturnsIfImplemented(6, get_value, woken)
824
825 # check state is not mucked up
826 self.check_invariant(cond)
827
828 def test_timeout(self):
829 cond = self.Condition()
830 wait = TimingWrapper(cond.wait)
831 cond.acquire()
832 res = wait(TIMEOUT1)
833 cond.release()
834 self.assertEqual(res, None)
835 self.assertTimingAlmostEqual(wait.elapsed, TIMEOUT1)
836
837
838class _TestEvent(BaseTestCase):
839
Antoine Pitrou4eb2b282010-11-02 23:51:30 +0000840 @classmethod
841 def _test_event(cls, event):
Benjamin Petersondfd79492008-06-13 19:13:39 +0000842 time.sleep(TIMEOUT2)
843 event.set()
844
845 def test_event(self):
846 event = self.Event()
847 wait = TimingWrapper(event.wait)
848
Ezio Melottic2077b02011-03-16 12:34:31 +0200849 # Removed temporarily, due to API shear, this does not
Benjamin Petersondfd79492008-06-13 19:13:39 +0000850 # work with threading._Event objects. is_set == isSet
Jesse Noller02cb0eb2009-04-01 03:45:50 +0000851 self.assertEqual(event.is_set(), False)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000852
Jesse Noller02cb0eb2009-04-01 03:45:50 +0000853 # Removed, threading.Event.wait() will return the value of the __flag
854 # instead of None. API Shear with the semaphore backed mp.Event
855 self.assertEqual(wait(0.0), False)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000856 self.assertTimingAlmostEqual(wait.elapsed, 0.0)
Jesse Noller02cb0eb2009-04-01 03:45:50 +0000857 self.assertEqual(wait(TIMEOUT1), False)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000858 self.assertTimingAlmostEqual(wait.elapsed, TIMEOUT1)
859
860 event.set()
861
862 # See note above on the API differences
Jesse Noller02cb0eb2009-04-01 03:45:50 +0000863 self.assertEqual(event.is_set(), True)
864 self.assertEqual(wait(), True)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000865 self.assertTimingAlmostEqual(wait.elapsed, 0.0)
Jesse Noller02cb0eb2009-04-01 03:45:50 +0000866 self.assertEqual(wait(TIMEOUT1), True)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000867 self.assertTimingAlmostEqual(wait.elapsed, 0.0)
868 # self.assertEqual(event.is_set(), True)
869
870 event.clear()
871
872 #self.assertEqual(event.is_set(), False)
873
Jesus Cea6f6016b2011-09-09 20:26:57 +0200874 p = self.Process(target=self._test_event, args=(event,))
875 p.daemon = True
876 p.start()
Jesse Noller02cb0eb2009-04-01 03:45:50 +0000877 self.assertEqual(wait(), True)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000878
879#
880#
881#
882
883class _TestValue(BaseTestCase):
884
Florent Xicluna36b9fbb2010-03-24 19:33:25 +0000885 ALLOWED_TYPES = ('processes',)
886
Benjamin Petersondfd79492008-06-13 19:13:39 +0000887 codes_values = [
888 ('i', 4343, 24234),
889 ('d', 3.625, -4.25),
890 ('h', -232, 234),
891 ('c', latin('x'), latin('y'))
892 ]
893
Antoine Pitrou55d935a2010-11-22 16:35:57 +0000894 def setUp(self):
895 if not HAS_SHAREDCTYPES:
896 self.skipTest("requires multiprocessing.sharedctypes")
897
Antoine Pitrou4eb2b282010-11-02 23:51:30 +0000898 @classmethod
899 def _test(cls, values):
900 for sv, cv in zip(values, cls.codes_values):
Benjamin Petersondfd79492008-06-13 19:13:39 +0000901 sv.value = cv[2]
902
903
904 def test_value(self, raw=False):
Benjamin Petersondfd79492008-06-13 19:13:39 +0000905 if raw:
906 values = [self.RawValue(code, value)
907 for code, value, _ in self.codes_values]
908 else:
909 values = [self.Value(code, value)
910 for code, value, _ in self.codes_values]
911
912 for sv, cv in zip(values, self.codes_values):
913 self.assertEqual(sv.value, cv[1])
914
915 proc = self.Process(target=self._test, args=(values,))
Jesus Cea6f6016b2011-09-09 20:26:57 +0200916 proc.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +0000917 proc.start()
918 proc.join()
919
920 for sv, cv in zip(values, self.codes_values):
921 self.assertEqual(sv.value, cv[2])
922
923 def test_rawvalue(self):
924 self.test_value(raw=True)
925
926 def test_getobj_getlock(self):
Benjamin Petersondfd79492008-06-13 19:13:39 +0000927 val1 = self.Value('i', 5)
928 lock1 = val1.get_lock()
929 obj1 = val1.get_obj()
930
931 val2 = self.Value('i', 5, lock=None)
932 lock2 = val2.get_lock()
933 obj2 = val2.get_obj()
934
935 lock = self.Lock()
936 val3 = self.Value('i', 5, lock=lock)
937 lock3 = val3.get_lock()
938 obj3 = val3.get_obj()
939 self.assertEqual(lock, lock3)
940
Jesse Noller6ab22152009-01-18 02:45:38 +0000941 arr4 = self.Value('i', 5, lock=False)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000942 self.assertFalse(hasattr(arr4, 'get_lock'))
943 self.assertFalse(hasattr(arr4, 'get_obj'))
944
Jesse Noller6ab22152009-01-18 02:45:38 +0000945 self.assertRaises(AttributeError, self.Value, 'i', 5, lock='navalue')
946
947 arr5 = self.RawValue('i', 5)
948 self.assertFalse(hasattr(arr5, 'get_lock'))
949 self.assertFalse(hasattr(arr5, 'get_obj'))
950
Benjamin Petersondfd79492008-06-13 19:13:39 +0000951
952class _TestArray(BaseTestCase):
953
Florent Xicluna36b9fbb2010-03-24 19:33:25 +0000954 ALLOWED_TYPES = ('processes',)
955
Antoine Pitrou4eb2b282010-11-02 23:51:30 +0000956 @classmethod
957 def f(cls, seq):
Benjamin Petersondfd79492008-06-13 19:13:39 +0000958 for i in range(1, len(seq)):
959 seq[i] += seq[i-1]
960
Florent Xicluna36b9fbb2010-03-24 19:33:25 +0000961 @unittest.skipIf(c_int is None, "requires _ctypes")
Benjamin Petersondfd79492008-06-13 19:13:39 +0000962 def test_array(self, raw=False):
Benjamin Petersondfd79492008-06-13 19:13:39 +0000963 seq = [680, 626, 934, 821, 150, 233, 548, 982, 714, 831]
964 if raw:
965 arr = self.RawArray('i', seq)
966 else:
967 arr = self.Array('i', seq)
968
969 self.assertEqual(len(arr), len(seq))
970 self.assertEqual(arr[3], seq[3])
971 self.assertEqual(list(arr[2:7]), list(seq[2:7]))
972
973 arr[4:8] = seq[4:8] = array.array('i', [1, 2, 3, 4])
974
975 self.assertEqual(list(arr[:]), seq)
976
977 self.f(seq)
978
979 p = self.Process(target=self.f, args=(arr,))
Jesus Cea6f6016b2011-09-09 20:26:57 +0200980 p.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +0000981 p.start()
982 p.join()
983
984 self.assertEqual(list(arr[:]), seq)
985
Florent Xicluna36b9fbb2010-03-24 19:33:25 +0000986 @unittest.skipIf(c_int is None, "requires _ctypes")
Mark Dickinsond3cb2f62011-03-26 10:02:37 +0000987 def test_array_from_size(self):
988 size = 10
989 # Test for zeroing (see issue #11675).
990 # The repetition below strengthens the test by increasing the chances
991 # of previously allocated non-zero memory being used for the new array
992 # on the 2nd and 3rd loops.
993 for _ in range(3):
994 arr = self.Array('i', size)
995 self.assertEqual(len(arr), size)
996 self.assertEqual(list(arr), [0] * size)
997 arr[:] = range(10)
998 self.assertEqual(list(arr), range(10))
999 del arr
1000
1001 @unittest.skipIf(c_int is None, "requires _ctypes")
Benjamin Petersondfd79492008-06-13 19:13:39 +00001002 def test_rawarray(self):
1003 self.test_array(raw=True)
1004
Florent Xicluna36b9fbb2010-03-24 19:33:25 +00001005 @unittest.skipIf(c_int is None, "requires _ctypes")
Mark Dickinsonf9e9a6f2011-03-25 22:01:06 +00001006 def test_array_accepts_long(self):
1007 arr = self.Array('i', 10L)
1008 self.assertEqual(len(arr), 10)
1009 raw_arr = self.RawArray('i', 10L)
1010 self.assertEqual(len(raw_arr), 10)
1011
1012 @unittest.skipIf(c_int is None, "requires _ctypes")
Benjamin Petersondfd79492008-06-13 19:13:39 +00001013 def test_getobj_getlock_obj(self):
Benjamin Petersondfd79492008-06-13 19:13:39 +00001014 arr1 = self.Array('i', range(10))
1015 lock1 = arr1.get_lock()
1016 obj1 = arr1.get_obj()
1017
1018 arr2 = self.Array('i', range(10), lock=None)
1019 lock2 = arr2.get_lock()
1020 obj2 = arr2.get_obj()
1021
1022 lock = self.Lock()
1023 arr3 = self.Array('i', range(10), lock=lock)
1024 lock3 = arr3.get_lock()
1025 obj3 = arr3.get_obj()
1026 self.assertEqual(lock, lock3)
1027
Jesse Noller6ab22152009-01-18 02:45:38 +00001028 arr4 = self.Array('i', range(10), lock=False)
Benjamin Petersondfd79492008-06-13 19:13:39 +00001029 self.assertFalse(hasattr(arr4, 'get_lock'))
1030 self.assertFalse(hasattr(arr4, 'get_obj'))
Jesse Noller6ab22152009-01-18 02:45:38 +00001031 self.assertRaises(AttributeError,
1032 self.Array, 'i', range(10), lock='notalock')
1033
1034 arr5 = self.RawArray('i', range(10))
1035 self.assertFalse(hasattr(arr5, 'get_lock'))
1036 self.assertFalse(hasattr(arr5, 'get_obj'))
Benjamin Petersondfd79492008-06-13 19:13:39 +00001037
1038#
1039#
1040#
1041
1042class _TestContainers(BaseTestCase):
1043
1044 ALLOWED_TYPES = ('manager',)
1045
1046 def test_list(self):
1047 a = self.list(range(10))
1048 self.assertEqual(a[:], range(10))
1049
1050 b = self.list()
1051 self.assertEqual(b[:], [])
1052
1053 b.extend(range(5))
1054 self.assertEqual(b[:], range(5))
1055
1056 self.assertEqual(b[2], 2)
1057 self.assertEqual(b[2:10], [2,3,4])
1058
1059 b *= 2
1060 self.assertEqual(b[:], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4])
1061
1062 self.assertEqual(b + [5, 6], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4, 5, 6])
1063
1064 self.assertEqual(a[:], range(10))
1065
1066 d = [a, b]
1067 e = self.list(d)
1068 self.assertEqual(
1069 e[:],
1070 [[0, 1, 2, 3, 4, 5, 6, 7, 8, 9], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4]]
1071 )
1072
1073 f = self.list([a])
1074 a.append('hello')
1075 self.assertEqual(f[:], [[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 'hello']])
1076
1077 def test_dict(self):
1078 d = self.dict()
1079 indices = range(65, 70)
1080 for i in indices:
1081 d[i] = chr(i)
1082 self.assertEqual(d.copy(), dict((i, chr(i)) for i in indices))
1083 self.assertEqual(sorted(d.keys()), indices)
1084 self.assertEqual(sorted(d.values()), [chr(i) for i in indices])
1085 self.assertEqual(sorted(d.items()), [(i, chr(i)) for i in indices])
1086
1087 def test_namespace(self):
1088 n = self.Namespace()
1089 n.name = 'Bob'
1090 n.job = 'Builder'
1091 n._hidden = 'hidden'
1092 self.assertEqual((n.name, n.job), ('Bob', 'Builder'))
1093 del n.job
1094 self.assertEqual(str(n), "Namespace(name='Bob')")
1095 self.assertTrue(hasattr(n, 'name'))
1096 self.assertTrue(not hasattr(n, 'job'))
1097
1098#
1099#
1100#
1101
1102def sqr(x, wait=0.0):
1103 time.sleep(wait)
1104 return x*x
Benjamin Petersondfd79492008-06-13 19:13:39 +00001105class _TestPool(BaseTestCase):
1106
1107 def test_apply(self):
1108 papply = self.pool.apply
1109 self.assertEqual(papply(sqr, (5,)), sqr(5))
1110 self.assertEqual(papply(sqr, (), {'x':3}), sqr(x=3))
1111
1112 def test_map(self):
1113 pmap = self.pool.map
1114 self.assertEqual(pmap(sqr, range(10)), map(sqr, range(10)))
1115 self.assertEqual(pmap(sqr, range(100), chunksize=20),
1116 map(sqr, range(100)))
1117
Richard Oudkerk21aad972013-10-28 23:02:22 +00001118 def test_map_unplicklable(self):
1119 # Issue #19425 -- failure to pickle should not cause a hang
1120 if self.TYPE == 'threads':
Zachary Ware1f702212013-12-10 14:09:20 -06001121 self.skipTest('test not appropriate for {}'.format(self.TYPE))
Richard Oudkerk21aad972013-10-28 23:02:22 +00001122 class A(object):
1123 def __reduce__(self):
1124 raise RuntimeError('cannot pickle')
1125 with self.assertRaises(RuntimeError):
1126 self.pool.map(sqr, [A()]*10)
1127
Jesse Noller7530e472009-07-16 14:23:04 +00001128 def test_map_chunksize(self):
1129 try:
1130 self.pool.map_async(sqr, [], chunksize=1).get(timeout=TIMEOUT1)
1131 except multiprocessing.TimeoutError:
1132 self.fail("pool.map_async with chunksize stalled on null list")
1133
Benjamin Petersondfd79492008-06-13 19:13:39 +00001134 def test_async(self):
1135 res = self.pool.apply_async(sqr, (7, TIMEOUT1,))
1136 get = TimingWrapper(res.get)
1137 self.assertEqual(get(), 49)
1138 self.assertTimingAlmostEqual(get.elapsed, TIMEOUT1)
1139
1140 def test_async_timeout(self):
Richard Oudkerk65162a72013-11-17 17:45:16 +00001141 res = self.pool.apply_async(sqr, (6, TIMEOUT2 + 1.0))
Benjamin Petersondfd79492008-06-13 19:13:39 +00001142 get = TimingWrapper(res.get)
1143 self.assertRaises(multiprocessing.TimeoutError, get, timeout=TIMEOUT2)
1144 self.assertTimingAlmostEqual(get.elapsed, TIMEOUT2)
1145
1146 def test_imap(self):
1147 it = self.pool.imap(sqr, range(10))
1148 self.assertEqual(list(it), map(sqr, range(10)))
1149
1150 it = self.pool.imap(sqr, range(10))
1151 for i in range(10):
1152 self.assertEqual(it.next(), i*i)
1153 self.assertRaises(StopIteration, it.next)
1154
1155 it = self.pool.imap(sqr, range(1000), chunksize=100)
1156 for i in range(1000):
1157 self.assertEqual(it.next(), i*i)
1158 self.assertRaises(StopIteration, it.next)
1159
1160 def test_imap_unordered(self):
1161 it = self.pool.imap_unordered(sqr, range(1000))
1162 self.assertEqual(sorted(it), map(sqr, range(1000)))
1163
1164 it = self.pool.imap_unordered(sqr, range(1000), chunksize=53)
1165 self.assertEqual(sorted(it), map(sqr, range(1000)))
1166
1167 def test_make_pool(self):
Victor Stinnerf64a0cf2011-06-20 17:54:33 +02001168 self.assertRaises(ValueError, multiprocessing.Pool, -1)
1169 self.assertRaises(ValueError, multiprocessing.Pool, 0)
1170
Benjamin Petersondfd79492008-06-13 19:13:39 +00001171 p = multiprocessing.Pool(3)
1172 self.assertEqual(3, len(p._pool))
1173 p.close()
1174 p.join()
1175
1176 def test_terminate(self):
Richard Oudkerk6d24a6e2013-11-21 16:35:12 +00001177 p = self.Pool(4)
1178 result = p.map_async(
Benjamin Petersondfd79492008-06-13 19:13:39 +00001179 time.sleep, [0.1 for i in range(10000)], chunksize=1
1180 )
Richard Oudkerk6d24a6e2013-11-21 16:35:12 +00001181 p.terminate()
1182 join = TimingWrapper(p.join)
Benjamin Petersondfd79492008-06-13 19:13:39 +00001183 join()
1184 self.assertTrue(join.elapsed < 0.2)
Jesse Noller654ade32010-01-27 03:05:57 +00001185
Richard Oudkerkd44a4a22012-06-06 17:52:18 +01001186 def test_empty_iterable(self):
1187 # See Issue 12157
1188 p = self.Pool(1)
1189
1190 self.assertEqual(p.map(sqr, []), [])
1191 self.assertEqual(list(p.imap(sqr, [])), [])
1192 self.assertEqual(list(p.imap_unordered(sqr, [])), [])
1193 self.assertEqual(p.map_async(sqr, []).get(), [])
1194
1195 p.close()
1196 p.join()
1197
Richard Oudkerk0c200c22012-05-02 16:36:26 +01001198def unpickleable_result():
1199 return lambda: 42
1200
1201class _TestPoolWorkerErrors(BaseTestCase):
1202 ALLOWED_TYPES = ('processes', )
1203
1204 def test_unpickleable_result(self):
1205 from multiprocessing.pool import MaybeEncodingError
1206 p = multiprocessing.Pool(2)
1207
1208 # Make sure we don't lose pool processes because of encoding errors.
1209 for iteration in range(20):
1210 res = p.apply_async(unpickleable_result)
1211 self.assertRaises(MaybeEncodingError, res.get)
1212
1213 p.close()
1214 p.join()
1215
Jesse Noller654ade32010-01-27 03:05:57 +00001216class _TestPoolWorkerLifetime(BaseTestCase):
1217
1218 ALLOWED_TYPES = ('processes', )
1219 def test_pool_worker_lifetime(self):
1220 p = multiprocessing.Pool(3, maxtasksperchild=10)
1221 self.assertEqual(3, len(p._pool))
1222 origworkerpids = [w.pid for w in p._pool]
1223 # Run many tasks so each worker gets replaced (hopefully)
1224 results = []
1225 for i in range(100):
1226 results.append(p.apply_async(sqr, (i, )))
1227 # Fetch the results and verify we got the right answers,
1228 # also ensuring all the tasks have completed.
1229 for (j, res) in enumerate(results):
1230 self.assertEqual(res.get(), sqr(j))
1231 # Refill the pool
1232 p._repopulate_pool()
Florent Xicluna3bc5cb72010-03-04 15:58:54 +00001233 # Wait until all workers are alive
Antoine Pitrouc2b0d762011-04-06 22:54:14 +02001234 # (countdown * DELTA = 5 seconds max startup process time)
1235 countdown = 50
Florent Xicluna3bc5cb72010-03-04 15:58:54 +00001236 while countdown and not all(w.is_alive() for w in p._pool):
1237 countdown -= 1
1238 time.sleep(DELTA)
Jesse Noller654ade32010-01-27 03:05:57 +00001239 finalworkerpids = [w.pid for w in p._pool]
Florent Xicluna3bc5cb72010-03-04 15:58:54 +00001240 # All pids should be assigned. See issue #7805.
1241 self.assertNotIn(None, origworkerpids)
1242 self.assertNotIn(None, finalworkerpids)
1243 # Finally, check that the worker pids have changed
Jesse Noller654ade32010-01-27 03:05:57 +00001244 self.assertNotEqual(sorted(origworkerpids), sorted(finalworkerpids))
1245 p.close()
1246 p.join()
1247
Charles-François Natali46f990e2011-10-24 18:43:51 +02001248 def test_pool_worker_lifetime_early_close(self):
1249 # Issue #10332: closing a pool whose workers have limited lifetimes
1250 # before all the tasks completed would make join() hang.
1251 p = multiprocessing.Pool(3, maxtasksperchild=1)
1252 results = []
1253 for i in range(6):
1254 results.append(p.apply_async(sqr, (i, 0.3)))
1255 p.close()
1256 p.join()
1257 # check the results
1258 for (j, res) in enumerate(results):
1259 self.assertEqual(res.get(), sqr(j))
1260
1261
Benjamin Petersondfd79492008-06-13 19:13:39 +00001262#
1263# Test that manager has expected number of shared objects left
1264#
1265
1266class _TestZZZNumberOfObjects(BaseTestCase):
1267 # Because test cases are sorted alphabetically, this one will get
1268 # run after all the other tests for the manager. It tests that
1269 # there have been no "reference leaks" for the manager's shared
1270 # objects. Note the comment in _TestPool.test_terminate().
1271 ALLOWED_TYPES = ('manager',)
1272
1273 def test_number_of_objects(self):
1274 EXPECTED_NUMBER = 1 # the pool object is still alive
1275 multiprocessing.active_children() # discard dead process objs
1276 gc.collect() # do garbage collection
1277 refs = self.manager._number_of_objects()
Jesse Noller7314b382009-01-21 02:08:17 +00001278 debug_info = self.manager._debug_info()
Benjamin Petersondfd79492008-06-13 19:13:39 +00001279 if refs != EXPECTED_NUMBER:
Jesse Noller7fb96402008-07-17 21:01:05 +00001280 print self.manager._debug_info()
Jesse Noller7314b382009-01-21 02:08:17 +00001281 print debug_info
Benjamin Petersondfd79492008-06-13 19:13:39 +00001282
1283 self.assertEqual(refs, EXPECTED_NUMBER)
1284
1285#
1286# Test of creating a customized manager class
1287#
1288
1289from multiprocessing.managers import BaseManager, BaseProxy, RemoteError
1290
1291class FooBar(object):
1292 def f(self):
1293 return 'f()'
1294 def g(self):
1295 raise ValueError
1296 def _h(self):
1297 return '_h()'
1298
1299def baz():
1300 for i in xrange(10):
1301 yield i*i
1302
1303class IteratorProxy(BaseProxy):
1304 _exposed_ = ('next', '__next__')
1305 def __iter__(self):
1306 return self
1307 def next(self):
1308 return self._callmethod('next')
1309 def __next__(self):
1310 return self._callmethod('__next__')
1311
1312class MyManager(BaseManager):
1313 pass
1314
1315MyManager.register('Foo', callable=FooBar)
1316MyManager.register('Bar', callable=FooBar, exposed=('f', '_h'))
1317MyManager.register('baz', callable=baz, proxytype=IteratorProxy)
1318
1319
1320class _TestMyManager(BaseTestCase):
1321
1322 ALLOWED_TYPES = ('manager',)
1323
1324 def test_mymanager(self):
1325 manager = MyManager()
1326 manager.start()
1327
1328 foo = manager.Foo()
1329 bar = manager.Bar()
1330 baz = manager.baz()
1331
1332 foo_methods = [name for name in ('f', 'g', '_h') if hasattr(foo, name)]
1333 bar_methods = [name for name in ('f', 'g', '_h') if hasattr(bar, name)]
1334
1335 self.assertEqual(foo_methods, ['f', 'g'])
1336 self.assertEqual(bar_methods, ['f', '_h'])
1337
1338 self.assertEqual(foo.f(), 'f()')
1339 self.assertRaises(ValueError, foo.g)
1340 self.assertEqual(foo._callmethod('f'), 'f()')
1341 self.assertRaises(RemoteError, foo._callmethod, '_h')
1342
1343 self.assertEqual(bar.f(), 'f()')
1344 self.assertEqual(bar._h(), '_h()')
1345 self.assertEqual(bar._callmethod('f'), 'f()')
1346 self.assertEqual(bar._callmethod('_h'), '_h()')
1347
1348 self.assertEqual(list(baz), [i*i for i in range(10)])
1349
1350 manager.shutdown()
1351
1352#
1353# Test of connecting to a remote server and using xmlrpclib for serialization
1354#
1355
1356_queue = Queue.Queue()
1357def get_queue():
1358 return _queue
1359
1360class QueueManager(BaseManager):
1361 '''manager class used by server process'''
1362QueueManager.register('get_queue', callable=get_queue)
1363
1364class QueueManager2(BaseManager):
1365 '''manager class which specifies the same interface as QueueManager'''
1366QueueManager2.register('get_queue')
1367
1368
1369SERIALIZER = 'xmlrpclib'
1370
1371class _TestRemoteManager(BaseTestCase):
1372
1373 ALLOWED_TYPES = ('manager',)
Serhiy Storchaka7fe04f12015-02-13 15:08:36 +02001374 values = ['hello world', None, True, 2.25,
1375 #'hall\xc3\xa5 v\xc3\xa4rlden'] # UTF-8
1376 ]
1377 result = values[:]
1378 if test_support.have_unicode:
1379 #result[-1] = u'hall\xe5 v\xe4rlden'
1380 uvalue = test_support.u(r'\u043f\u0440\u0438\u0432\u0456\u0442 '
1381 r'\u0441\u0432\u0456\u0442')
1382 values.append(uvalue)
1383 result.append(uvalue)
Benjamin Petersondfd79492008-06-13 19:13:39 +00001384
Antoine Pitrou4eb2b282010-11-02 23:51:30 +00001385 @classmethod
1386 def _putter(cls, address, authkey):
Benjamin Petersondfd79492008-06-13 19:13:39 +00001387 manager = QueueManager2(
1388 address=address, authkey=authkey, serializer=SERIALIZER
1389 )
1390 manager.connect()
1391 queue = manager.get_queue()
Serhiy Storchaka7fe04f12015-02-13 15:08:36 +02001392 # Note that xmlrpclib will deserialize object as a list not a tuple
1393 queue.put(tuple(cls.values))
Benjamin Petersondfd79492008-06-13 19:13:39 +00001394
1395 def test_remote(self):
1396 authkey = os.urandom(32)
1397
1398 manager = QueueManager(
Antoine Pitrou78254dc2013-08-22 00:39:46 +02001399 address=(test.test_support.HOST, 0), authkey=authkey, serializer=SERIALIZER
Benjamin Petersondfd79492008-06-13 19:13:39 +00001400 )
1401 manager.start()
1402
1403 p = self.Process(target=self._putter, args=(manager.address, authkey))
Jesus Cea6f6016b2011-09-09 20:26:57 +02001404 p.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +00001405 p.start()
1406
1407 manager2 = QueueManager2(
1408 address=manager.address, authkey=authkey, serializer=SERIALIZER
1409 )
1410 manager2.connect()
1411 queue = manager2.get_queue()
1412
Serhiy Storchaka7fe04f12015-02-13 15:08:36 +02001413 self.assertEqual(queue.get(), self.result)
Benjamin Petersondfd79492008-06-13 19:13:39 +00001414
1415 # Because we are using xmlrpclib for serialization instead of
1416 # pickle this will cause a serialization error.
1417 self.assertRaises(Exception, queue.put, time.sleep)
1418
1419 # Make queue finalizer run before the server is stopped
1420 del queue
1421 manager.shutdown()
1422
Jesse Noller459a6482009-03-30 15:50:42 +00001423class _TestManagerRestart(BaseTestCase):
1424
Antoine Pitrou4eb2b282010-11-02 23:51:30 +00001425 @classmethod
1426 def _putter(cls, address, authkey):
Jesse Noller459a6482009-03-30 15:50:42 +00001427 manager = QueueManager(
1428 address=address, authkey=authkey, serializer=SERIALIZER)
1429 manager.connect()
1430 queue = manager.get_queue()
1431 queue.put('hello world')
1432
1433 def test_rapid_restart(self):
1434 authkey = os.urandom(32)
1435 manager = QueueManager(
Antoine Pitrou78254dc2013-08-22 00:39:46 +02001436 address=(test.test_support.HOST, 0), authkey=authkey, serializer=SERIALIZER)
Brian Curtin87d86e02010-11-01 05:15:55 +00001437 srvr = manager.get_server()
1438 addr = srvr.address
1439 # Close the connection.Listener socket which gets opened as a part
1440 # of manager.get_server(). It's not needed for the test.
1441 srvr.listener.close()
Jesse Noller459a6482009-03-30 15:50:42 +00001442 manager.start()
1443
1444 p = self.Process(target=self._putter, args=(manager.address, authkey))
Jesus Cea6f6016b2011-09-09 20:26:57 +02001445 p.daemon = True
Jesse Noller459a6482009-03-30 15:50:42 +00001446 p.start()
1447 queue = manager.get_queue()
1448 self.assertEqual(queue.get(), 'hello world')
Jesse Noller019ce772009-03-30 21:53:29 +00001449 del queue
Jesse Noller459a6482009-03-30 15:50:42 +00001450 manager.shutdown()
1451 manager = QueueManager(
Antoine Pitrou54f9f832010-04-30 23:08:48 +00001452 address=addr, authkey=authkey, serializer=SERIALIZER)
Jesse Noller459a6482009-03-30 15:50:42 +00001453 manager.start()
Jesse Noller019ce772009-03-30 21:53:29 +00001454 manager.shutdown()
Jesse Noller459a6482009-03-30 15:50:42 +00001455
Benjamin Petersondfd79492008-06-13 19:13:39 +00001456#
1457#
1458#
1459
1460SENTINEL = latin('')
1461
1462class _TestConnection(BaseTestCase):
1463
1464 ALLOWED_TYPES = ('processes', 'threads')
1465
Antoine Pitrou4eb2b282010-11-02 23:51:30 +00001466 @classmethod
1467 def _echo(cls, conn):
Benjamin Petersondfd79492008-06-13 19:13:39 +00001468 for msg in iter(conn.recv_bytes, SENTINEL):
1469 conn.send_bytes(msg)
1470 conn.close()
1471
1472 def test_connection(self):
1473 conn, child_conn = self.Pipe()
1474
1475 p = self.Process(target=self._echo, args=(child_conn,))
Jesse Noller5bc9f4c2008-08-19 19:06:19 +00001476 p.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +00001477 p.start()
1478
1479 seq = [1, 2.25, None]
1480 msg = latin('hello world')
1481 longmsg = msg * 10
1482 arr = array.array('i', range(4))
1483
1484 if self.TYPE == 'processes':
1485 self.assertEqual(type(conn.fileno()), int)
1486
1487 self.assertEqual(conn.send(seq), None)
1488 self.assertEqual(conn.recv(), seq)
1489
1490 self.assertEqual(conn.send_bytes(msg), None)
1491 self.assertEqual(conn.recv_bytes(), msg)
1492
1493 if self.TYPE == 'processes':
1494 buffer = array.array('i', [0]*10)
1495 expected = list(arr) + [0] * (10 - len(arr))
1496 self.assertEqual(conn.send_bytes(arr), None)
1497 self.assertEqual(conn.recv_bytes_into(buffer),
1498 len(arr) * buffer.itemsize)
1499 self.assertEqual(list(buffer), expected)
1500
1501 buffer = array.array('i', [0]*10)
1502 expected = [0] * 3 + list(arr) + [0] * (10 - 3 - len(arr))
1503 self.assertEqual(conn.send_bytes(arr), None)
1504 self.assertEqual(conn.recv_bytes_into(buffer, 3 * buffer.itemsize),
1505 len(arr) * buffer.itemsize)
1506 self.assertEqual(list(buffer), expected)
1507
1508 buffer = bytearray(latin(' ' * 40))
1509 self.assertEqual(conn.send_bytes(longmsg), None)
1510 try:
1511 res = conn.recv_bytes_into(buffer)
1512 except multiprocessing.BufferTooShort, e:
1513 self.assertEqual(e.args, (longmsg,))
1514 else:
1515 self.fail('expected BufferTooShort, got %s' % res)
1516
1517 poll = TimingWrapper(conn.poll)
1518
1519 self.assertEqual(poll(), False)
1520 self.assertTimingAlmostEqual(poll.elapsed, 0)
1521
1522 self.assertEqual(poll(TIMEOUT1), False)
1523 self.assertTimingAlmostEqual(poll.elapsed, TIMEOUT1)
1524
1525 conn.send(None)
Giampaolo Rodola'cef20062012-12-31 17:23:09 +01001526 time.sleep(.1)
Benjamin Petersondfd79492008-06-13 19:13:39 +00001527
1528 self.assertEqual(poll(TIMEOUT1), True)
1529 self.assertTimingAlmostEqual(poll.elapsed, 0)
1530
1531 self.assertEqual(conn.recv(), None)
1532
1533 really_big_msg = latin('X') * (1024 * 1024 * 16) # 16Mb
1534 conn.send_bytes(really_big_msg)
1535 self.assertEqual(conn.recv_bytes(), really_big_msg)
1536
1537 conn.send_bytes(SENTINEL) # tell child to quit
1538 child_conn.close()
1539
1540 if self.TYPE == 'processes':
1541 self.assertEqual(conn.readable, True)
1542 self.assertEqual(conn.writable, True)
1543 self.assertRaises(EOFError, conn.recv)
1544 self.assertRaises(EOFError, conn.recv_bytes)
1545
1546 p.join()
1547
1548 def test_duplex_false(self):
1549 reader, writer = self.Pipe(duplex=False)
1550 self.assertEqual(writer.send(1), None)
1551 self.assertEqual(reader.recv(), 1)
1552 if self.TYPE == 'processes':
1553 self.assertEqual(reader.readable, True)
1554 self.assertEqual(reader.writable, False)
1555 self.assertEqual(writer.readable, False)
1556 self.assertEqual(writer.writable, True)
1557 self.assertRaises(IOError, reader.send, 2)
1558 self.assertRaises(IOError, writer.recv)
1559 self.assertRaises(IOError, writer.poll)
1560
1561 def test_spawn_close(self):
1562 # We test that a pipe connection can be closed by parent
1563 # process immediately after child is spawned. On Windows this
1564 # would have sometimes failed on old versions because
1565 # child_conn would be closed before the child got a chance to
1566 # duplicate it.
1567 conn, child_conn = self.Pipe()
1568
1569 p = self.Process(target=self._echo, args=(child_conn,))
Jesus Cea6f6016b2011-09-09 20:26:57 +02001570 p.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +00001571 p.start()
1572 child_conn.close() # this might complete before child initializes
1573
1574 msg = latin('hello')
1575 conn.send_bytes(msg)
1576 self.assertEqual(conn.recv_bytes(), msg)
1577
1578 conn.send_bytes(SENTINEL)
1579 conn.close()
1580 p.join()
1581
1582 def test_sendbytes(self):
1583 if self.TYPE != 'processes':
Zachary Ware1f702212013-12-10 14:09:20 -06001584 self.skipTest('test not appropriate for {}'.format(self.TYPE))
Benjamin Petersondfd79492008-06-13 19:13:39 +00001585
1586 msg = latin('abcdefghijklmnopqrstuvwxyz')
1587 a, b = self.Pipe()
1588
1589 a.send_bytes(msg)
1590 self.assertEqual(b.recv_bytes(), msg)
1591
1592 a.send_bytes(msg, 5)
1593 self.assertEqual(b.recv_bytes(), msg[5:])
1594
1595 a.send_bytes(msg, 7, 8)
1596 self.assertEqual(b.recv_bytes(), msg[7:7+8])
1597
1598 a.send_bytes(msg, 26)
1599 self.assertEqual(b.recv_bytes(), latin(''))
1600
1601 a.send_bytes(msg, 26, 0)
1602 self.assertEqual(b.recv_bytes(), latin(''))
1603
1604 self.assertRaises(ValueError, a.send_bytes, msg, 27)
1605
1606 self.assertRaises(ValueError, a.send_bytes, msg, 22, 5)
1607
1608 self.assertRaises(ValueError, a.send_bytes, msg, 26, 1)
1609
1610 self.assertRaises(ValueError, a.send_bytes, msg, -1)
1611
1612 self.assertRaises(ValueError, a.send_bytes, msg, 4, -1)
1613
Antoine Pitroua1a8da82011-08-23 19:54:20 +02001614 @classmethod
1615 def _is_fd_assigned(cls, fd):
1616 try:
1617 os.fstat(fd)
1618 except OSError as e:
1619 if e.errno == errno.EBADF:
1620 return False
1621 raise
1622 else:
1623 return True
1624
1625 @classmethod
1626 def _writefd(cls, conn, data, create_dummy_fds=False):
1627 if create_dummy_fds:
1628 for i in range(0, 256):
1629 if not cls._is_fd_assigned(i):
1630 os.dup2(conn.fileno(), i)
1631 fd = reduction.recv_handle(conn)
1632 if msvcrt:
1633 fd = msvcrt.open_osfhandle(fd, os.O_WRONLY)
1634 os.write(fd, data)
1635 os.close(fd)
1636
Charles-François Natalif8413b22011-09-21 18:44:49 +02001637 @unittest.skipUnless(HAS_REDUCTION, "test needs multiprocessing.reduction")
Antoine Pitroua1a8da82011-08-23 19:54:20 +02001638 def test_fd_transfer(self):
1639 if self.TYPE != 'processes':
1640 self.skipTest("only makes sense with processes")
1641 conn, child_conn = self.Pipe(duplex=True)
1642
1643 p = self.Process(target=self._writefd, args=(child_conn, b"foo"))
Jesus Cea6f6016b2011-09-09 20:26:57 +02001644 p.daemon = True
Antoine Pitroua1a8da82011-08-23 19:54:20 +02001645 p.start()
1646 with open(test_support.TESTFN, "wb") as f:
1647 fd = f.fileno()
1648 if msvcrt:
1649 fd = msvcrt.get_osfhandle(fd)
1650 reduction.send_handle(conn, fd, p.pid)
1651 p.join()
1652 with open(test_support.TESTFN, "rb") as f:
1653 self.assertEqual(f.read(), b"foo")
1654
Charles-François Natalif8413b22011-09-21 18:44:49 +02001655 @unittest.skipUnless(HAS_REDUCTION, "test needs multiprocessing.reduction")
Antoine Pitroua1a8da82011-08-23 19:54:20 +02001656 @unittest.skipIf(sys.platform == "win32",
1657 "test semantics don't make sense on Windows")
1658 @unittest.skipIf(MAXFD <= 256,
1659 "largest assignable fd number is too small")
1660 @unittest.skipUnless(hasattr(os, "dup2"),
1661 "test needs os.dup2()")
1662 def test_large_fd_transfer(self):
1663 # With fd > 256 (issue #11657)
1664 if self.TYPE != 'processes':
1665 self.skipTest("only makes sense with processes")
1666 conn, child_conn = self.Pipe(duplex=True)
1667
1668 p = self.Process(target=self._writefd, args=(child_conn, b"bar", True))
Jesus Cea6f6016b2011-09-09 20:26:57 +02001669 p.daemon = True
Antoine Pitroua1a8da82011-08-23 19:54:20 +02001670 p.start()
1671 with open(test_support.TESTFN, "wb") as f:
1672 fd = f.fileno()
1673 for newfd in range(256, MAXFD):
1674 if not self._is_fd_assigned(newfd):
1675 break
1676 else:
1677 self.fail("could not find an unassigned large file descriptor")
1678 os.dup2(fd, newfd)
1679 try:
1680 reduction.send_handle(conn, newfd, p.pid)
1681 finally:
1682 os.close(newfd)
1683 p.join()
1684 with open(test_support.TESTFN, "rb") as f:
1685 self.assertEqual(f.read(), b"bar")
1686
Jesus Ceac23484b2011-09-21 03:47:39 +02001687 @classmethod
1688 def _send_data_without_fd(self, conn):
1689 os.write(conn.fileno(), b"\0")
1690
Charles-François Natalif8413b22011-09-21 18:44:49 +02001691 @unittest.skipUnless(HAS_REDUCTION, "test needs multiprocessing.reduction")
Jesus Ceac23484b2011-09-21 03:47:39 +02001692 @unittest.skipIf(sys.platform == "win32", "doesn't make sense on Windows")
1693 def test_missing_fd_transfer(self):
1694 # Check that exception is raised when received data is not
1695 # accompanied by a file descriptor in ancillary data.
1696 if self.TYPE != 'processes':
1697 self.skipTest("only makes sense with processes")
1698 conn, child_conn = self.Pipe(duplex=True)
1699
1700 p = self.Process(target=self._send_data_without_fd, args=(child_conn,))
1701 p.daemon = True
1702 p.start()
1703 self.assertRaises(RuntimeError, reduction.recv_handle, conn)
1704 p.join()
Antoine Pitroua1a8da82011-08-23 19:54:20 +02001705
Benjamin Petersondfd79492008-06-13 19:13:39 +00001706class _TestListenerClient(BaseTestCase):
1707
1708 ALLOWED_TYPES = ('processes', 'threads')
1709
Antoine Pitrou4eb2b282010-11-02 23:51:30 +00001710 @classmethod
1711 def _test(cls, address):
1712 conn = cls.connection.Client(address)
Benjamin Petersondfd79492008-06-13 19:13:39 +00001713 conn.send('hello')
1714 conn.close()
1715
1716 def test_listener_client(self):
1717 for family in self.connection.families:
1718 l = self.connection.Listener(family=family)
1719 p = self.Process(target=self._test, args=(l.address,))
Jesse Noller5bc9f4c2008-08-19 19:06:19 +00001720 p.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +00001721 p.start()
1722 conn = l.accept()
1723 self.assertEqual(conn.recv(), 'hello')
1724 p.join()
1725 l.close()
Richard Oudkerk9a16fa62012-05-05 20:41:08 +01001726
1727 def test_issue14725(self):
1728 l = self.connection.Listener()
1729 p = self.Process(target=self._test, args=(l.address,))
1730 p.daemon = True
1731 p.start()
1732 time.sleep(1)
1733 # On Windows the client process should by now have connected,
1734 # written data and closed the pipe handle by now. This causes
1735 # ConnectNamdedPipe() to fail with ERROR_NO_DATA. See Issue
1736 # 14725.
1737 conn = l.accept()
1738 self.assertEqual(conn.recv(), 'hello')
1739 conn.close()
1740 p.join()
1741 l.close()
1742
Benjamin Petersondfd79492008-06-13 19:13:39 +00001743#
1744# Test of sending connection and socket objects between processes
1745#
Benjamin Petersonda3a1b12008-06-16 20:52:48 +00001746"""
Benjamin Petersondfd79492008-06-13 19:13:39 +00001747class _TestPicklingConnections(BaseTestCase):
1748
1749 ALLOWED_TYPES = ('processes',)
1750
1751 def _listener(self, conn, families):
1752 for fam in families:
1753 l = self.connection.Listener(family=fam)
1754 conn.send(l.address)
1755 new_conn = l.accept()
1756 conn.send(new_conn)
1757
1758 if self.TYPE == 'processes':
1759 l = socket.socket()
1760 l.bind(('localhost', 0))
1761 conn.send(l.getsockname())
1762 l.listen(1)
1763 new_conn, addr = l.accept()
1764 conn.send(new_conn)
1765
1766 conn.recv()
1767
1768 def _remote(self, conn):
1769 for (address, msg) in iter(conn.recv, None):
1770 client = self.connection.Client(address)
1771 client.send(msg.upper())
1772 client.close()
1773
1774 if self.TYPE == 'processes':
1775 address, msg = conn.recv()
1776 client = socket.socket()
1777 client.connect(address)
1778 client.sendall(msg.upper())
1779 client.close()
1780
1781 conn.close()
1782
1783 def test_pickling(self):
1784 try:
1785 multiprocessing.allow_connection_pickling()
1786 except ImportError:
1787 return
1788
1789 families = self.connection.families
1790
1791 lconn, lconn0 = self.Pipe()
1792 lp = self.Process(target=self._listener, args=(lconn0, families))
Jesus Cea6f6016b2011-09-09 20:26:57 +02001793 lp.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +00001794 lp.start()
1795 lconn0.close()
1796
1797 rconn, rconn0 = self.Pipe()
1798 rp = self.Process(target=self._remote, args=(rconn0,))
Jesus Cea6f6016b2011-09-09 20:26:57 +02001799 rp.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +00001800 rp.start()
1801 rconn0.close()
1802
1803 for fam in families:
1804 msg = ('This connection uses family %s' % fam).encode('ascii')
1805 address = lconn.recv()
1806 rconn.send((address, msg))
1807 new_conn = lconn.recv()
1808 self.assertEqual(new_conn.recv(), msg.upper())
1809
1810 rconn.send(None)
1811
1812 if self.TYPE == 'processes':
1813 msg = latin('This connection uses a normal socket')
1814 address = lconn.recv()
1815 rconn.send((address, msg))
1816 if hasattr(socket, 'fromfd'):
1817 new_conn = lconn.recv()
1818 self.assertEqual(new_conn.recv(100), msg.upper())
1819 else:
1820 # XXX On Windows with Py2.6 need to backport fromfd()
1821 discard = lconn.recv_bytes()
1822
1823 lconn.send(None)
1824
1825 rconn.close()
1826 lconn.close()
1827
1828 lp.join()
1829 rp.join()
Benjamin Petersonda3a1b12008-06-16 20:52:48 +00001830"""
Benjamin Petersondfd79492008-06-13 19:13:39 +00001831#
1832#
1833#
1834
1835class _TestHeap(BaseTestCase):
1836
1837 ALLOWED_TYPES = ('processes',)
1838
1839 def test_heap(self):
1840 iterations = 5000
1841 maxblocks = 50
1842 blocks = []
1843
1844 # create and destroy lots of blocks of different sizes
1845 for i in xrange(iterations):
1846 size = int(random.lognormvariate(0, 1) * 1000)
1847 b = multiprocessing.heap.BufferWrapper(size)
1848 blocks.append(b)
1849 if len(blocks) > maxblocks:
1850 i = random.randrange(maxblocks)
1851 del blocks[i]
1852
1853 # get the heap object
1854 heap = multiprocessing.heap.BufferWrapper._heap
1855
1856 # verify the state of the heap
1857 all = []
1858 occupied = 0
Charles-François Natali414d0fa2011-07-02 13:56:19 +02001859 heap._lock.acquire()
1860 self.addCleanup(heap._lock.release)
Benjamin Petersondfd79492008-06-13 19:13:39 +00001861 for L in heap._len_to_seq.values():
1862 for arena, start, stop in L:
1863 all.append((heap._arenas.index(arena), start, stop,
1864 stop-start, 'free'))
1865 for arena, start, stop in heap._allocated_blocks:
1866 all.append((heap._arenas.index(arena), start, stop,
1867 stop-start, 'occupied'))
1868 occupied += (stop-start)
1869
1870 all.sort()
1871
1872 for i in range(len(all)-1):
1873 (arena, start, stop) = all[i][:3]
1874 (narena, nstart, nstop) = all[i+1][:3]
1875 self.assertTrue((arena != narena and nstart == 0) or
1876 (stop == nstart))
1877
Charles-François Natali414d0fa2011-07-02 13:56:19 +02001878 def test_free_from_gc(self):
1879 # Check that freeing of blocks by the garbage collector doesn't deadlock
1880 # (issue #12352).
1881 # Make sure the GC is enabled, and set lower collection thresholds to
1882 # make collections more frequent (and increase the probability of
1883 # deadlock).
Charles-François Natali7c20ad32011-07-02 14:08:27 +02001884 if not gc.isenabled():
Charles-François Natali414d0fa2011-07-02 13:56:19 +02001885 gc.enable()
1886 self.addCleanup(gc.disable)
Charles-François Natali7c20ad32011-07-02 14:08:27 +02001887 thresholds = gc.get_threshold()
1888 self.addCleanup(gc.set_threshold, *thresholds)
Charles-François Natali414d0fa2011-07-02 13:56:19 +02001889 gc.set_threshold(10)
1890
1891 # perform numerous block allocations, with cyclic references to make
1892 # sure objects are collected asynchronously by the gc
1893 for i in range(5000):
1894 a = multiprocessing.heap.BufferWrapper(1)
1895 b = multiprocessing.heap.BufferWrapper(1)
1896 # circular references
1897 a.buddy = b
1898 b.buddy = a
1899
Benjamin Petersondfd79492008-06-13 19:13:39 +00001900#
1901#
1902#
1903
Benjamin Petersondfd79492008-06-13 19:13:39 +00001904class _Foo(Structure):
1905 _fields_ = [
1906 ('x', c_int),
1907 ('y', c_double)
1908 ]
1909
1910class _TestSharedCTypes(BaseTestCase):
1911
1912 ALLOWED_TYPES = ('processes',)
1913
Antoine Pitrou55d935a2010-11-22 16:35:57 +00001914 def setUp(self):
1915 if not HAS_SHAREDCTYPES:
1916 self.skipTest("requires multiprocessing.sharedctypes")
1917
Antoine Pitrou4eb2b282010-11-02 23:51:30 +00001918 @classmethod
1919 def _double(cls, x, y, foo, arr, string):
Benjamin Petersondfd79492008-06-13 19:13:39 +00001920 x.value *= 2
1921 y.value *= 2
1922 foo.x *= 2
1923 foo.y *= 2
1924 string.value *= 2
1925 for i in range(len(arr)):
1926 arr[i] *= 2
1927
1928 def test_sharedctypes(self, lock=False):
Benjamin Petersondfd79492008-06-13 19:13:39 +00001929 x = Value('i', 7, lock=lock)
Georg Brandlbd564c32010-02-06 23:33:33 +00001930 y = Value(c_double, 1.0/3.0, lock=lock)
Benjamin Petersondfd79492008-06-13 19:13:39 +00001931 foo = Value(_Foo, 3, 2, lock=lock)
Georg Brandlbd564c32010-02-06 23:33:33 +00001932 arr = self.Array('d', range(10), lock=lock)
1933 string = self.Array('c', 20, lock=lock)
Brian Curtina06e9b82010-10-07 02:27:41 +00001934 string.value = latin('hello')
Benjamin Petersondfd79492008-06-13 19:13:39 +00001935
1936 p = self.Process(target=self._double, args=(x, y, foo, arr, string))
Jesus Cea6f6016b2011-09-09 20:26:57 +02001937 p.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +00001938 p.start()
1939 p.join()
1940
1941 self.assertEqual(x.value, 14)
1942 self.assertAlmostEqual(y.value, 2.0/3.0)
1943 self.assertEqual(foo.x, 6)
1944 self.assertAlmostEqual(foo.y, 4.0)
1945 for i in range(10):
1946 self.assertAlmostEqual(arr[i], i*2)
1947 self.assertEqual(string.value, latin('hellohello'))
1948
1949 def test_synchronize(self):
1950 self.test_sharedctypes(lock=True)
1951
1952 def test_copy(self):
Benjamin Petersondfd79492008-06-13 19:13:39 +00001953 foo = _Foo(2, 5.0)
Brian Curtina06e9b82010-10-07 02:27:41 +00001954 bar = copy(foo)
Benjamin Petersondfd79492008-06-13 19:13:39 +00001955 foo.x = 0
1956 foo.y = 0
1957 self.assertEqual(bar.x, 2)
1958 self.assertAlmostEqual(bar.y, 5.0)
1959
1960#
1961#
1962#
1963
1964class _TestFinalize(BaseTestCase):
1965
1966 ALLOWED_TYPES = ('processes',)
1967
Antoine Pitrou4eb2b282010-11-02 23:51:30 +00001968 @classmethod
1969 def _test_finalize(cls, conn):
Benjamin Petersondfd79492008-06-13 19:13:39 +00001970 class Foo(object):
1971 pass
1972
1973 a = Foo()
1974 util.Finalize(a, conn.send, args=('a',))
1975 del a # triggers callback for a
1976
1977 b = Foo()
1978 close_b = util.Finalize(b, conn.send, args=('b',))
1979 close_b() # triggers callback for b
1980 close_b() # does nothing because callback has already been called
1981 del b # does nothing because callback has already been called
1982
1983 c = Foo()
1984 util.Finalize(c, conn.send, args=('c',))
1985
1986 d10 = Foo()
1987 util.Finalize(d10, conn.send, args=('d10',), exitpriority=1)
1988
1989 d01 = Foo()
1990 util.Finalize(d01, conn.send, args=('d01',), exitpriority=0)
1991 d02 = Foo()
1992 util.Finalize(d02, conn.send, args=('d02',), exitpriority=0)
1993 d03 = Foo()
1994 util.Finalize(d03, conn.send, args=('d03',), exitpriority=0)
1995
1996 util.Finalize(None, conn.send, args=('e',), exitpriority=-10)
1997
1998 util.Finalize(None, conn.send, args=('STOP',), exitpriority=-100)
1999
Ezio Melottic2077b02011-03-16 12:34:31 +02002000 # call multiprocessing's cleanup function then exit process without
Benjamin Petersondfd79492008-06-13 19:13:39 +00002001 # garbage collecting locals
2002 util._exit_function()
2003 conn.close()
2004 os._exit(0)
2005
2006 def test_finalize(self):
2007 conn, child_conn = self.Pipe()
2008
2009 p = self.Process(target=self._test_finalize, args=(child_conn,))
Jesus Cea6f6016b2011-09-09 20:26:57 +02002010 p.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +00002011 p.start()
2012 p.join()
2013
2014 result = [obj for obj in iter(conn.recv, 'STOP')]
2015 self.assertEqual(result, ['a', 'b', 'd10', 'd03', 'd02', 'd01', 'e'])
2016
2017#
2018# Test that from ... import * works for each module
2019#
2020
2021class _TestImportStar(BaseTestCase):
2022
2023 ALLOWED_TYPES = ('processes',)
2024
2025 def test_import(self):
Florent Xicluna36b9fbb2010-03-24 19:33:25 +00002026 modules = [
Benjamin Petersondfd79492008-06-13 19:13:39 +00002027 'multiprocessing', 'multiprocessing.connection',
2028 'multiprocessing.heap', 'multiprocessing.managers',
2029 'multiprocessing.pool', 'multiprocessing.process',
Benjamin Petersondfd79492008-06-13 19:13:39 +00002030 'multiprocessing.synchronize', 'multiprocessing.util'
Florent Xicluna36b9fbb2010-03-24 19:33:25 +00002031 ]
2032
Charles-François Natalif8413b22011-09-21 18:44:49 +02002033 if HAS_REDUCTION:
2034 modules.append('multiprocessing.reduction')
2035
Florent Xicluna36b9fbb2010-03-24 19:33:25 +00002036 if c_int is not None:
2037 # This module requires _ctypes
2038 modules.append('multiprocessing.sharedctypes')
Benjamin Petersondfd79492008-06-13 19:13:39 +00002039
2040 for name in modules:
2041 __import__(name)
2042 mod = sys.modules[name]
2043
2044 for attr in getattr(mod, '__all__', ()):
2045 self.assertTrue(
2046 hasattr(mod, attr),
2047 '%r does not have attribute %r' % (mod, attr)
2048 )
2049
2050#
2051# Quick test that logging works -- does not test logging output
2052#
2053
2054class _TestLogging(BaseTestCase):
2055
2056 ALLOWED_TYPES = ('processes',)
2057
2058 def test_enable_logging(self):
2059 logger = multiprocessing.get_logger()
2060 logger.setLevel(util.SUBWARNING)
2061 self.assertTrue(logger is not None)
2062 logger.debug('this will not be printed')
2063 logger.info('nor will this')
2064 logger.setLevel(LOG_LEVEL)
2065
Antoine Pitrou4eb2b282010-11-02 23:51:30 +00002066 @classmethod
2067 def _test_level(cls, conn):
Benjamin Petersondfd79492008-06-13 19:13:39 +00002068 logger = multiprocessing.get_logger()
2069 conn.send(logger.getEffectiveLevel())
2070
2071 def test_level(self):
2072 LEVEL1 = 32
2073 LEVEL2 = 37
2074
2075 logger = multiprocessing.get_logger()
2076 root_logger = logging.getLogger()
2077 root_level = root_logger.level
2078
2079 reader, writer = multiprocessing.Pipe(duplex=False)
2080
2081 logger.setLevel(LEVEL1)
Jesus Cea6f6016b2011-09-09 20:26:57 +02002082 p = self.Process(target=self._test_level, args=(writer,))
2083 p.daemon = True
2084 p.start()
Benjamin Petersondfd79492008-06-13 19:13:39 +00002085 self.assertEqual(LEVEL1, reader.recv())
2086
2087 logger.setLevel(logging.NOTSET)
2088 root_logger.setLevel(LEVEL2)
Jesus Cea6f6016b2011-09-09 20:26:57 +02002089 p = self.Process(target=self._test_level, args=(writer,))
2090 p.daemon = True
2091 p.start()
Benjamin Petersondfd79492008-06-13 19:13:39 +00002092 self.assertEqual(LEVEL2, reader.recv())
2093
2094 root_logger.setLevel(root_level)
2095 logger.setLevel(level=LOG_LEVEL)
2096
Jesse Noller814d02d2009-11-21 14:38:23 +00002097
Jesse Noller9a03f2f2009-11-24 14:17:29 +00002098# class _TestLoggingProcessName(BaseTestCase):
2099#
2100# def handle(self, record):
2101# assert record.processName == multiprocessing.current_process().name
2102# self.__handled = True
2103#
2104# def test_logging(self):
2105# handler = logging.Handler()
2106# handler.handle = self.handle
2107# self.__handled = False
2108# # Bypass getLogger() and side-effects
2109# logger = logging.getLoggerClass()(
2110# 'multiprocessing.test.TestLoggingProcessName')
2111# logger.addHandler(handler)
2112# logger.propagate = False
2113#
2114# logger.warn('foo')
2115# assert self.__handled
Jesse Noller814d02d2009-11-21 14:38:23 +00002116
Benjamin Petersondfd79492008-06-13 19:13:39 +00002117#
Richard Oudkerkba482642013-02-26 12:37:07 +00002118# Check that Process.join() retries if os.waitpid() fails with EINTR
2119#
2120
2121class _TestPollEintr(BaseTestCase):
2122
2123 ALLOWED_TYPES = ('processes',)
2124
2125 @classmethod
2126 def _killer(cls, pid):
2127 time.sleep(0.5)
2128 os.kill(pid, signal.SIGUSR1)
2129
2130 @unittest.skipUnless(hasattr(signal, 'SIGUSR1'), 'requires SIGUSR1')
2131 def test_poll_eintr(self):
2132 got_signal = [False]
2133 def record(*args):
2134 got_signal[0] = True
2135 pid = os.getpid()
2136 oldhandler = signal.signal(signal.SIGUSR1, record)
2137 try:
2138 killer = self.Process(target=self._killer, args=(pid,))
2139 killer.start()
2140 p = self.Process(target=time.sleep, args=(1,))
2141 p.start()
2142 p.join()
2143 self.assertTrue(got_signal[0])
2144 self.assertEqual(p.exitcode, 0)
2145 killer.join()
2146 finally:
2147 signal.signal(signal.SIGUSR1, oldhandler)
2148
2149#
Jesse Noller9a5b2ad2009-01-19 15:12:22 +00002150# Test to verify handle verification, see issue 3321
2151#
2152
2153class TestInvalidHandle(unittest.TestCase):
2154
Florent Xicluna36b9fbb2010-03-24 19:33:25 +00002155 @unittest.skipIf(WIN32, "skipped on Windows")
Jesse Noller9a5b2ad2009-01-19 15:12:22 +00002156 def test_invalid_handles(self):
Jesse Noller9a5b2ad2009-01-19 15:12:22 +00002157 conn = _multiprocessing.Connection(44977608)
2158 self.assertRaises(IOError, conn.poll)
2159 self.assertRaises(IOError, _multiprocessing.Connection, -1)
Florent Xicluna36b9fbb2010-03-24 19:33:25 +00002160
Jesse Noller9a5b2ad2009-01-19 15:12:22 +00002161#
Benjamin Petersondfd79492008-06-13 19:13:39 +00002162# Functions used to create test cases from the base ones in this module
2163#
2164
2165def get_attributes(Source, names):
2166 d = {}
2167 for name in names:
2168 obj = getattr(Source, name)
2169 if type(obj) == type(get_attributes):
2170 obj = staticmethod(obj)
2171 d[name] = obj
2172 return d
2173
2174def create_test_cases(Mixin, type):
2175 result = {}
2176 glob = globals()
Florent Xicluna36b9fbb2010-03-24 19:33:25 +00002177 Type = type.capitalize()
Benjamin Petersondfd79492008-06-13 19:13:39 +00002178
2179 for name in glob.keys():
2180 if name.startswith('_Test'):
2181 base = glob[name]
2182 if type in base.ALLOWED_TYPES:
2183 newname = 'With' + Type + name[1:]
2184 class Temp(base, unittest.TestCase, Mixin):
2185 pass
2186 result[newname] = Temp
2187 Temp.__name__ = newname
2188 Temp.__module__ = Mixin.__module__
2189 return result
2190
2191#
2192# Create test cases
2193#
2194
2195class ProcessesMixin(object):
2196 TYPE = 'processes'
2197 Process = multiprocessing.Process
2198 locals().update(get_attributes(multiprocessing, (
2199 'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
2200 'Condition', 'Event', 'Value', 'Array', 'RawValue',
2201 'RawArray', 'current_process', 'active_children', 'Pipe',
Richard Oudkerkd44a4a22012-06-06 17:52:18 +01002202 'connection', 'JoinableQueue', 'Pool'
Benjamin Petersondfd79492008-06-13 19:13:39 +00002203 )))
2204
2205testcases_processes = create_test_cases(ProcessesMixin, type='processes')
2206globals().update(testcases_processes)
2207
2208
2209class ManagerMixin(object):
2210 TYPE = 'manager'
2211 Process = multiprocessing.Process
2212 manager = object.__new__(multiprocessing.managers.SyncManager)
2213 locals().update(get_attributes(manager, (
2214 'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
2215 'Condition', 'Event', 'Value', 'Array', 'list', 'dict',
Richard Oudkerkd44a4a22012-06-06 17:52:18 +01002216 'Namespace', 'JoinableQueue', 'Pool'
Benjamin Petersondfd79492008-06-13 19:13:39 +00002217 )))
2218
2219testcases_manager = create_test_cases(ManagerMixin, type='manager')
2220globals().update(testcases_manager)
2221
2222
2223class ThreadsMixin(object):
2224 TYPE = 'threads'
2225 Process = multiprocessing.dummy.Process
2226 locals().update(get_attributes(multiprocessing.dummy, (
2227 'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
2228 'Condition', 'Event', 'Value', 'Array', 'current_process',
2229 'active_children', 'Pipe', 'connection', 'dict', 'list',
Richard Oudkerkd44a4a22012-06-06 17:52:18 +01002230 'Namespace', 'JoinableQueue', 'Pool'
Benjamin Petersondfd79492008-06-13 19:13:39 +00002231 )))
2232
2233testcases_threads = create_test_cases(ThreadsMixin, type='threads')
2234globals().update(testcases_threads)
2235
Neal Norwitz0c519b32008-08-25 01:50:24 +00002236class OtherTest(unittest.TestCase):
2237 # TODO: add more tests for deliver/answer challenge.
2238 def test_deliver_challenge_auth_failure(self):
2239 class _FakeConnection(object):
2240 def recv_bytes(self, size):
Neal Norwitz2a7767a2008-08-25 03:03:25 +00002241 return b'something bogus'
Neal Norwitz0c519b32008-08-25 01:50:24 +00002242 def send_bytes(self, data):
2243 pass
2244 self.assertRaises(multiprocessing.AuthenticationError,
2245 multiprocessing.connection.deliver_challenge,
2246 _FakeConnection(), b'abc')
2247
2248 def test_answer_challenge_auth_failure(self):
2249 class _FakeConnection(object):
2250 def __init__(self):
2251 self.count = 0
2252 def recv_bytes(self, size):
2253 self.count += 1
2254 if self.count == 1:
2255 return multiprocessing.connection.CHALLENGE
2256 elif self.count == 2:
Neal Norwitz2a7767a2008-08-25 03:03:25 +00002257 return b'something bogus'
2258 return b''
Neal Norwitz0c519b32008-08-25 01:50:24 +00002259 def send_bytes(self, data):
2260 pass
2261 self.assertRaises(multiprocessing.AuthenticationError,
2262 multiprocessing.connection.answer_challenge,
2263 _FakeConnection(), b'abc')
2264
Jesse Noller7152f6d2009-04-02 05:17:26 +00002265#
2266# Test Manager.start()/Pool.__init__() initializer feature - see issue 5585
2267#
2268
2269def initializer(ns):
2270 ns.test += 1
2271
2272class TestInitializers(unittest.TestCase):
2273 def setUp(self):
2274 self.mgr = multiprocessing.Manager()
2275 self.ns = self.mgr.Namespace()
2276 self.ns.test = 0
2277
2278 def tearDown(self):
2279 self.mgr.shutdown()
2280
2281 def test_manager_initializer(self):
2282 m = multiprocessing.managers.SyncManager()
2283 self.assertRaises(TypeError, m.start, 1)
2284 m.start(initializer, (self.ns,))
2285 self.assertEqual(self.ns.test, 1)
2286 m.shutdown()
2287
2288 def test_pool_initializer(self):
2289 self.assertRaises(TypeError, multiprocessing.Pool, initializer=1)
2290 p = multiprocessing.Pool(1, initializer, (self.ns,))
2291 p.close()
2292 p.join()
2293 self.assertEqual(self.ns.test, 1)
2294
Jesse Noller1b90efb2009-06-30 17:11:52 +00002295#
2296# Issue 5155, 5313, 5331: Test process in processes
2297# Verifies os.close(sys.stdin.fileno) vs. sys.stdin.close() behavior
2298#
2299
Richard Oudkerkc5496072013-09-29 17:10:40 +01002300def _this_sub_process(q):
Jesse Noller1b90efb2009-06-30 17:11:52 +00002301 try:
2302 item = q.get(block=False)
2303 except Queue.Empty:
2304 pass
2305
Richard Oudkerkc5496072013-09-29 17:10:40 +01002306def _test_process(q):
2307 queue = multiprocessing.Queue()
2308 subProc = multiprocessing.Process(target=_this_sub_process, args=(queue,))
2309 subProc.daemon = True
2310 subProc.start()
2311 subProc.join()
2312
Jesse Noller1b90efb2009-06-30 17:11:52 +00002313def _afunc(x):
2314 return x*x
2315
2316def pool_in_process():
2317 pool = multiprocessing.Pool(processes=4)
2318 x = pool.map(_afunc, [1, 2, 3, 4, 5, 6, 7])
2319
2320class _file_like(object):
2321 def __init__(self, delegate):
2322 self._delegate = delegate
2323 self._pid = None
2324
2325 @property
2326 def cache(self):
2327 pid = os.getpid()
2328 # There are no race conditions since fork keeps only the running thread
2329 if pid != self._pid:
2330 self._pid = pid
2331 self._cache = []
2332 return self._cache
2333
2334 def write(self, data):
2335 self.cache.append(data)
2336
2337 def flush(self):
2338 self._delegate.write(''.join(self.cache))
2339 self._cache = []
2340
2341class TestStdinBadfiledescriptor(unittest.TestCase):
2342
2343 def test_queue_in_process(self):
2344 queue = multiprocessing.Queue()
Richard Oudkerkc5496072013-09-29 17:10:40 +01002345 proc = multiprocessing.Process(target=_test_process, args=(queue,))
Jesse Noller1b90efb2009-06-30 17:11:52 +00002346 proc.start()
2347 proc.join()
2348
2349 def test_pool_in_process(self):
2350 p = multiprocessing.Process(target=pool_in_process)
2351 p.start()
2352 p.join()
2353
2354 def test_flushing(self):
2355 sio = StringIO()
2356 flike = _file_like(sio)
2357 flike.write('foo')
2358 proc = multiprocessing.Process(target=lambda: flike.flush())
2359 flike.flush()
2360 assert sio.getvalue() == 'foo'
2361
Richard Oudkerke4b99382012-07-27 14:05:46 +01002362#
2363# Test interaction with socket timeouts - see Issue #6056
2364#
2365
2366class TestTimeouts(unittest.TestCase):
2367 @classmethod
2368 def _test_timeout(cls, child, address):
2369 time.sleep(1)
2370 child.send(123)
2371 child.close()
2372 conn = multiprocessing.connection.Client(address)
2373 conn.send(456)
2374 conn.close()
2375
2376 def test_timeout(self):
2377 old_timeout = socket.getdefaulttimeout()
2378 try:
2379 socket.setdefaulttimeout(0.1)
2380 parent, child = multiprocessing.Pipe(duplex=True)
2381 l = multiprocessing.connection.Listener(family='AF_INET')
2382 p = multiprocessing.Process(target=self._test_timeout,
2383 args=(child, l.address))
2384 p.start()
2385 child.close()
2386 self.assertEqual(parent.recv(), 123)
2387 parent.close()
2388 conn = l.accept()
2389 self.assertEqual(conn.recv(), 456)
2390 conn.close()
2391 l.close()
2392 p.join(10)
2393 finally:
2394 socket.setdefaulttimeout(old_timeout)
2395
Richard Oudkerkfaee75c2012-08-14 11:41:19 +01002396#
2397# Test what happens with no "if __name__ == '__main__'"
2398#
2399
2400class TestNoForkBomb(unittest.TestCase):
2401 def test_noforkbomb(self):
2402 name = os.path.join(os.path.dirname(__file__), 'mp_fork_bomb.py')
2403 if WIN32:
2404 rc, out, err = test.script_helper.assert_python_failure(name)
Serhiy Storchaka7fe04f12015-02-13 15:08:36 +02002405 self.assertEqual(out, '')
2406 self.assertIn('RuntimeError', err)
Richard Oudkerkfaee75c2012-08-14 11:41:19 +01002407 else:
2408 rc, out, err = test.script_helper.assert_python_ok(name)
Serhiy Storchaka7fe04f12015-02-13 15:08:36 +02002409 self.assertEqual(out.rstrip(), '123')
2410 self.assertEqual(err, '')
Richard Oudkerkfaee75c2012-08-14 11:41:19 +01002411
2412#
Kristján Valur Jónsson8927e8f2013-03-19 15:07:35 -07002413# Issue 12098: check sys.flags of child matches that for parent
2414#
2415
2416class TestFlags(unittest.TestCase):
2417 @classmethod
2418 def run_in_grandchild(cls, conn):
2419 conn.send(tuple(sys.flags))
2420
2421 @classmethod
2422 def run_in_child(cls):
2423 import json
2424 r, w = multiprocessing.Pipe(duplex=False)
2425 p = multiprocessing.Process(target=cls.run_in_grandchild, args=(w,))
2426 p.start()
2427 grandchild_flags = r.recv()
2428 p.join()
2429 r.close()
2430 w.close()
2431 flags = (tuple(sys.flags), grandchild_flags)
2432 print(json.dumps(flags))
2433
Serhiy Storchaka7fe04f12015-02-13 15:08:36 +02002434 @test_support.requires_unicode # XXX json needs unicode support
Kristján Valur Jónsson8927e8f2013-03-19 15:07:35 -07002435 def test_flags(self):
2436 import json, subprocess
2437 # start child process using unusual flags
2438 prog = ('from test.test_multiprocessing import TestFlags; ' +
2439 'TestFlags.run_in_child()')
2440 data = subprocess.check_output(
Benjamin Peterson625af8e2013-03-20 12:47:57 -05002441 [sys.executable, '-E', '-B', '-O', '-c', prog])
Kristján Valur Jónsson8927e8f2013-03-19 15:07:35 -07002442 child_flags, grandchild_flags = json.loads(data.decode('ascii'))
2443 self.assertEqual(child_flags, grandchild_flags)
Richard Oudkerk7bdd93c2013-04-17 19:15:52 +01002444
2445#
2446# Issue #17555: ForkAwareThreadLock
2447#
2448
2449class TestForkAwareThreadLock(unittest.TestCase):
2450 # We recurisvely start processes. Issue #17555 meant that the
2451 # after fork registry would get duplicate entries for the same
2452 # lock. The size of the registry at generation n was ~2**n.
2453
2454 @classmethod
2455 def child(cls, n, conn):
2456 if n > 1:
2457 p = multiprocessing.Process(target=cls.child, args=(n-1, conn))
2458 p.start()
2459 p.join()
2460 else:
2461 conn.send(len(util._afterfork_registry))
2462 conn.close()
2463
2464 def test_lock(self):
2465 r, w = multiprocessing.Pipe(False)
2466 l = util.ForkAwareThreadLock()
2467 old_size = len(util._afterfork_registry)
2468 p = multiprocessing.Process(target=self.child, args=(5, w))
2469 p.start()
2470 new_size = r.recv()
2471 p.join()
2472 self.assertLessEqual(new_size, old_size)
2473
Kristján Valur Jónsson8927e8f2013-03-19 15:07:35 -07002474#
Richard Oudkerk41072db2013-07-01 18:45:28 +01002475# Issue #17097: EINTR should be ignored by recv(), send(), accept() etc
2476#
2477
2478class TestIgnoreEINTR(unittest.TestCase):
2479
2480 @classmethod
2481 def _test_ignore(cls, conn):
2482 def handler(signum, frame):
2483 pass
2484 signal.signal(signal.SIGUSR1, handler)
2485 conn.send('ready')
2486 x = conn.recv()
2487 conn.send(x)
2488 conn.send_bytes(b'x'*(1024*1024)) # sending 1 MB should block
2489
2490 @unittest.skipUnless(hasattr(signal, 'SIGUSR1'), 'requires SIGUSR1')
2491 def test_ignore(self):
2492 conn, child_conn = multiprocessing.Pipe()
2493 try:
2494 p = multiprocessing.Process(target=self._test_ignore,
2495 args=(child_conn,))
2496 p.daemon = True
2497 p.start()
2498 child_conn.close()
2499 self.assertEqual(conn.recv(), 'ready')
2500 time.sleep(0.1)
2501 os.kill(p.pid, signal.SIGUSR1)
2502 time.sleep(0.1)
2503 conn.send(1234)
2504 self.assertEqual(conn.recv(), 1234)
2505 time.sleep(0.1)
2506 os.kill(p.pid, signal.SIGUSR1)
2507 self.assertEqual(conn.recv_bytes(), b'x'*(1024*1024))
2508 time.sleep(0.1)
2509 p.join()
2510 finally:
2511 conn.close()
2512
2513 @classmethod
2514 def _test_ignore_listener(cls, conn):
2515 def handler(signum, frame):
2516 pass
2517 signal.signal(signal.SIGUSR1, handler)
2518 l = multiprocessing.connection.Listener()
2519 conn.send(l.address)
2520 a = l.accept()
2521 a.send('welcome')
2522
2523 @unittest.skipUnless(hasattr(signal, 'SIGUSR1'), 'requires SIGUSR1')
2524 def test_ignore_listener(self):
2525 conn, child_conn = multiprocessing.Pipe()
2526 try:
2527 p = multiprocessing.Process(target=self._test_ignore_listener,
2528 args=(child_conn,))
2529 p.daemon = True
2530 p.start()
2531 child_conn.close()
2532 address = conn.recv()
2533 time.sleep(0.1)
2534 os.kill(p.pid, signal.SIGUSR1)
2535 time.sleep(0.1)
2536 client = multiprocessing.connection.Client(address)
2537 self.assertEqual(client.recv(), 'welcome')
2538 p.join()
2539 finally:
2540 conn.close()
2541
2542#
Richard Oudkerkfaee75c2012-08-14 11:41:19 +01002543#
2544#
2545
Jesse Noller1b90efb2009-06-30 17:11:52 +00002546testcases_other = [OtherTest, TestInvalidHandle, TestInitializers,
Kristján Valur Jónsson8927e8f2013-03-19 15:07:35 -07002547 TestStdinBadfiledescriptor, TestTimeouts, TestNoForkBomb,
Richard Oudkerk41072db2013-07-01 18:45:28 +01002548 TestFlags, TestForkAwareThreadLock, TestIgnoreEINTR]
Neal Norwitz0c519b32008-08-25 01:50:24 +00002549
Benjamin Petersondfd79492008-06-13 19:13:39 +00002550#
2551#
2552#
2553
2554def test_main(run=None):
Jesse Noller18623822008-06-18 13:29:52 +00002555 if sys.platform.startswith("linux"):
2556 try:
2557 lock = multiprocessing.RLock()
2558 except OSError:
Benjamin Petersonbec087f2009-03-26 21:10:30 +00002559 raise unittest.SkipTest("OSError raises on RLock creation, see issue 3111!")
Benjamin Petersoned77f2e2008-06-17 22:40:44 +00002560
Charles-François Natali6392d7f2011-11-22 18:35:18 +01002561 check_enough_semaphores()
2562
Benjamin Petersondfd79492008-06-13 19:13:39 +00002563 if run is None:
2564 from test.test_support import run_unittest as run
2565
2566 util.get_temp_dir() # creates temp directory for use by all processes
2567
2568 multiprocessing.get_logger().setLevel(LOG_LEVEL)
2569
Jesse Noller146b7ab2008-07-02 16:44:09 +00002570 ProcessesMixin.pool = multiprocessing.Pool(4)
2571 ThreadsMixin.pool = multiprocessing.dummy.Pool(4)
2572 ManagerMixin.manager.__init__()
2573 ManagerMixin.manager.start()
2574 ManagerMixin.pool = ManagerMixin.manager.Pool(4)
Benjamin Petersondfd79492008-06-13 19:13:39 +00002575
2576 testcases = (
Jesse Noller146b7ab2008-07-02 16:44:09 +00002577 sorted(testcases_processes.values(), key=lambda tc:tc.__name__) +
2578 sorted(testcases_threads.values(), key=lambda tc:tc.__name__) +
Neal Norwitz0c519b32008-08-25 01:50:24 +00002579 sorted(testcases_manager.values(), key=lambda tc:tc.__name__) +
2580 testcases_other
Benjamin Petersondfd79492008-06-13 19:13:39 +00002581 )
2582
2583 loadTestsFromTestCase = unittest.defaultTestLoader.loadTestsFromTestCase
2584 suite = unittest.TestSuite(loadTestsFromTestCase(tc) for tc in testcases)
Nick Coghlan13623662010-04-10 14:24:36 +00002585 # (ncoghlan): Whether or not sys.exc_clear is executed by the threading
2586 # module during these tests is at least platform dependent and possibly
Nick Coghlan14459d52010-04-10 15:01:54 +00002587 # non-deterministic on any given platform. So we don't mind if the listed
Nick Coghlan13623662010-04-10 14:24:36 +00002588 # warnings aren't actually raised.
Florent Xicluna07627882010-03-21 01:14:24 +00002589 with test_support.check_py3k_warnings(
Nick Coghlan13623662010-04-10 14:24:36 +00002590 (".+__(get|set)slice__ has been removed", DeprecationWarning),
2591 (r"sys.exc_clear\(\) not supported", DeprecationWarning),
2592 quiet=True):
Florent Xicluna07627882010-03-21 01:14:24 +00002593 run(suite)
Benjamin Petersondfd79492008-06-13 19:13:39 +00002594
Jesse Noller146b7ab2008-07-02 16:44:09 +00002595 ThreadsMixin.pool.terminate()
2596 ProcessesMixin.pool.terminate()
2597 ManagerMixin.pool.terminate()
2598 ManagerMixin.manager.shutdown()
Benjamin Petersondfd79492008-06-13 19:13:39 +00002599
Jesse Noller146b7ab2008-07-02 16:44:09 +00002600 del ProcessesMixin.pool, ThreadsMixin.pool, ManagerMixin.pool
Benjamin Petersondfd79492008-06-13 19:13:39 +00002601
2602def main():
2603 test_main(unittest.TextTestRunner(verbosity=2).run)
2604
2605if __name__ == '__main__':
2606 main()