blob: 3be72c4ad1ad0572590bc350b24705493f2c7ddf [file] [log] [blame]
Benjamin Peterson90f5ba52010-03-11 22:53:45 +00001#!/usr/bin/env python3
Georg Brandl86b2fb92008-07-16 03:43:04 +00002
Benjamin Petersone711caf2008-06-11 16:44:04 +00003#
4# Unit tests for the multiprocessing package
5#
6
7import unittest
Benjamin Petersone711caf2008-06-11 16:44:04 +00008import queue as pyqueue
9import time
Alexandre Vassalottic57a84f2009-07-17 12:07:01 +000010import io
Antoine Pitroude911b22011-12-21 11:03:24 +010011import itertools
Benjamin Petersone711caf2008-06-11 16:44:04 +000012import sys
13import os
14import gc
Antoine Pitrouc824e9a2011-04-05 18:11:33 +020015import errno
Benjamin Petersone711caf2008-06-11 16:44:04 +000016import signal
17import array
Benjamin Petersone711caf2008-06-11 16:44:04 +000018import socket
19import random
20import logging
Richard Oudkerk3730a172012-06-15 18:26:07 +010021import struct
Richard Oudkerkfc7b0ec2012-10-08 14:56:24 +010022import operator
R. David Murraya21e4ca2009-03-31 23:16:50 +000023import test.support
Richard Oudkerke88a2442012-08-14 11:41:32 +010024import test.script_helper
Benjamin Petersone711caf2008-06-11 16:44:04 +000025
Benjamin Petersone5384b02008-10-04 22:00:42 +000026
R. David Murraya21e4ca2009-03-31 23:16:50 +000027# Skip tests if _multiprocessing wasn't built.
28_multiprocessing = test.support.import_module('_multiprocessing')
29# Skip tests if sem_open implementation is broken.
30test.support.import_module('multiprocessing.synchronize')
# import threading after _multiprocessing to raise a more relevant error
# message: "No module named _multiprocessing". _multiprocessing is not compiled
# without thread support.
34import threading
Benjamin Petersone5384b02008-10-04 22:00:42 +000035
Benjamin Petersone711caf2008-06-11 16:44:04 +000036import multiprocessing.dummy
37import multiprocessing.connection
38import multiprocessing.managers
39import multiprocessing.heap
Benjamin Petersone711caf2008-06-11 16:44:04 +000040import multiprocessing.pool
Benjamin Petersone711caf2008-06-11 16:44:04 +000041
Charles-François Natalibc8f0822011-09-20 20:36:51 +020042from multiprocessing import util
43
44try:
45 from multiprocessing import reduction
46 HAS_REDUCTION = True
47except ImportError:
48 HAS_REDUCTION = False
Benjamin Petersone711caf2008-06-11 16:44:04 +000049
Brian Curtinafa88b52010-10-07 01:12:19 +000050try:
51 from multiprocessing.sharedctypes import Value, copy
52 HAS_SHAREDCTYPES = True
53except ImportError:
54 HAS_SHAREDCTYPES = False
55
Antoine Pitroubcb39d42011-08-23 19:46:22 +020056try:
57 import msvcrt
58except ImportError:
59 msvcrt = None
60
Benjamin Petersone711caf2008-06-11 16:44:04 +000061#
62#
63#
64
def latin(s):
    """Return the string *s* encoded to bytes with the 'latin' codec."""
    encoded = s.encode('latin')
    return encoded
Benjamin Petersone711caf2008-06-11 16:44:04 +000067
Benjamin Petersone711caf2008-06-11 16:44:04 +000068#
69# Constants
70#
71
LOG_LEVEL = util.SUBWARNING
#LOG_LEVEL = logging.DEBUG

# Short delay, in seconds, that the tests sleep for or pass as a timeout.
DELTA = 0.1

# Making CHECK_TIMINGS true makes the tests take a lot longer and can
# sometimes cause some non-serious failures because some calls block a bit
# longer than expected.
CHECK_TIMINGS = False
if CHECK_TIMINGS:
    TIMEOUT1, TIMEOUT2, TIMEOUT3 = 0.82, 0.35, 1.4
else:
    TIMEOUT1 = TIMEOUT2 = TIMEOUT3 = 0.1

# True unless the extension module flags its semaphore get_value as broken.
HAVE_GETVALUE = not getattr(
    _multiprocessing, 'HAVE_BROKEN_SEM_GETVALUE', False
)

WIN32 = (sys.platform == "win32")
Antoine Pitrou176f07d2011-06-06 19:35:31 +020089
Richard Oudkerk59d54042012-05-10 16:11:12 +010090from multiprocessing.connection import wait
Antoine Pitrou176f07d2011-06-06 19:35:31 +020091
def wait_for_handle(handle, timeout):
    """Wait on a single handle and return the result of ``wait()``.

    A negative *timeout* is treated as "no timeout" (``None``); ``None`` and
    non-negative values are passed through unchanged.
    """
    is_negative = timeout is not None and timeout < 0.0
    effective_timeout = None if is_negative else timeout
    return wait([handle], effective_timeout)
Jesse Noller6214edd2009-01-19 16:23:53 +000096
# Upper bound on file descriptors, falling back to 256 when the platform
# cannot report it.  Only the errors os.sysconf() can raise are caught
# (missing function, unknown name, unsupported name) so that
# KeyboardInterrupt/SystemExit are no longer swallowed by a bare except.
try:
    MAXFD = os.sysconf("SC_OPEN_MAX")
except (AttributeError, ValueError, OSError):
    MAXFD = 256
101
Benjamin Petersone711caf2008-06-11 16:44:04 +0000102#
Florent Xiclunafd1b0932010-03-28 00:25:02 +0000103# Some tests require ctypes
104#
105
106try:
Florent Xiclunaaa171062010-08-14 15:56:42 +0000107 from ctypes import Structure, c_int, c_double
Florent Xiclunafd1b0932010-03-28 00:25:02 +0000108except ImportError:
109 Structure = object
110 c_int = c_double = None
111
Charles-François Natali221ef672011-11-22 18:55:22 +0100112
def check_enough_semaphores():
    """Check that the system supports enough semaphores to run the test.

    Returns None when the limit cannot be queried, is reported as -1, or is
    at least the POSIX minimum; otherwise raises unittest.SkipTest.
    """
    # minimum number of semaphores available according to POSIX
    required = 256
    try:
        available = os.sysconf("SC_SEM_NSEMS_MAX")
    except (AttributeError, ValueError):
        # sysconf not available or setting not available
        return
    if available != -1 and available < required:
        raise unittest.SkipTest("The OS doesn't support enough semaphores "
                                "to run the test (required: %d)." % required)
126
127
Florent Xiclunafd1b0932010-03-28 00:25:02 +0000128#
Benjamin Petersone711caf2008-06-11 16:44:04 +0000129# Creates a wrapper for a function which records the time it takes to finish
130#
131
class TimingWrapper(object):
    """Callable wrapper that records how long the wrapped function took.

    After each call, ``elapsed`` holds the duration in seconds of the most
    recent call (``None`` before the first call).  The duration is recorded
    even when the wrapped function raises.
    """

    def __init__(self, func):
        self.func = func
        self.elapsed = None

    def __call__(self, *args, **kwds):
        # A monotonic clock is used so that system clock adjustments cannot
        # produce negative or wildly wrong durations.
        start = time.monotonic()
        try:
            return self.func(*args, **kwds)
        finally:
            self.elapsed = time.monotonic() - start
144
145#
146# Base class for test cases
147#
148
class BaseTestCase(object):
    """Shared helpers for the test-case classes in this file.

    The assertion helpers call ``self.assertAlmostEqual`` and
    ``self.assertEqual``, which this class does not define itself.
    """

    ALLOWED_TYPES = ('processes', 'manager', 'threads')

    def assertTimingAlmostEqual(self, a, b):
        """Compare two durations to one decimal place if CHECK_TIMINGS is set."""
        if not CHECK_TIMINGS:
            return
        self.assertAlmostEqual(a, b, 1)

    def assertReturnsIfImplemented(self, value, func, *args):
        """Assert ``func(*args) == value`` unless it raises NotImplementedError."""
        try:
            actual = func(*args)
        except NotImplementedError:
            return None
        return self.assertEqual(value, actual)

    # For the sanity of Windows users, rather than crashing or freezing in
    # multiple ways.
    def __reduce__(self, *args):
        raise NotImplementedError("shouldn't try to pickle a test case")

    __reduce_ex__ = __reduce__
171
Benjamin Petersone711caf2008-06-11 16:44:04 +0000172#
173# Return the value of a semaphore
174#
175
def get_value(self):
    """Return the current value of a semaphore-like object.

    Tries, in order, the ``get_value()`` method, the ``_Semaphore__value``
    attribute and the ``_value`` attribute; raises NotImplementedError when
    none of them is available.
    """
    try:
        return self.get_value()
    except AttributeError:
        pass
    try:
        return self._Semaphore__value
    except AttributeError:
        pass
    try:
        return self._value
    except AttributeError:
        raise NotImplementedError
187
188#
189# Testcases
190#
191
class _TestProcess(BaseTestCase):
    """Tests for the Process API: attributes, start/join, terminate, sentinel.

    ``TYPE``, ``Process``, ``current_process``, ``active_children``,
    ``Queue``, ``Pipe`` and ``Event`` are not defined in this class; they are
    presumably supplied by a mixin elsewhere in the file.

    Tests that do not apply to the 'threads' flavour are reported as skipped
    instead of silently passing.
    """

    ALLOWED_TYPES = ('processes', 'threads')

    def test_current(self):
        if self.TYPE == 'threads':
            self.skipTest('test not appropriate for {}'.format(self.TYPE))

        current = self.current_process()
        authkey = current.authkey

        self.assertTrue(current.is_alive())
        self.assertFalse(current.daemon)
        self.assertIsInstance(authkey, bytes)
        self.assertGreater(len(authkey), 0)
        self.assertEqual(current.ident, os.getpid())
        self.assertIsNone(current.exitcode)

    def test_daemon_argument(self):
        if self.TYPE == "threads":
            self.skipTest('test not appropriate for {}'.format(self.TYPE))

        # By default uses the current process's daemon flag.
        proc0 = self.Process(target=self._test)
        self.assertEqual(proc0.daemon, self.current_process().daemon)
        proc1 = self.Process(target=self._test, daemon=True)
        self.assertTrue(proc1.daemon)
        proc2 = self.Process(target=self._test, daemon=False)
        self.assertFalse(proc2.daemon)

    @classmethod
    def _test(cls, q, *args, **kwds):
        """Child body: report args, kwargs, name, authkey and pid via *q*."""
        current = cls.current_process()
        q.put(args)
        q.put(kwds)
        q.put(current.name)
        if cls.TYPE != 'threads':
            q.put(bytes(current.authkey))
            q.put(current.pid)

    def test_process(self):
        q = self.Queue(1)
        args = (q, 1, 2)
        kwargs = {'hello': 23, 'bye': 2.54}
        name = 'SomeProcess'
        p = self.Process(
            target=self._test, args=args, kwargs=kwargs, name=name
            )
        p.daemon = True
        current = self.current_process()

        if self.TYPE != 'threads':
            self.assertEqual(p.authkey, current.authkey)
        self.assertEqual(p.is_alive(), False)
        self.assertEqual(p.daemon, True)
        self.assertNotIn(p, self.active_children())
        self.assertIs(type(self.active_children()), list)
        self.assertIsNone(p.exitcode)

        p.start()

        self.assertIsNone(p.exitcode)
        self.assertEqual(p.is_alive(), True)
        self.assertIn(p, self.active_children())

        # The child puts its results in the same order as _test() emits them.
        self.assertEqual(q.get(), args[1:])
        self.assertEqual(q.get(), kwargs)
        self.assertEqual(q.get(), p.name)
        if self.TYPE != 'threads':
            self.assertEqual(q.get(), current.authkey)
            self.assertEqual(q.get(), p.pid)

        p.join()

        self.assertEqual(p.exitcode, 0)
        self.assertEqual(p.is_alive(), False)
        self.assertNotIn(p, self.active_children())

    @classmethod
    def _test_terminate(cls):
        """Child body: sleep long enough that the parent must terminate it."""
        time.sleep(1000)

    def test_terminate(self):
        if self.TYPE == 'threads':
            self.skipTest('test not appropriate for {}'.format(self.TYPE))

        p = self.Process(target=self._test_terminate)
        p.daemon = True
        p.start()

        self.assertEqual(p.is_alive(), True)
        self.assertIn(p, self.active_children())
        self.assertIsNone(p.exitcode)

        join = TimingWrapper(p.join)

        # Neither a zero nor a negative timeout may block on a live child.
        self.assertIsNone(join(0))
        self.assertTimingAlmostEqual(join.elapsed, 0.0)
        self.assertEqual(p.is_alive(), True)

        self.assertIsNone(join(-1))
        self.assertTimingAlmostEqual(join.elapsed, 0.0)
        self.assertEqual(p.is_alive(), True)

        p.terminate()

        self.assertIsNone(join())
        self.assertTimingAlmostEqual(join.elapsed, 0.0)

        self.assertEqual(p.is_alive(), False)
        self.assertNotIn(p, self.active_children())

        p.join()

        # XXX sometimes get p.exitcode == 0 on Windows ...
        #self.assertEqual(p.exitcode, -signal.SIGTERM)

    def test_cpu_count(self):
        try:
            cpus = multiprocessing.cpu_count()
        except NotImplementedError:
            cpus = 1
        self.assertIs(type(cpus), int)
        self.assertGreaterEqual(cpus, 1)

    def test_active_children(self):
        self.assertEqual(type(self.active_children()), list)

        p = self.Process(target=time.sleep, args=(DELTA,))
        self.assertNotIn(p, self.active_children())

        p.daemon = True
        p.start()
        self.assertIn(p, self.active_children())

        p.join()
        self.assertNotIn(p, self.active_children())

    @classmethod
    def _test_recursion(cls, wconn, id):
        """Send *id* on *wconn*, then spawn two children until depth 2."""
        wconn.send(id)
        if len(id) < 2:
            for i in range(2):
                p = cls.Process(
                    target=cls._test_recursion, args=(wconn, id+[i])
                    )
                p.start()
                p.join()

    def test_recursion(self):
        rconn, wconn = self.Pipe(duplex=False)
        self._test_recursion(wconn, [])

        time.sleep(DELTA)
        result = []
        while rconn.poll():
            result.append(rconn.recv())

        # Children are started and joined one at a time, so the ids arrive
        # in depth-first order.
        expected = [
            [],
              [0],
                [0, 0],
                [0, 1],
              [1],
                [1, 0],
                [1, 1]
            ]
        self.assertEqual(result, expected)

    @classmethod
    def _test_sentinel(cls, event):
        """Child body: wait up to 10 seconds for *event* to be set."""
        event.wait(10.0)

    def test_sentinel(self):
        if self.TYPE == "threads":
            self.skipTest('test not appropriate for {}'.format(self.TYPE))
        event = self.Event()
        p = self.Process(target=self._test_sentinel, args=(event,))
        # The sentinel is only available once the process has been started.
        with self.assertRaises(ValueError):
            p.sentinel
        p.start()
        self.addCleanup(p.join)
        sentinel = p.sentinel
        self.assertIsInstance(sentinel, int)
        self.assertFalse(wait_for_handle(sentinel, timeout=0.0))
        event.set()
        p.join()
        self.assertTrue(wait_for_handle(sentinel, timeout=DELTA))
382
Benjamin Petersone711caf2008-06-11 16:44:04 +0000383#
384#
385#
386
387class _UpperCaser(multiprocessing.Process):
388
389 def __init__(self):
390 multiprocessing.Process.__init__(self)
391 self.child_conn, self.parent_conn = multiprocessing.Pipe()
392
393 def run(self):
394 self.parent_conn.close()
395 for s in iter(self.child_conn.recv, None):
396 self.child_conn.send(s.upper())
397 self.child_conn.close()
398
399 def submit(self, s):
400 assert type(s) is str
401 self.parent_conn.send(s)
402 return self.parent_conn.recv()
403
404 def stop(self):
405 self.parent_conn.send(None)
406 self.parent_conn.close()
407 self.child_conn.close()
408
class _TestSubclassingProcess(BaseTestCase):
    """Tests for Process subclasses and for child-process exit behaviour.

    ``TYPE`` and ``Process`` are not defined in this class; they are
    presumably supplied by a mixin elsewhere in the file.  Tests that do not
    apply to the 'threads' flavour are reported as skipped instead of
    silently passing.
    """

    ALLOWED_TYPES = ('processes',)

    def test_subclassing(self):
        uppercaser = _UpperCaser()
        uppercaser.daemon = True
        uppercaser.start()
        self.assertEqual(uppercaser.submit('hello'), 'HELLO')
        self.assertEqual(uppercaser.submit('world'), 'WORLD')
        uppercaser.stop()
        uppercaser.join()

    def test_stderr_flush(self):
        # sys.stderr is flushed at process shutdown (issue #13812)
        if self.TYPE == "threads":
            self.skipTest('test not appropriate for {}'.format(self.TYPE))

        testfn = test.support.TESTFN
        self.addCleanup(test.support.unlink, testfn)
        proc = self.Process(target=self._test_stderr_flush, args=(testfn,))
        proc.start()
        proc.join()
        with open(testfn, 'r') as f:
            err = f.read()
            # The whole traceback was printed
            self.assertIn("ZeroDivisionError", err)
            self.assertIn("test_multiprocessing.py", err)
            self.assertIn("1/0 # MARKER", err)

    @classmethod
    def _test_stderr_flush(cls, testfn):
        """Child body: redirect stderr to *testfn* and die with a traceback."""
        # The file is deliberately left open: the test checks that it is
        # flushed when the process shuts down.
        sys.stderr = open(testfn, 'w')
        1/0 # MARKER

    @classmethod
    def _test_sys_exit(cls, reason, testfn):
        """Child body: redirect stderr to *testfn* and call sys.exit(reason)."""
        sys.stderr = open(testfn, 'w')
        sys.exit(reason)

    def test_sys_exit(self):
        # See Issue 13854
        if self.TYPE == 'threads':
            self.skipTest('test not appropriate for {}'.format(self.TYPE))

        testfn = test.support.TESTFN
        self.addCleanup(test.support.unlink, testfn)

        # Each pair is (argument given to sys.exit, expected exit code); the
        # argument's str() is expected to end up on stderr.
        for reason, code in (([1, 2, 3], 1), ('ignore this', 0)):
            p = self.Process(target=self._test_sys_exit, args=(reason, testfn))
            p.daemon = True
            p.start()
            p.join(5)
            self.assertEqual(p.exitcode, code)

            with open(testfn, 'r') as f:
                self.assertEqual(f.read().rstrip(), str(reason))

        # These values are expected to be used directly as the exit code.
        for reason in (True, False, 8):
            p = self.Process(target=sys.exit, args=(reason,))
            p.daemon = True
            p.start()
            p.join(5)
            self.assertEqual(p.exitcode, reason)
474
Benjamin Petersone711caf2008-06-11 16:44:04 +0000475#
476#
477#
478
def queue_empty(q):
    """Return whether *q* is empty, via ``empty()`` if present, else ``qsize()``."""
    return q.empty() if hasattr(q, 'empty') else q.qsize() == 0
484
def queue_full(q, maxsize):
    """Return whether *q* is full.

    Uses ``q.full()`` when available; otherwise compares ``q.qsize()``
    against *maxsize*.
    """
    return q.full() if hasattr(q, 'full') else q.qsize() == maxsize
490
491
class _TestQueue(BaseTestCase):
    """Tests for Queue and JoinableQueue: put/get, fork, qsize, task_done.

    ``Queue``, ``JoinableQueue``, ``Event`` and ``Process`` are not defined
    in this class; they are presumably supplied by a mixin elsewhere in the
    file.
    """

    @classmethod
    def _test_put(cls, queue, child_can_start, parent_can_continue):
        """Child body: once allowed, drain six items and signal the parent."""
        child_can_start.wait()
        for _ in range(6):
            queue.get()
        parent_can_continue.set()

    def test_put(self):
        MAXSIZE = 6
        queue = self.Queue(maxsize=MAXSIZE)
        child_can_start = self.Event()
        parent_can_continue = self.Event()

        proc = self.Process(
            target=self._test_put,
            args=(queue, child_can_start, parent_can_continue)
            )
        proc.daemon = True
        proc.start()

        self.assertEqual(queue_empty(queue), True)
        self.assertEqual(queue_full(queue, MAXSIZE), False)

        # Fill the queue using every supported calling convention.
        queue.put(1)
        queue.put(2, True)
        queue.put(3, True, None)
        queue.put(4, False)
        queue.put(5, False, None)
        queue.put_nowait(6)

        # the values may be in buffer but not yet in pipe so sleep a bit
        time.sleep(DELTA)

        self.assertEqual(queue_empty(queue), False)
        self.assertEqual(queue_full(queue, MAXSIZE), True)

        put = TimingWrapper(queue.put)
        put_nowait = TimingWrapper(queue.put_nowait)

        # Non-blocking puts on a full queue must fail immediately; blocking
        # puts must fail after the requested timeout.
        self.assertRaises(pyqueue.Full, put, 7, False)
        self.assertTimingAlmostEqual(put.elapsed, 0)

        self.assertRaises(pyqueue.Full, put, 7, False, None)
        self.assertTimingAlmostEqual(put.elapsed, 0)

        self.assertRaises(pyqueue.Full, put_nowait, 7)
        self.assertTimingAlmostEqual(put_nowait.elapsed, 0)

        self.assertRaises(pyqueue.Full, put, 7, True, TIMEOUT1)
        self.assertTimingAlmostEqual(put.elapsed, TIMEOUT1)

        self.assertRaises(pyqueue.Full, put, 7, False, TIMEOUT2)
        self.assertTimingAlmostEqual(put.elapsed, 0)

        self.assertRaises(pyqueue.Full, put, 7, True, timeout=TIMEOUT3)
        self.assertTimingAlmostEqual(put.elapsed, TIMEOUT3)

        child_can_start.set()
        parent_can_continue.wait()

        self.assertEqual(queue_empty(queue), True)
        self.assertEqual(queue_full(queue, MAXSIZE), False)

        proc.join()

    @classmethod
    def _test_get(cls, queue, child_can_start, parent_can_continue):
        """Child body: once allowed, put 2..5 and signal the parent."""
        child_can_start.wait()
        #queue.put(1)
        queue.put(2)
        queue.put(3)
        queue.put(4)
        queue.put(5)
        parent_can_continue.set()

    def test_get(self):
        queue = self.Queue()
        child_can_start = self.Event()
        parent_can_continue = self.Event()

        proc = self.Process(
            target=self._test_get,
            args=(queue, child_can_start, parent_can_continue)
            )
        proc.daemon = True
        proc.start()

        self.assertEqual(queue_empty(queue), True)

        child_can_start.set()
        parent_can_continue.wait()

        time.sleep(DELTA)
        self.assertEqual(queue_empty(queue), False)

        # Hangs unexpectedly, remove for now
        #self.assertEqual(queue.get(), 1)
        self.assertEqual(queue.get(True, None), 2)
        self.assertEqual(queue.get(True), 3)
        self.assertEqual(queue.get(timeout=1), 4)
        self.assertEqual(queue.get_nowait(), 5)

        self.assertEqual(queue_empty(queue), True)

        get = TimingWrapper(queue.get)
        get_nowait = TimingWrapper(queue.get_nowait)

        # Non-blocking gets on an empty queue must fail immediately;
        # blocking gets must fail after the requested timeout.
        self.assertRaises(pyqueue.Empty, get, False)
        self.assertTimingAlmostEqual(get.elapsed, 0)

        self.assertRaises(pyqueue.Empty, get, False, None)
        self.assertTimingAlmostEqual(get.elapsed, 0)

        self.assertRaises(pyqueue.Empty, get_nowait)
        self.assertTimingAlmostEqual(get_nowait.elapsed, 0)

        self.assertRaises(pyqueue.Empty, get, True, TIMEOUT1)
        self.assertTimingAlmostEqual(get.elapsed, TIMEOUT1)

        self.assertRaises(pyqueue.Empty, get, False, TIMEOUT2)
        self.assertTimingAlmostEqual(get.elapsed, 0)

        self.assertRaises(pyqueue.Empty, get, timeout=TIMEOUT3)
        self.assertTimingAlmostEqual(get.elapsed, TIMEOUT3)

        proc.join()

    @classmethod
    def _test_fork(cls, queue):
        """Child body: put the integers 10..19 on *queue*."""
        for i in range(10, 20):
            queue.put(i)
        # note that at this point the items may only be buffered, so the
        # process cannot shutdown until the feeder thread has finished
        # pushing items onto the pipe.

    def test_fork(self):
        # Old versions of Queue would fail to create a new feeder
        # thread for a forked process if the original process had its
        # own feeder thread.  This test checks that this no longer
        # happens.

        queue = self.Queue()

        # put items on queue so that main process starts a feeder thread
        for i in range(10):
            queue.put(i)

        # wait to make sure thread starts before we fork a new process
        time.sleep(DELTA)

        # fork process
        p = self.Process(target=self._test_fork, args=(queue,))
        p.daemon = True
        p.start()

        # check that all expected items are in the queue
        for i in range(20):
            self.assertEqual(queue.get(), i)
        self.assertRaises(pyqueue.Empty, queue.get, False)

        p.join()

    def test_qsize(self):
        q = self.Queue()
        try:
            self.assertEqual(q.qsize(), 0)
        except NotImplementedError:
            # Report the test as skipped rather than as a silent pass.
            self.skipTest('qsize method not implemented')
        q.put(1)
        self.assertEqual(q.qsize(), 1)
        q.put(5)
        self.assertEqual(q.qsize(), 2)
        q.get()
        self.assertEqual(q.qsize(), 1)
        q.get()
        self.assertEqual(q.qsize(), 0)

    @classmethod
    def _test_task_done(cls, q):
        """Worker body: acknowledge each item until a None sentinel arrives."""
        for obj in iter(q.get, None):
            time.sleep(DELTA)
            q.task_done()

    def test_task_done(self):
        queue = self.JoinableQueue()

        workers = [self.Process(target=self._test_task_done, args=(queue,))
                   for i in range(4)]

        for p in workers:
            p.daemon = True
            p.start()

        for i in range(10):
            queue.put(i)

        queue.join()

        # One sentinel per worker so that every worker loop terminates.
        for p in workers:
            queue.put(None)

        for p in workers:
            p.join()

    def test_timeout(self):
        q = multiprocessing.Queue()
        # Use a monotonic clock so a system clock adjustment cannot make the
        # measured delay look shorter than it was.
        start = time.monotonic()
        self.assertRaises(pyqueue.Empty, q.get, True, 0.2)
        delta = time.monotonic() - start
        self.assertGreaterEqual(delta, 0.19)
708
Benjamin Petersone711caf2008-06-11 16:44:04 +0000709#
710#
711#
712
class _TestLock(BaseTestCase):
    """Tests for Lock and RLock acquire/release behaviour.

    ``Lock`` and ``RLock`` are not defined in this class; they are
    presumably supplied by a mixin elsewhere in the file.
    """

    def test_lock(self):
        mutex = self.Lock()
        self.assertEqual(mutex.acquire(), True)
        # A second, non-blocking acquire must fail while the lock is held.
        self.assertEqual(mutex.acquire(False), False)
        self.assertEqual(mutex.release(), None)
        # Releasing an unheld lock is an error.
        self.assertRaises((ValueError, threading.ThreadError), mutex.release)

    def test_rlock(self):
        rlock = self.RLock()
        # Acquire three times, then release three times.
        for _ in range(3):
            self.assertEqual(rlock.acquire(), True)
        for _ in range(3):
            self.assertEqual(rlock.release(), None)
        # One release too many is an error.
        self.assertRaises((AssertionError, RuntimeError), rlock.release)

    def test_lock_context(self):
        with self.Lock():
            pass
735
Benjamin Petersone711caf2008-06-11 16:44:04 +0000736
class _TestSemaphore(BaseTestCase):
    """Tests for Semaphore and BoundedSemaphore counting and timeouts.

    ``TYPE``, ``Semaphore`` and ``BoundedSemaphore`` are not defined in this
    class; they are presumably supplied by a mixin elsewhere in the file.
    """

    def _test_semaphore(self, sem):
        """Drive *sem* (initial value 2) down to 0 and back up to 2."""
        self.assertReturnsIfImplemented(2, get_value, sem)
        self.assertEqual(sem.acquire(), True)
        self.assertReturnsIfImplemented(1, get_value, sem)
        self.assertEqual(sem.acquire(), True)
        self.assertReturnsIfImplemented(0, get_value, sem)
        self.assertEqual(sem.acquire(False), False)
        self.assertReturnsIfImplemented(0, get_value, sem)
        self.assertEqual(sem.release(), None)
        self.assertReturnsIfImplemented(1, get_value, sem)
        self.assertEqual(sem.release(), None)
        self.assertReturnsIfImplemented(2, get_value, sem)

    def test_semaphore(self):
        sem = self.Semaphore(2)
        self._test_semaphore(sem)
        # An unbounded semaphore may be released past its initial value.
        self.assertEqual(sem.release(), None)
        self.assertReturnsIfImplemented(3, get_value, sem)
        self.assertEqual(sem.release(), None)
        self.assertReturnsIfImplemented(4, get_value, sem)

    def test_bounded_semaphore(self):
        sem = self.BoundedSemaphore(2)
        self._test_semaphore(sem)
        # Currently fails on OS/X
        #if HAVE_GETVALUE:
        #    self.assertRaises(ValueError, sem.release)
        #    self.assertReturnsIfImplemented(2, get_value, sem)

    def test_timeout(self):
        if self.TYPE != 'processes':
            # Report as skipped instead of silently passing.
            self.skipTest('test not appropriate for {}'.format(self.TYPE))

        sem = self.Semaphore(0)
        acquire = TimingWrapper(sem.acquire)

        # Non-blocking acquires must return immediately, whatever timeout
        # is passed alongside.
        self.assertEqual(acquire(False), False)
        self.assertTimingAlmostEqual(acquire.elapsed, 0.0)

        self.assertEqual(acquire(False, None), False)
        self.assertTimingAlmostEqual(acquire.elapsed, 0.0)

        self.assertEqual(acquire(False, TIMEOUT1), False)
        self.assertTimingAlmostEqual(acquire.elapsed, 0)

        # Blocking acquires must give up after the requested timeout.
        self.assertEqual(acquire(True, TIMEOUT2), False)
        self.assertTimingAlmostEqual(acquire.elapsed, TIMEOUT2)

        self.assertEqual(acquire(timeout=TIMEOUT3), False)
        self.assertTimingAlmostEqual(acquire.elapsed, TIMEOUT3)
789
790
791class _TestCondition(BaseTestCase):
792
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +0000793 @classmethod
794 def f(cls, cond, sleeping, woken, timeout=None):
Benjamin Petersone711caf2008-06-11 16:44:04 +0000795 cond.acquire()
796 sleeping.release()
797 cond.wait(timeout)
798 woken.release()
799 cond.release()
800
801 def check_invariant(self, cond):
802 # this is only supposed to succeed when there are no sleepers
803 if self.TYPE == 'processes':
804 try:
805 sleepers = (cond._sleeping_count.get_value() -
806 cond._woken_count.get_value())
807 self.assertEqual(sleepers, 0)
808 self.assertEqual(cond._wait_semaphore.get_value(), 0)
809 except NotImplementedError:
810 pass
811
812 def test_notify(self):
813 cond = self.Condition()
814 sleeping = self.Semaphore(0)
815 woken = self.Semaphore(0)
816
817 p = self.Process(target=self.f, args=(cond, sleeping, woken))
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000818 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000819 p.start()
820
821 p = threading.Thread(target=self.f, args=(cond, sleeping, woken))
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000822 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000823 p.start()
824
825 # wait for both children to start sleeping
826 sleeping.acquire()
827 sleeping.acquire()
828
829 # check no process/thread has woken up
830 time.sleep(DELTA)
831 self.assertReturnsIfImplemented(0, get_value, woken)
832
833 # wake up one process/thread
834 cond.acquire()
835 cond.notify()
836 cond.release()
837
838 # check one process/thread has woken up
839 time.sleep(DELTA)
840 self.assertReturnsIfImplemented(1, get_value, woken)
841
842 # wake up another
843 cond.acquire()
844 cond.notify()
845 cond.release()
846
847 # check other has woken up
848 time.sleep(DELTA)
849 self.assertReturnsIfImplemented(2, get_value, woken)
850
851 # check state is not mucked up
852 self.check_invariant(cond)
853 p.join()
854
855 def test_notify_all(self):
856 cond = self.Condition()
857 sleeping = self.Semaphore(0)
858 woken = self.Semaphore(0)
859
860 # start some threads/processes which will timeout
861 for i in range(3):
862 p = self.Process(target=self.f,
863 args=(cond, sleeping, woken, TIMEOUT1))
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000864 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000865 p.start()
866
867 t = threading.Thread(target=self.f,
868 args=(cond, sleeping, woken, TIMEOUT1))
Benjamin Peterson72753702008-08-18 18:09:21 +0000869 t.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000870 t.start()
871
872 # wait for them all to sleep
873 for i in range(6):
874 sleeping.acquire()
875
876 # check they have all timed out
877 for i in range(6):
878 woken.acquire()
879 self.assertReturnsIfImplemented(0, get_value, woken)
880
881 # check state is not mucked up
882 self.check_invariant(cond)
883
884 # start some more threads/processes
885 for i in range(3):
886 p = self.Process(target=self.f, args=(cond, sleeping, woken))
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000887 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000888 p.start()
889
890 t = threading.Thread(target=self.f, args=(cond, sleeping, woken))
Benjamin Peterson72753702008-08-18 18:09:21 +0000891 t.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000892 t.start()
893
894 # wait for them to all sleep
895 for i in range(6):
896 sleeping.acquire()
897
898 # check no process/thread has woken up
899 time.sleep(DELTA)
900 self.assertReturnsIfImplemented(0, get_value, woken)
901
902 # wake them all up
903 cond.acquire()
904 cond.notify_all()
905 cond.release()
906
907 # check they have all woken
Antoine Pitrouf25a8de2011-04-16 21:02:01 +0200908 for i in range(10):
909 try:
910 if get_value(woken) == 6:
911 break
912 except NotImplementedError:
913 break
914 time.sleep(DELTA)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000915 self.assertReturnsIfImplemented(6, get_value, woken)
916
917 # check state is not mucked up
918 self.check_invariant(cond)
919
920 def test_timeout(self):
921 cond = self.Condition()
922 wait = TimingWrapper(cond.wait)
923 cond.acquire()
924 res = wait(TIMEOUT1)
925 cond.release()
Georg Brandl65ffae02010-10-28 09:24:56 +0000926 self.assertEqual(res, False)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000927 self.assertTimingAlmostEqual(wait.elapsed, TIMEOUT1)
928
@classmethod
def _test_waitfor_f(cls, cond, state):
    # Child side of test_waitfor: publish 0, then block until the shared
    # value reaches 4.  Exits with status 1 if the wait did not succeed.
    def reached_four():
        return state.value == 4

    with cond:
        state.value = 0
        cond.notify()
        outcome = cond.wait_for(reached_four)
        if not (outcome and state.value == 4):
            sys.exit(1)
937
@unittest.skipUnless(HAS_SHAREDCTYPES, 'needs sharedctypes')
def test_waitfor(self):
    # Adapted from the corresponding test in test/lock_tests.py.
    condition = self.Condition()
    counter = self.Value('i', -1)

    child = self.Process(target=self._test_waitfor_f,
                         args=(condition, counter))
    child.daemon = True
    child.start()

    # The child sets the counter to 0 and notifies; wait until we see it.
    with condition:
        saw_zero = condition.wait_for(lambda: counter.value == 0)
        self.assertTrue(saw_zero)
        self.assertEqual(counter.value, 0)

    # Bump the counter four times, notifying after each step, so that the
    # child's wait_for(value == 4) can complete.
    for _ in range(4):
        time.sleep(0.01)
        with condition:
            counter.value += 1
            condition.notify()

    child.join(5)
    self.assertFalse(child.is_alive())
    self.assertEqual(child.exitcode, 0)
962
@classmethod
def _test_waitfor_timeout_f(cls, cond, state, success, sem):
    # Child side of test_waitfor_timeout.  Releasing `sem` tells the parent
    # that we are running; `success.value` is set only if wait_for() timed
    # out after a plausible amount of time.
    sem.release()
    with cond:
        expected = 0.1
        started = time.time()
        timed_out = not cond.wait_for(lambda: state.value == 4,
                                      timeout=expected)
        elapsed = time.time() - started
        # borrow logic in assertTimeout() from test/lock_tests.py
        if timed_out and expected * 0.6 < elapsed < expected * 10.0:
            success.value = True
974
@unittest.skipUnless(HAS_SHAREDCTYPES, 'needs sharedctypes')
def test_waitfor_timeout(self):
    # Adapted from the corresponding test in test/lock_tests.py.
    condition = self.Condition()
    counter = self.Value('i', 0)
    success = self.Value('i', False)
    started = self.Semaphore(0)

    child = self.Process(target=self._test_waitfor_timeout_f,
                         args=(condition, counter, success, started))
    child.daemon = True
    child.start()
    # The child releases the semaphore as its first action.
    self.assertTrue(started.acquire(timeout=10))

    # Only increment 3 times, so state == 4 is never reached.
    for _ in range(3):
        time.sleep(0.01)
        with condition:
            counter.value += 1
            condition.notify()

    child.join(5)
    self.assertTrue(success.value)
998
@classmethod
def _test_wait_result(cls, c, pid):
    # Notify the condition once, then (after a one second pause) send
    # SIGINT to `pid` unless it is None.
    with c:
        c.notify()
    time.sleep(1)
    if pid is None:
        return
    os.kill(pid, signal.SIGINT)
1006
def test_wait_result(self):
    # The SIGINT half of the test only runs for real processes on
    # non-Windows platforms; otherwise the helper gets pid=None.
    can_interrupt = (isinstance(self, ProcessesMixin)
                     and sys.platform != 'win32')
    pid = os.getpid() if can_interrupt else None

    c = self.Condition()
    with c:
        # Nobody notifies yet, so both waits must time out.
        for timeout in (0, 0.1):
            self.assertFalse(c.wait(timeout))

        p = self.Process(target=self._test_wait_result, args=(c, pid))
        p.start()

        # The helper notifies right away ...
        self.assertTrue(c.wait(10))
        # ... and, when given our pid, interrupts the next wait.
        if pid is not None:
            self.assertRaises(KeyboardInterrupt, c.wait, 10)

        p.join()
1026
Benjamin Petersone711caf2008-06-11 16:44:04 +00001027
class _TestEvent(BaseTestCase):

    @classmethod
    def _test_event(cls, event):
        # Runs in the child: set the event after TIMEOUT2 seconds.
        time.sleep(TIMEOUT2)
        event.set()

    def test_event(self):
        event = self.Event()
        wait = TimingWrapper(event.wait)

        # A freshly created event is not set.
        self.assertEqual(event.is_set(), False)

        # wait() returns the state of the flag, so waiting on an unset
        # event returns False once the timeout has expired.
        self.assertEqual(wait(0.0), False)
        self.assertTimingAlmostEqual(wait.elapsed, 0.0)
        self.assertEqual(wait(TIMEOUT1), False)
        self.assertTimingAlmostEqual(wait.elapsed, TIMEOUT1)

        event.set()

        # Once the event is set, wait() returns True immediately, with or
        # without a timeout, and the flag stays set.
        self.assertEqual(event.is_set(), True)
        self.assertEqual(wait(), True)
        self.assertTimingAlmostEqual(wait.elapsed, 0.0)
        self.assertEqual(wait(TIMEOUT1), True)
        self.assertTimingAlmostEqual(wait.elapsed, 0.0)
        self.assertEqual(event.is_set(), True)

        event.clear()

        self.assertEqual(event.is_set(), False)

        # A set() performed by another process/thread must wake us up.
        p = self.Process(target=self._test_event, args=(event,))
        p.daemon = True
        p.start()
        self.assertEqual(wait(), True)
        # Reap the child so it does not outlive the test.
        p.join()
Benjamin Petersone711caf2008-06-11 16:44:04 +00001068
1069#
Richard Oudkerk3730a172012-06-15 18:26:07 +01001070# Tests for Barrier - adapted from tests in test/lock_tests.py
1071#
1072
# Many of the tests for threading.Barrier use a list as an atomic
# counter: a value is appended to increment the counter, and the
# length of the list gives the value.  We use the class _DummyList
# for the same purpose.
1077
class _DummyList(object):
    # List stand-in that only counts appends.  The count is a single C int
    # kept in a multiprocessing.heap buffer and guarded by a lock; pickling
    # transfers the (buffer wrapper, lock) pair.

    def __init__(self):
        int_size = struct.calcsize('i')
        self.__setstate__((multiprocessing.heap.BufferWrapper(int_size),
                           multiprocessing.Lock()))
        self._lengthbuf[0] = 0

    def __setstate__(self, state):
        self._wrapper, self._lock = state
        view = self._wrapper.create_memoryview()
        self._lengthbuf = view.cast('i')

    def __getstate__(self):
        return (self._wrapper, self._lock)

    def append(self, _):
        # The appended value is ignored; only the count matters.
        with self._lock:
            self._lengthbuf[0] = self._lengthbuf[0] + 1

    def __len__(self):
        with self._lock:
            count = self._lengthbuf[0]
        return count
1100
def _wait():
    """Sleep for 10 ms.

    A crude wait/yield function not relying on synchronization primitives.
    """
    pause = 0.01
    time.sleep(pause)
1104
1105
class Bunch(object):
    """
    A bunch of threads.
    """
    def __init__(self, namespace, f, args, n, wait_before_exit=False):
        """
        Construct a bunch of `n` threads running the same function `f`.
        If `wait_before_exit` is True, the threads won't terminate until
        do_finish() is called.

        `namespace` supplies the DummyList, Event and Process factories.
        """
        self.f, self.args, self.n = f, args, n
        self.started = namespace.DummyList()
        self.finished = namespace.DummyList()
        self._can_exit = namespace.Event()
        if not wait_before_exit:
            self._can_exit.set()
        for _ in range(n):
            worker = namespace.Process(target=self.task)
            worker.daemon = True
            worker.start()

    def task(self):
        """Body of each worker: record start, run `f`, record finish."""
        me = os.getpid()
        self.started.append(me)
        try:
            self.f(*self.args)
        finally:
            self.finished.append(me)
            # Linger (for at most 30 seconds) until exiting is allowed.
            self._can_exit.wait(30)
            assert self._can_exit.is_set()

    def wait_for_started(self):
        """Poll until all `n` workers have recorded their start."""
        while True:
            if len(self.started) >= self.n:
                return
            _wait()

    def wait_for_finished(self):
        """Poll until all `n` workers have recorded their finish."""
        while True:
            if len(self.finished) >= self.n:
                return
            _wait()

    def do_finish(self):
        """Allow the workers to exit."""
        self._can_exit.set()
Richard Oudkerk3730a172012-06-15 18:26:07 +01001149
1150
class AppendTrue(object):
    """Callable that appends True to the wrapped object each time it runs."""

    def __init__(self, obj):
        self.obj = obj

    def __call__(self):
        target = self.obj
        target.append(True)
1156
1157
class _TestBarrier(BaseTestCase):
    """
    Tests for Barrier objects.
    """
    N = 5
    defaultTimeout = 30.0  # XXX Slow Windows buildbots need generous timeout

    def setUp(self):
        self.barrier = self.Barrier(self.N, timeout=self.defaultTimeout)

    def tearDown(self):
        self.barrier.abort()
        self.barrier = None

    def DummyList(self):
        """
        Return a list-like counter suitable for the flavour under test
        """
        if self.TYPE == 'threads':
            return []
        elif self.TYPE == 'manager':
            return self.manager.list()
        else:
            return _DummyList()

    def run_threads(self, f, args):
        """
        Run f(*args) in N-1 workers plus the caller, then wait for the
        workers to finish
        """
        b = Bunch(self, f, args, self.N-1)
        f(*args)
        b.wait_for_finished()

    @classmethod
    def multipass(cls, barrier, results, n):
        """
        Pass the barrier 2*n times, checking the counters in `results`
        stay in lockstep
        """
        m = barrier.parties
        assert m == cls.N
        for i in range(n):
            results[0].append(True)
            assert len(results[1]) == i * m
            barrier.wait()
            results[1].append(True)
            assert len(results[0]) == (i + 1) * m
            barrier.wait()
        try:
            assert barrier.n_waiting == 0
        except NotImplementedError:
            pass
        assert not barrier.broken

    def test_barrier(self, passes=1):
        """
        Test that a barrier is passed in lockstep
        """
        results = [self.DummyList(), self.DummyList()]
        self.run_threads(self.multipass, (self.barrier, results, passes))

    def test_barrier_10(self):
        """
        Test that a barrier works for 10 consecutive runs
        """
        return self.test_barrier(10)

    @classmethod
    def _test_wait_return_f(cls, barrier, queue):
        res = barrier.wait()
        queue.put(res)

    def test_wait_return(self):
        """
        test the return value from barrier.wait
        """
        queue = self.Queue()
        self.run_threads(self._test_wait_return_f, (self.barrier, queue))
        results = [queue.get() for i in range(self.N)]
        self.assertEqual(results.count(0), 1)

    @classmethod
    def _test_action_f(cls, barrier, results):
        barrier.wait()
        if len(results) != 1:
            raise RuntimeError

    def test_action(self):
        """
        Test the 'action' callback
        """
        results = self.DummyList()
        barrier = self.Barrier(self.N, action=AppendTrue(results))
        self.run_threads(self._test_action_f, (barrier, results))
        self.assertEqual(len(results), 1)

    @classmethod
    def _test_abort_f(cls, barrier, results1, results2):
        try:
            i = barrier.wait()
            if i == cls.N//2:
                raise RuntimeError
            barrier.wait()
            results1.append(True)
        except threading.BrokenBarrierError:
            results2.append(True)
        except RuntimeError:
            barrier.abort()

    def test_abort(self):
        """
        Test that an abort will put the barrier in a broken state
        """
        results1 = self.DummyList()
        results2 = self.DummyList()
        self.run_threads(self._test_abort_f,
                         (self.barrier, results1, results2))
        self.assertEqual(len(results1), 0)
        self.assertEqual(len(results2), self.N-1)
        self.assertTrue(self.barrier.broken)

    @classmethod
    def _test_reset_f(cls, barrier, results1, results2, results3):
        i = barrier.wait()
        if i == cls.N//2:
            # Wait until the other threads are all in the barrier.
            while barrier.n_waiting < cls.N-1:
                time.sleep(0.001)
            barrier.reset()
        else:
            try:
                barrier.wait()
                results1.append(True)
            except threading.BrokenBarrierError:
                results2.append(True)
        # Now, pass the barrier again
        barrier.wait()
        results3.append(True)

    def test_reset(self):
        """
        Test that a 'reset' on a barrier frees the waiting threads
        """
        results1 = self.DummyList()
        results2 = self.DummyList()
        results3 = self.DummyList()
        self.run_threads(self._test_reset_f,
                         (self.barrier, results1, results2, results3))
        self.assertEqual(len(results1), 0)
        self.assertEqual(len(results2), self.N-1)
        self.assertEqual(len(results3), self.N)

    @classmethod
    def _test_abort_and_reset_f(cls, barrier, barrier2,
                                results1, results2, results3):
        try:
            i = barrier.wait()
            if i == cls.N//2:
                raise RuntimeError
            barrier.wait()
            results1.append(True)
        except threading.BrokenBarrierError:
            results2.append(True)
        except RuntimeError:
            barrier.abort()
        # Synchronize and reset the barrier.  Must synchronize first so
        # that everyone has left it when we reset, and after so that no
        # one enters it before the reset.
        if barrier2.wait() == cls.N//2:
            barrier.reset()
        barrier2.wait()
        barrier.wait()
        results3.append(True)

    def test_abort_and_reset(self):
        """
        Test that a barrier can be reset after being broken.
        """
        results1 = self.DummyList()
        results2 = self.DummyList()
        results3 = self.DummyList()
        barrier2 = self.Barrier(self.N)

        self.run_threads(self._test_abort_and_reset_f,
                         (self.barrier, barrier2, results1, results2, results3))
        self.assertEqual(len(results1), 0)
        self.assertEqual(len(results2), self.N-1)
        self.assertEqual(len(results3), self.N)

    @classmethod
    def _test_timeout_f(cls, barrier, results):
        i = barrier.wait()
        if i == cls.N//2:
            # One thread is late!
            time.sleep(1.0)
        try:
            barrier.wait(0.5)
        except threading.BrokenBarrierError:
            results.append(True)

    def test_timeout(self):
        """
        Test wait(timeout)
        """
        results = self.DummyList()
        self.run_threads(self._test_timeout_f, (self.barrier, results))
        self.assertEqual(len(results), self.barrier.parties)

    @classmethod
    def _test_default_timeout_f(cls, barrier, results):
        i = barrier.wait(cls.defaultTimeout)
        if i == cls.N//2:
            # One thread is later than the default timeout
            time.sleep(1.0)
        try:
            barrier.wait()
        except threading.BrokenBarrierError:
            results.append(True)

    def test_default_timeout(self):
        """
        Test the barrier's default timeout
        """
        barrier = self.Barrier(self.N, timeout=0.5)
        results = self.DummyList()
        self.run_threads(self._test_default_timeout_f, (barrier, results))
        self.assertEqual(len(results), barrier.parties)

    def test_single_thread(self):
        """
        Test that a one-party barrier can be passed repeatedly
        """
        b = self.Barrier(1)
        b.wait()
        b.wait()

    @classmethod
    def _test_thousand_f(cls, barrier, passes, conn, lock):
        for i in range(passes):
            barrier.wait()
            with lock:
                conn.send(i)

    def test_thousand(self):
        """
        Test that N workers stay in lockstep over 1000 barrier passes
        """
        if self.TYPE == 'manager':
            # Report the skip instead of silently passing.
            self.skipTest('test not appropriate for {}'.format(self.TYPE))
        passes = 1000
        lock = self.Lock()
        conn, child_conn = self.Pipe(False)
        for j in range(self.N):
            p = self.Process(target=self._test_thousand_f,
                             args=(self.barrier, passes, child_conn, lock))
            p.daemon = True
            p.start()
            # Reap the worker when the test ends, whether it passed or
            # not; the join is bounded so a stuck worker cannot hang the
            # test run.
            self.addCleanup(p.join, self.defaultTimeout)

        # Each pass number must arrive exactly N times before the next one.
        for i in range(passes):
            for j in range(self.N):
                self.assertEqual(conn.recv(), i)
1402
1403#
Benjamin Petersone711caf2008-06-11 16:44:04 +00001404#
1405#
1406
class _TestValue(BaseTestCase):

    ALLOWED_TYPES = ('processes',)

    # (typecode, initial value, value written by the child process)
    codes_values = [
        ('i', 4343, 24234),
        ('d', 3.625, -4.25),
        ('h', -232, 234),
        ('c', latin('x'), latin('y'))
        ]

    def setUp(self):
        if HAS_SHAREDCTYPES:
            return
        self.skipTest("requires multiprocessing.sharedctypes")

    @classmethod
    def _test(cls, values):
        # Runs in the child: overwrite every shared value with the third
        # element of its codes_values entry.
        for shared, (_, _, final) in zip(values, cls.codes_values):
            shared.value = final

    def test_value(self, raw=False):
        factory = self.RawValue if raw else self.Value
        values = [factory(code, initial)
                  for code, initial, _ in self.codes_values]

        # Before the child runs, each object holds its initial value.
        for shared, (_, initial, _) in zip(values, self.codes_values):
            self.assertEqual(shared.value, initial)

        proc = self.Process(target=self._test, args=(values,))
        proc.daemon = True
        proc.start()
        proc.join()

        # The child's writes must be visible in the parent.
        for shared, (_, _, final) in zip(values, self.codes_values):
            self.assertEqual(shared.value, final)

    def test_rawvalue(self):
        self.test_value(raw=True)

    def test_getobj_getlock(self):
        # Default lock: both accessors must be callable.
        default_locked = self.Value('i', 5)
        default_locked.get_lock()
        default_locked.get_obj()

        # lock=None: both accessors must be callable as well.
        none_locked = self.Value('i', 5, lock=None)
        none_locked.get_lock()
        none_locked.get_obj()

        # An explicit lock is handed back by get_lock().
        lock = self.Lock()
        explicit = self.Value('i', 5, lock=lock)
        returned_lock = explicit.get_lock()
        explicit.get_obj()
        self.assertEqual(lock, returned_lock)

        # lock=False gives an object without the accessors.
        unlocked = self.Value('i', 5, lock=False)
        for accessor in ('get_lock', 'get_obj'):
            self.assertFalse(hasattr(unlocked, accessor))

        with self.assertRaises(AttributeError):
            self.Value('i', 5, lock='navalue')

        # RawValue objects have no accessors either.
        raw = self.RawValue('i', 5)
        for accessor in ('get_lock', 'get_obj'):
            self.assertFalse(hasattr(raw, accessor))
1474
Benjamin Petersone711caf2008-06-11 16:44:04 +00001475
class _TestArray(BaseTestCase):

    ALLOWED_TYPES = ('processes',)

    @classmethod
    def f(cls, seq):
        # In-place running sum: each element becomes the sum of itself and
        # everything before it.
        for idx in range(1, len(seq)):
            seq[idx] = seq[idx] + seq[idx-1]

    @unittest.skipIf(c_int is None, "requires _ctypes")
    def test_array(self, raw=False):
        seq = [680, 626, 934, 821, 150, 233, 548, 982, 714, 831]
        make_array = self.RawArray if raw else self.Array
        arr = make_array('i', seq)

        self.assertEqual(len(arr), len(seq))
        self.assertEqual(arr[3], seq[3])
        self.assertEqual(list(arr[2:7]), list(seq[2:7]))

        # Slice-assign the same replacement into both sequences.
        patch = array.array('i', [1, 2, 3, 4])
        arr[4:8] = patch
        seq[4:8] = patch

        self.assertEqual(list(arr[:]), seq)

        # Apply f locally to the list and in a child to the shared array;
        # both must end up with the same contents.
        self.f(seq)

        child = self.Process(target=self.f, args=(arr,))
        child.daemon = True
        child.start()
        child.join()

        self.assertEqual(list(arr[:]), seq)

    @unittest.skipIf(c_int is None, "requires _ctypes")
    def test_array_from_size(self):
        size = 10
        zeros = [0] * size
        # Test for zeroing (see issue #11675).
        # The repetition below strengthens the test by increasing the chances
        # of previously allocated non-zero memory being used for the new array
        # on the 2nd and 3rd loops.
        for _ in range(3):
            arr = self.Array('i', size)
            self.assertEqual(len(arr), size)
            self.assertEqual(list(arr), zeros)
            arr[:] = range(size)
            self.assertEqual(list(arr), list(range(size)))
            del arr

    @unittest.skipIf(c_int is None, "requires _ctypes")
    def test_rawarray(self):
        self.test_array(raw=True)

    @unittest.skipIf(c_int is None, "requires _ctypes")
    def test_getobj_getlock_obj(self):
        # Default lock: both accessors must be callable.
        default_locked = self.Array('i', list(range(10)))
        default_locked.get_lock()
        default_locked.get_obj()

        # lock=None: both accessors must be callable as well.
        none_locked = self.Array('i', list(range(10)), lock=None)
        none_locked.get_lock()
        none_locked.get_obj()

        # An explicit lock is handed back by get_lock().
        lock = self.Lock()
        explicit = self.Array('i', list(range(10)), lock=lock)
        returned_lock = explicit.get_lock()
        explicit.get_obj()
        self.assertEqual(lock, returned_lock)

        # lock=False gives an object without the accessors.
        unlocked = self.Array('i', range(10), lock=False)
        for accessor in ('get_lock', 'get_obj'):
            self.assertFalse(hasattr(unlocked, accessor))
        with self.assertRaises(AttributeError):
            self.Array('i', range(10), lock='notalock')

        # RawArray objects have no accessors either.
        raw = self.RawArray('i', range(10))
        for accessor in ('get_lock', 'get_obj'):
            self.assertFalse(hasattr(raw, accessor))
Benjamin Petersone711caf2008-06-11 16:44:04 +00001554
1555#
1556#
1557#
1558
class _TestContainers(BaseTestCase):

    ALLOWED_TYPES = ('manager',)

    def test_list(self):
        digits = list(range(10))
        low = list(range(5))
        doubled = low + low

        a = self.list(list(range(10)))
        self.assertEqual(a[:], digits)

        b = self.list()
        self.assertEqual(b[:], [])

        b.extend(list(range(5)))
        self.assertEqual(b[:], low)

        self.assertEqual(b[2], 2)
        # Slicing past the end is clipped.
        self.assertEqual(b[2:10], [2, 3, 4])

        b *= 2
        self.assertEqual(b[:], doubled)

        self.assertEqual(b + [5, 6], doubled + [5, 6])

        # Nothing done to b may have affected a.
        self.assertEqual(a[:], digits)

        # A list built from other shared lists reads back as plain lists.
        e = self.list([a, b])
        self.assertEqual(e[:], [digits, doubled])

        # A later change to a is visible through a list that contains it.
        f = self.list([a])
        a.append('hello')
        self.assertEqual(f[:], [digits + ['hello']])

    def test_dict(self):
        d = self.dict()
        indices = list(range(65, 70))
        for code in indices:
            d[code] = chr(code)
        letters = [chr(code) for code in indices]
        self.assertEqual(d.copy(), {code: chr(code) for code in indices})
        self.assertEqual(sorted(d.keys()), indices)
        self.assertEqual(sorted(d.values()), letters)
        self.assertEqual(sorted(d.items()), list(zip(indices, letters)))

    def test_namespace(self):
        n = self.Namespace()
        n.name = 'Bob'
        n.job = 'Builder'
        n._hidden = 'hidden'
        self.assertEqual((n.name, n.job), ('Bob', 'Builder'))
        del n.job
        # Neither the deleted attribute nor the underscore-prefixed one
        # shows up in the string form.
        self.assertEqual(str(n), "Namespace(name='Bob')")
        self.assertTrue(hasattr(n, 'name'))
        self.assertFalse(hasattr(n, 'job'))
1614
1615#
1616#
1617#
1618
def sqr(x, wait=0.0):
    """Return x squared, after first sleeping for `wait` seconds."""
    time.sleep(wait)
    squared = x * x
    return squared
Ask Solem2afcbf22010-11-09 20:55:52 +00001622
def mul(x, y):
    """Return the product of x and y."""
    product = x * y
    return product
1625
class _TestPool(BaseTestCase):

    @classmethod
    def setUpClass(cls):
        # One 4-worker pool is shared by all the tests in this class.
        super().setUpClass()
        cls.pool = cls.Pool(4)

    @classmethod
    def tearDownClass(cls):
        cls.pool.terminate()
        cls.pool.join()
        cls.pool = None
        super().tearDownClass()

    def test_apply(self):
        papply = self.pool.apply
        self.assertEqual(papply(sqr, (5,)), sqr(5))
        self.assertEqual(papply(sqr, (), {'x':3}), sqr(x=3))

    def test_map(self):
        pmap = self.pool.map
        self.assertEqual(pmap(sqr, list(range(10))), list(map(sqr, list(range(10)))))
        self.assertEqual(pmap(sqr, list(range(100)), chunksize=20),
                         list(map(sqr, list(range(100)))))

    def test_starmap(self):
        psmap = self.pool.starmap
        tuples = list(zip(range(10), range(9,-1, -1)))
        self.assertEqual(psmap(mul, tuples),
                         list(itertools.starmap(mul, tuples)))
        tuples = list(zip(range(100), range(99,-1, -1)))
        self.assertEqual(psmap(mul, tuples, chunksize=20),
                         list(itertools.starmap(mul, tuples)))

    def test_starmap_async(self):
        tuples = list(zip(range(100), range(99,-1, -1)))
        self.assertEqual(self.pool.starmap_async(mul, tuples).get(),
                         list(itertools.starmap(mul, tuples)))

    def test_map_async(self):
        self.assertEqual(self.pool.map_async(sqr, list(range(10))).get(),
                         list(map(sqr, list(range(10)))))

    def test_map_async_callbacks(self):
        # Exactly one of callback/error_callback must fire per call: the
        # first call succeeds, the second fails with ValueError.
        call_args = self.manager.list() if self.TYPE == 'manager' else []
        self.pool.map_async(int, ['1'],
                            callback=call_args.append,
                            error_callback=call_args.append).wait()
        self.assertEqual(1, len(call_args))
        self.assertEqual([1], call_args[0])
        self.pool.map_async(int, ['a'],
                            callback=call_args.append,
                            error_callback=call_args.append).wait()
        self.assertEqual(2, len(call_args))
        self.assertIsInstance(call_args[1], ValueError)

    def test_map_chunksize(self):
        try:
            self.pool.map_async(sqr, [], chunksize=1).get(timeout=TIMEOUT1)
        except multiprocessing.TimeoutError:
            self.fail("pool.map_async with chunksize stalled on null list")

    def test_async(self):
        res = self.pool.apply_async(sqr, (7, TIMEOUT1,))
        get = TimingWrapper(res.get)
        self.assertEqual(get(), 49)
        self.assertTimingAlmostEqual(get.elapsed, TIMEOUT1)

    def test_async_timeout(self):
        # The task sleeps longer than the get() timeout, so get() must
        # raise after about TIMEOUT2 seconds.
        res = self.pool.apply_async(sqr, (6, TIMEOUT2 + 0.2))
        get = TimingWrapper(res.get)
        self.assertRaises(multiprocessing.TimeoutError, get, timeout=TIMEOUT2)
        self.assertTimingAlmostEqual(get.elapsed, TIMEOUT2)

    def test_imap(self):
        it = self.pool.imap(sqr, list(range(10)))
        self.assertEqual(list(it), list(map(sqr, list(range(10)))))

        it = self.pool.imap(sqr, list(range(10)))
        for i in range(10):
            self.assertEqual(next(it), i*i)
        self.assertRaises(StopIteration, it.__next__)

        it = self.pool.imap(sqr, list(range(1000)), chunksize=100)
        for i in range(1000):
            self.assertEqual(next(it), i*i)
        self.assertRaises(StopIteration, it.__next__)

    def test_imap_unordered(self):
        it = self.pool.imap_unordered(sqr, list(range(1000)))
        self.assertEqual(sorted(it), list(map(sqr, list(range(1000)))))

        it = self.pool.imap_unordered(sqr, list(range(1000)), chunksize=53)
        self.assertEqual(sorted(it), list(map(sqr, list(range(1000)))))

    def test_make_pool(self):
        self.assertRaises(ValueError, multiprocessing.Pool, -1)
        self.assertRaises(ValueError, multiprocessing.Pool, 0)

        p = multiprocessing.Pool(3)
        try:
            self.assertEqual(3, len(p._pool))
        finally:
            # Shut the workers down even if the size check fails.
            p.close()
            p.join()

    def test_terminate(self):
        # NOTE: this terminates the pool shared by the whole class.
        result = self.pool.map_async(
            time.sleep, [0.1 for i in range(10000)], chunksize=1
            )
        self.pool.terminate()
        join = TimingWrapper(self.pool.join)
        join()
        # join() must not wait for the queued tasks to run.
        self.assertLess(join.elapsed, 0.5)

    def test_empty_iterable(self):
        # See Issue 12157
        p = self.Pool(1)
        try:
            self.assertEqual(p.map(sqr, []), [])
            self.assertEqual(list(p.imap(sqr, [])), [])
            self.assertEqual(list(p.imap_unordered(sqr, [])), [])
            self.assertEqual(p.map_async(sqr, []).get(), [])
        finally:
            # Shut the worker down even if one of the checks fails.
            p.close()
            p.join()

    def test_context(self):
        if self.TYPE == 'processes':
            L = list(range(10))
            expected = [sqr(i) for i in L]
            with multiprocessing.Pool(2) as p:
                r = p.map_async(sqr, L)
                self.assertEqual(r.get(), expected)
            # Once the with block has exited, the pool rejects new work.
            self.assertRaises(ValueError, p.map_async, sqr, L)

    @classmethod
    def _test_traceback(cls):
        # NOTE: test_traceback looks for the exact text of the next source
        # line (including its trailing comment) in the reported traceback.
        raise RuntimeError(123) # some comment

    def test_traceback(self):
        # We want ensure that the traceback from the child process is
        # contained in the traceback raised in the main process.
        if self.TYPE == 'processes':
            with self.Pool(1) as p:
                try:
                    p.apply(self._test_traceback)
                except Exception as e:
                    exc = e
                else:
                    raise AssertionError('expected RuntimeError')
            self.assertIs(type(exc), RuntimeError)
            self.assertEqual(exc.args, (123,))
            cause = exc.__cause__
            self.assertIs(type(cause), multiprocessing.pool.RemoteTraceback)
            self.assertIn('raise RuntimeError(123) # some comment', cause.tb)

            # The default excepthook must print the remote traceback too.
            with test.support.captured_stderr() as f1:
                try:
                    raise exc
                except RuntimeError:
                    sys.excepthook(*sys.exc_info())
            self.assertIn('raise RuntimeError(123) # some comment',
                          f1.getvalue())
1788
def raising():
    """Always raise KeyError("key")."""
    error = KeyError("key")
    raise error
Jesse Noller1f0b6582010-01-27 03:36:01 +00001791
def unpickleable_result():
    """Return a lambda (which returns 42) as a result that cannot be pickled."""
    answer = lambda: 42
    return answer
1794
class _TestPoolWorkerErrors(BaseTestCase):
    ALLOWED_TYPES = ('processes', )

    def test_async_error_callback(self):
        # An exception raised by the task must be re-raised by get() and
        # also handed to the error_callback.
        p = multiprocessing.Pool(2)
        try:
            scratchpad = [None]
            def errback(exc):
                scratchpad[0] = exc

            res = p.apply_async(raising, error_callback=errback)
            self.assertRaises(KeyError, res.get)
            self.assertTrue(scratchpad[0])
            self.assertIsInstance(scratchpad[0], KeyError)
        finally:
            # Shut the workers down even if an assertion above fails.
            p.close()
            p.join()

    def test_unpickleable_result(self):
        from multiprocessing.pool import MaybeEncodingError
        p = multiprocessing.Pool(2)
        try:
            # Make sure we don't lose pool processes because of encoding
            # errors.
            for iteration in range(20):

                scratchpad = [None]
                def errback(exc):
                    scratchpad[0] = exc

                res = p.apply_async(unpickleable_result,
                                    error_callback=errback)
                self.assertRaises(MaybeEncodingError, res.get)
                wrapped = scratchpad[0]
                self.assertTrue(wrapped)
                self.assertIsInstance(scratchpad[0], MaybeEncodingError)
                self.assertIsNotNone(wrapped.exc)
                self.assertIsNotNone(wrapped.value)
        finally:
            # Shut the workers down even if an assertion above fails.
            p.close()
            p.join()
1834
class _TestPoolWorkerLifetime(BaseTestCase):
    ALLOWED_TYPES = ('processes', )

    def test_pool_worker_lifetime(self):
        p = multiprocessing.Pool(3, maxtasksperchild=10)
        try:
            self.assertEqual(3, len(p._pool))
            origworkerpids = [w.pid for w in p._pool]
            # Run many tasks so each worker gets replaced (hopefully)
            results = []
            for i in range(100):
                results.append(p.apply_async(sqr, (i, )))
            # Fetch the results and verify we got the right answers,
            # also ensuring all the tasks have completed.
            for (j, res) in enumerate(results):
                self.assertEqual(res.get(), sqr(j))
            # Refill the pool
            p._repopulate_pool()
            # Wait until all workers are alive
            # (countdown * DELTA = 5 seconds max startup process time)
            countdown = 50
            while countdown and not all(w.is_alive() for w in p._pool):
                countdown -= 1
                time.sleep(DELTA)
            finalworkerpids = [w.pid for w in p._pool]
            # All pids should be assigned.  See issue #7805.
            self.assertNotIn(None, origworkerpids)
            self.assertNotIn(None, finalworkerpids)
            # Finally, check that the worker pids have changed
            self.assertNotEqual(sorted(origworkerpids),
                                sorted(finalworkerpids))
        finally:
            # Shut the workers down even if an assertion above fails.
            p.close()
            p.join()

    def test_pool_worker_lifetime_early_close(self):
        # Issue #10332: closing a pool whose workers have limited lifetimes
        # before all the tasks completed would make join() hang.
        p = multiprocessing.Pool(3, maxtasksperchild=1)
        results = []
        for i in range(6):
            results.append(p.apply_async(sqr, (i, 0.3)))
        p.close()
        p.join()
        # check the results
        for (j, res) in enumerate(results):
            self.assertEqual(res.get(), sqr(j))
1879
Benjamin Petersone711caf2008-06-11 16:44:04 +00001880#
1881# Test of creating a customized manager class
1882#
1883
1884from multiprocessing.managers import BaseManager, BaseProxy, RemoteError
1885
class FooBar:
    """Small object served through the custom manager in these tests.

    It offers one public method that returns, one public method that
    raises, and one underscore-prefixed method.
    """

    def f(self):
        """Return the marker string ``'f()'``."""
        return 'f()'

    def g(self):
        """Always raise ValueError."""
        raise ValueError()

    def _h(self):
        # Underscore-prefixed counterpart of f().
        return '_h()'
1893
def baz():
    """Generate the squares of 0 through 9 in increasing order."""
    yield from (n * n for n in range(10))
1897
class IteratorProxy(BaseProxy):
    """Proxy that can be used directly in a ``for`` loop."""

    # Only __next__ is forwarded through _callmethod(); __iter__ is
    # answered locally by the proxy itself.
    _exposed_ = ('__next__',)

    def __iter__(self):
        # The proxy acts as its own iterator.
        return self

    def __next__(self):
        item = self._callmethod('__next__')
        return item
1904
class MyManager(BaseManager):
    """Manager subclass that carries the type registrations below."""


# 'Foo' registers FooBar with the default set of exposed methods, 'Bar'
# registers it with an explicit selection that includes the
# underscore-prefixed _h, and 'baz' hands the generator out through
# IteratorProxy.
MyManager.register('Foo', FooBar)
MyManager.register('Bar', FooBar, exposed=('f', '_h'))
MyManager.register('baz', baz, proxytype=IteratorProxy)
1911
1912
class _TestMyManager(BaseTestCase):
    """Exercise MyManager started explicitly and as a context manager."""

    ALLOWED_TYPES = ('manager',)

    def test_mymanager(self):
        mgr = MyManager()
        mgr.start()
        self.common(mgr)
        mgr.shutdown()

        # If the manager process exited cleanly then the exitcode
        # will be zero.  Otherwise (after a short timeout)
        # terminate() is used, resulting in an exitcode of -SIGTERM.
        self.assertEqual(mgr._process.exitcode, 0)

    def test_mymanager_context(self):
        # Entering the context starts the manager; leaving shuts it down.
        with MyManager() as mgr:
            self.common(mgr)
        self.assertEqual(mgr._process.exitcode, 0)

    def test_mymanager_context_prestarted(self):
        # A manager that is already running may still be used in ``with``.
        mgr = MyManager()
        mgr.start()
        with mgr:
            self.common(mgr)
        self.assertEqual(mgr._process.exitcode, 0)

    def common(self, manager):
        # Checks shared by all tests above; *manager* must be running.
        foo = manager.Foo()
        bar = manager.Bar()
        squares = manager.baz()

        candidates = ('f', 'g', '_h')
        foo_methods = [name for name in candidates if hasattr(foo, name)]
        bar_methods = [name for name in candidates if hasattr(bar, name)]

        self.assertEqual(foo_methods, ['f', 'g'])
        self.assertEqual(bar_methods, ['f', '_h'])

        self.assertEqual(foo.f(), 'f()')
        with self.assertRaises(ValueError):
            foo.g()
        self.assertEqual(foo._callmethod('f'), 'f()')
        # _h is not exposed on 'Foo', so calling it remotely must fail.
        with self.assertRaises(RemoteError):
            foo._callmethod('_h')

        self.assertEqual(bar.f(), 'f()')
        self.assertEqual(bar._h(), '_h()')
        self.assertEqual(bar._callmethod('f'), 'f()')
        self.assertEqual(bar._callmethod('_h'), '_h()')

        self.assertEqual(list(squares), [n * n for n in range(10)])
1962
Richard Oudkerk73d9a292012-06-14 15:30:10 +01001963
Benjamin Petersone711caf2008-06-11 16:44:04 +00001964#
1965# Test of connecting to a remote server and using xmlrpclib for serialization
1966#
1967
# Single queue instance handed out by get_queue().
_queue = pyqueue.Queue()


def get_queue():
    """Return the module-level queue (the same object on every call)."""
    return _queue
1971
class QueueManager(BaseManager):
    """Manager class used by the server process."""


# The server side supplies the callable that produces the queue.
QueueManager.register('get_queue', get_queue)
1975
class QueueManager2(BaseManager):
    """Manager class which specifies the same interface as QueueManager."""


# Registered without a callable: only the typeid is declared here.
QueueManager2.register(typeid='get_queue')
1979
1980
SERIALIZER = 'xmlrpclib'


class _TestRemoteManager(BaseTestCase):
    """Reach a running manager through a second manager class."""

    ALLOWED_TYPES = ('manager',)

    @classmethod
    def _putter(cls, address, authkey):
        """Child process body: connect as a client and enqueue one tuple."""
        client = QueueManager2(address=address, authkey=authkey,
                               serializer=SERIALIZER)
        client.connect()
        client.get_queue().put(('hello world', None, True, 2.25))

    def test_remote(self):
        authkey = os.urandom(32)

        server = QueueManager(address=('localhost', 0), authkey=authkey,
                              serializer=SERIALIZER)
        server.start()

        putter = self.Process(target=self._putter,
                              args=(server.address, authkey))
        putter.daemon = True
        putter.start()

        client = QueueManager2(address=server.address, authkey=authkey,
                               serializer=SERIALIZER)
        client.connect()
        remote_queue = client.get_queue()

        # Note that xmlrpclib will deserialize object as a list not a tuple
        self.assertEqual(remote_queue.get(),
                         ['hello world', None, True, 2.25])

        # Because we are using xmlrpclib for serialization instead of
        # pickle this will cause a serialization error.
        self.assertRaises(Exception, remote_queue.put, time.sleep)

        # Make queue finalizer run before the server is stopped
        del remote_queue
        server.shutdown()
2024
class _TestManagerRestart(BaseTestCase):
    """Check that a manager can be started again on a just-released address."""

    @classmethod
    def _putter(cls, address, authkey):
        """Child process body: connect to the manager and enqueue a string."""
        manager = QueueManager(
            address=address, authkey=authkey, serializer=SERIALIZER)
        manager.connect()
        queue = manager.get_queue()
        queue.put('hello world')

    def test_rapid_restart(self):
        authkey = os.urandom(32)
        manager = QueueManager(
            address=('localhost', 0), authkey=authkey, serializer=SERIALIZER)
        srvr = manager.get_server()
        addr = srvr.address
        # Close the connection.Listener socket which gets opened as a part
        # of manager.get_server(). It's not needed for the test.
        srvr.listener.close()
        manager.start()

        p = self.Process(target=self._putter, args=(manager.address, authkey))
        p.daemon = True
        p.start()
        queue = manager.get_queue()
        self.assertEqual(queue.get(), 'hello world')
        # Drop the proxy before the server goes away.
        del queue
        manager.shutdown()

        manager = QueueManager(
            address=addr, authkey=authkey, serializer=SERIALIZER)
        try:
            manager.start()
        except OSError as e:
            if e.errno != errno.EADDRINUSE:
                raise
            # Retry after some time, in case the old socket was lingering
            # (sporadic failure on buildbots)
            time.sleep(1.0)
            manager = QueueManager(
                address=addr, authkey=authkey, serializer=SERIALIZER)
            # The replacement manager has to be started as well: shutdown()
            # only exists on a manager once start() has been called, so
            # without this the retry path could never succeed.
            manager.start()
        manager.shutdown()
Jesse Nollerc5d28a02009-03-30 16:37:36 +00002066
Benjamin Petersone711caf2008-06-11 16:44:04 +00002067#
2068#
2069#
2070
# Empty byte string used to tell the echo child to stop.
SENTINEL = latin('')


class _TestConnection(BaseTestCase):
    """Tests for the Connection objects returned by ``Pipe()``."""

    ALLOWED_TYPES = ('processes', 'threads')

    @classmethod
    def _echo(cls, conn):
        """Send every received byte message back until SENTINEL arrives."""
        for msg in iter(conn.recv_bytes, SENTINEL):
            conn.send_bytes(msg)
        conn.close()

    def test_connection(self):
        conn, child_conn = self.Pipe()

        p = self.Process(target=self._echo, args=(child_conn,))
        p.daemon = True
        p.start()

        seq = [1, 2.25, None]
        msg = latin('hello world')
        longmsg = msg * 10
        arr = array.array('i', list(range(4)))

        if self.TYPE == 'processes':
            self.assertEqual(type(conn.fileno()), int)

        self.assertEqual(conn.send(seq), None)
        self.assertEqual(conn.recv(), seq)

        self.assertEqual(conn.send_bytes(msg), None)
        self.assertEqual(conn.recv_bytes(), msg)

        if self.TYPE == 'processes':
            # Receive into the start of a larger, zero-filled buffer.
            buffer = array.array('i', [0]*10)
            expected = list(arr) + [0] * (10 - len(arr))
            self.assertEqual(conn.send_bytes(arr), None)
            self.assertEqual(conn.recv_bytes_into(buffer),
                             len(arr) * buffer.itemsize)
            self.assertEqual(list(buffer), expected)

            # Receive at an offset of three items into the buffer.
            buffer = array.array('i', [0]*10)
            expected = [0] * 3 + list(arr) + [0] * (10 - 3 - len(arr))
            self.assertEqual(conn.send_bytes(arr), None)
            self.assertEqual(conn.recv_bytes_into(buffer, 3 * buffer.itemsize),
                             len(arr) * buffer.itemsize)
            self.assertEqual(list(buffer), expected)

            # A 40-byte buffer cannot hold longmsg (ten copies of msg), so
            # BufferTooShort must be raised and carry the whole message.
            buffer = bytearray(latin(' ' * 40))
            self.assertEqual(conn.send_bytes(longmsg), None)
            try:
                res = conn.recv_bytes_into(buffer)
            except multiprocessing.BufferTooShort as e:
                self.assertEqual(e.args, (longmsg,))
            else:
                self.fail('expected BufferTooShort, got %s' % res)

        poll = TimingWrapper(conn.poll)

        self.assertEqual(poll(), False)
        self.assertTimingAlmostEqual(poll.elapsed, 0)

        self.assertEqual(poll(-1), False)
        self.assertTimingAlmostEqual(poll.elapsed, 0)

        self.assertEqual(poll(TIMEOUT1), False)
        self.assertTimingAlmostEqual(poll.elapsed, TIMEOUT1)

        conn.send(None)
        # Give the child a moment to echo the message back.
        time.sleep(.1)

        self.assertEqual(poll(TIMEOUT1), True)
        self.assertTimingAlmostEqual(poll.elapsed, 0)

        self.assertEqual(conn.recv(), None)

        really_big_msg = latin('X') * (1024 * 1024 * 16)   # 16 MiB
        conn.send_bytes(really_big_msg)
        self.assertEqual(conn.recv_bytes(), really_big_msg)

        conn.send_bytes(SENTINEL)                          # tell child to quit
        child_conn.close()

        if self.TYPE == 'processes':
            self.assertEqual(conn.readable, True)
            self.assertEqual(conn.writable, True)
            self.assertRaises(EOFError, conn.recv)
            self.assertRaises(EOFError, conn.recv_bytes)

        p.join()

    def test_duplex_false(self):
        reader, writer = self.Pipe(duplex=False)
        self.assertEqual(writer.send(1), None)
        self.assertEqual(reader.recv(), 1)
        if self.TYPE == 'processes':
            self.assertEqual(reader.readable, True)
            self.assertEqual(reader.writable, False)
            self.assertEqual(writer.readable, False)
            self.assertEqual(writer.writable, True)
            # Using an end in the wrong direction must fail.
            self.assertRaises(OSError, reader.send, 2)
            self.assertRaises(OSError, writer.recv)
            self.assertRaises(OSError, writer.poll)

    def test_spawn_close(self):
        # We test that a pipe connection can be closed by parent
        # process immediately after child is spawned.  On Windows this
        # would have sometimes failed on old versions because
        # child_conn would be closed before the child got a chance to
        # duplicate it.
        conn, child_conn = self.Pipe()

        p = self.Process(target=self._echo, args=(child_conn,))
        p.daemon = True
        p.start()
        child_conn.close()    # this might complete before child initializes

        msg = latin('hello')
        conn.send_bytes(msg)
        self.assertEqual(conn.recv_bytes(), msg)

        conn.send_bytes(SENTINEL)
        conn.close()
        p.join()

    def test_sendbytes(self):
        # Report the test as skipped (rather than silently passing) when it
        # does not apply, like the other process-only tests in this class.
        if self.TYPE != 'processes':
            self.skipTest("only makes sense with processes")

        msg = latin('abcdefghijklmnopqrstuvwxyz')
        a, b = self.Pipe()

        a.send_bytes(msg)
        self.assertEqual(b.recv_bytes(), msg)

        # Offset only.
        a.send_bytes(msg, 5)
        self.assertEqual(b.recv_bytes(), msg[5:])

        # Offset and size.
        a.send_bytes(msg, 7, 8)
        self.assertEqual(b.recv_bytes(), msg[7:7+8])

        # An offset equal to the length yields an empty message.
        a.send_bytes(msg, 26)
        self.assertEqual(b.recv_bytes(), latin(''))

        a.send_bytes(msg, 26, 0)
        self.assertEqual(b.recv_bytes(), latin(''))

        # Out-of-range or negative offset/size combinations are rejected.
        self.assertRaises(ValueError, a.send_bytes, msg, 27)

        self.assertRaises(ValueError, a.send_bytes, msg, 22, 5)

        self.assertRaises(ValueError, a.send_bytes, msg, 26, 1)

        self.assertRaises(ValueError, a.send_bytes, msg, -1)

        self.assertRaises(ValueError, a.send_bytes, msg, 4, -1)

    @classmethod
    def _is_fd_assigned(cls, fd):
        """Return True if *fd* is an open descriptor, False on EBADF.

        Any other OSError raised by ``os.fstat()`` is propagated.
        """
        try:
            os.fstat(fd)
        except OSError as e:
            if e.errno == errno.EBADF:
                return False
            raise
        else:
            return True

    @classmethod
    def _writefd(cls, conn, data, create_dummy_fds=False):
        """Child process body: receive a handle over *conn*, write *data*.

        With *create_dummy_fds* true, every unassigned descriptor below 256
        is first filled with a duplicate of the connection's descriptor.
        """
        if create_dummy_fds:
            for i in range(0, 256):
                if not cls._is_fd_assigned(i):
                    os.dup2(conn.fileno(), i)
        fd = reduction.recv_handle(conn)
        if msvcrt:
            fd = msvcrt.open_osfhandle(fd, os.O_WRONLY)
        os.write(fd, data)
        os.close(fd)

    @unittest.skipUnless(HAS_REDUCTION, "test needs multiprocessing.reduction")
    def test_fd_transfer(self):
        if self.TYPE != 'processes':
            self.skipTest("only makes sense with processes")
        conn, child_conn = self.Pipe(duplex=True)

        p = self.Process(target=self._writefd, args=(child_conn, b"foo"))
        p.daemon = True
        p.start()
        self.addCleanup(test.support.unlink, test.support.TESTFN)
        with open(test.support.TESTFN, "wb") as f:
            fd = f.fileno()
            if msvcrt:
                fd = msvcrt.get_osfhandle(fd)
            reduction.send_handle(conn, fd, p.pid)
        p.join()
        # The child wrote through the transferred handle.
        with open(test.support.TESTFN, "rb") as f:
            self.assertEqual(f.read(), b"foo")

    @unittest.skipUnless(HAS_REDUCTION, "test needs multiprocessing.reduction")
    @unittest.skipIf(sys.platform == "win32",
                     "test semantics don't make sense on Windows")
    @unittest.skipIf(MAXFD <= 256,
                     "largest assignable fd number is too small")
    @unittest.skipUnless(hasattr(os, "dup2"),
                         "test needs os.dup2()")
    def test_large_fd_transfer(self):
        # With fd > 256 (issue #11657)
        if self.TYPE != 'processes':
            self.skipTest("only makes sense with processes")
        conn, child_conn = self.Pipe(duplex=True)

        p = self.Process(target=self._writefd, args=(child_conn, b"bar", True))
        p.daemon = True
        p.start()
        self.addCleanup(test.support.unlink, test.support.TESTFN)
        with open(test.support.TESTFN, "wb") as f:
            fd = f.fileno()
            # Pick the first free descriptor number at or above 256.
            for newfd in range(256, MAXFD):
                if not self._is_fd_assigned(newfd):
                    break
            else:
                self.fail("could not find an unassigned large file descriptor")
            os.dup2(fd, newfd)
            try:
                reduction.send_handle(conn, newfd, p.pid)
            finally:
                os.close(newfd)
        p.join()
        with open(test.support.TESTFN, "rb") as f:
            self.assertEqual(f.read(), b"bar")

    @classmethod
    def _send_data_without_fd(cls, conn):
        """Child process body: write one raw NUL byte to the connection."""
        os.write(conn.fileno(), b"\0")

    @unittest.skipUnless(HAS_REDUCTION, "test needs multiprocessing.reduction")
    @unittest.skipIf(sys.platform == "win32", "doesn't make sense on Windows")
    def test_missing_fd_transfer(self):
        # Check that exception is raised when received data is not
        # accompanied by a file descriptor in ancillary data.
        if self.TYPE != 'processes':
            self.skipTest("only makes sense with processes")
        conn, child_conn = self.Pipe(duplex=True)

        p = self.Process(target=self._send_data_without_fd, args=(child_conn,))
        p.daemon = True
        p.start()
        self.assertRaises(RuntimeError, reduction.recv_handle, conn)
        p.join()

    def test_context(self):
        a, b = self.Pipe()

        with a, b:
            a.send(1729)
            self.assertEqual(b.recv(), 1729)
            if self.TYPE == 'processes':
                self.assertFalse(a.closed)
                self.assertFalse(b.closed)

        # Leaving the ``with`` block must have closed both ends.
        if self.TYPE == 'processes':
            self.assertTrue(a.closed)
            self.assertTrue(b.closed)
            self.assertRaises(OSError, a.recv)
            self.assertRaises(OSError, b.recv)
Richard Oudkerkd69cfe82012-06-18 17:47:52 +01002337
class _TestListener(BaseTestCase):
    """Tests for ``Listener`` objects on their own."""

    ALLOWED_TYPES = ('processes',)

    def test_multiple_bind(self):
        # Binding a second listener to an address already in use must fail.
        for family in self.connection.families:
            first = self.connection.Listener(family=family)
            self.addCleanup(first.close)
            with self.assertRaises(OSError):
                self.connection.Listener(first.address, family)

    def test_context(self):
        with self.connection.Listener() as listener:
            with self.connection.Client(listener.address) as client, \
                    listener.accept() as served:
                client.send(1729)
                self.assertEqual(served.recv(), 1729)

        # Once the ``with`` block is left the listener no longer accepts.
        if self.TYPE == 'processes':
            with self.assertRaises(OSError):
                listener.accept()
Richard Oudkerkd69cfe82012-06-18 17:47:52 +01002358
class _TestListenerClient(BaseTestCase):
    """Tests that pair a ``Listener`` with a ``Client``."""

    ALLOWED_TYPES = ('processes', 'threads')

    @classmethod
    def _test(cls, address):
        """Child body: connect to *address*, send ``'hello'`` and close."""
        conn = cls.connection.Client(address)
        conn.send('hello')
        conn.close()

    def test_listener_client(self):
        for family in self.connection.families:
            l = self.connection.Listener(family=family)
            p = self.Process(target=self._test, args=(l.address,))
            p.daemon = True
            p.start()
            conn = l.accept()
            self.assertEqual(conn.recv(), 'hello')
            # Close the accepted connection so that one is not left open
            # for every family tried (test_issue14725 does the same).
            conn.close()
            p.join()
            l.close()

    def test_issue14725(self):
        l = self.connection.Listener()
        p = self.Process(target=self._test, args=(l.address,))
        p.daemon = True
        p.start()
        time.sleep(1)
        # On Windows the client process should by now have connected,
        # written data and closed the pipe handle.  This causes
        # ConnectNamedPipe() to fail with ERROR_NO_DATA.  See Issue
        # 14725.
        conn = l.accept()
        self.assertEqual(conn.recv(), 'hello')
        conn.close()
        p.join()
        l.close()

    def test_issue16955(self):
        # Data sent by the accepting side must make poll() on the client
        # side report True within the timeout.
        for fam in self.connection.families:
            l = self.connection.Listener(family=fam)
            c = self.connection.Client(l.address)
            a = l.accept()
            a.send_bytes(b"hello")
            self.assertTrue(c.poll(1))
            a.close()
            c.close()
            l.close()
2406
class _TestPoll(unittest.TestCase):
    """Tests for ``Connection.poll()`` and message boundaries."""

    ALLOWED_TYPES = ('processes', 'threads')

    def test_empty_string(self):
        # An empty message still counts as data for poll(), and polling
        # repeatedly does not consume it.
        receiver, sender = self.Pipe()
        self.assertEqual(receiver.poll(), False)
        sender.send_bytes(b'')
        self.assertEqual(receiver.poll(), True)
        self.assertEqual(receiver.poll(), True)

    @classmethod
    def _child_strings(cls, conn, strings):
        """Child body: send each of *strings*, 0.1 s apart, then close."""
        for payload in strings:
            time.sleep(0.1)
            conn.send_bytes(payload)
        conn.close()

    def test_strings(self):
        strings = (b'hello', b'', b'a', b'b', b'', b'bye', b'', b'lop')
        receiver, sender = self.Pipe()
        child = self.Process(target=self._child_strings,
                             args=(sender, strings))
        child.start()

        for expected in strings:
            # Poll in short slices (at most 200 of them) until data shows up.
            for _ in range(200):
                if receiver.poll(0.01):
                    break
            received = receiver.recv_bytes()
            self.assertEqual(expected, received)

        child.join()

    @classmethod
    def _child_boundaries(cls, r):
        # Polling may "pull" a message in to the child process, but we
        # don't want it to pull only part of a message, as that would
        # corrupt the pipe for any other processes which might later
        # read from it.
        r.poll(5)

    def test_boundaries(self):
        reader, writer = self.Pipe(False)
        child = self.Process(target=self._child_boundaries, args=(reader,))
        child.start()
        time.sleep(2)
        messages = [b"first", b"second"]
        for message in messages:
            writer.send_bytes(message)
        writer.close()
        child.join()
        # Whatever the parent reads must be one complete message.
        self.assertIn(reader.recv_bytes(), messages)

    @classmethod
    def _child_dont_merge(cls, b):
        """Child body: send three separate messages back to back."""
        for chunk in (b'a', b'b', b'cd'):
            b.send_bytes(chunk)

    def test_dont_merge(self):
        receiver, sender = self.Pipe()
        self.assertEqual(receiver.poll(0.0), False)
        self.assertEqual(receiver.poll(0.1), False)

        child = self.Process(target=self._child_dont_merge, args=(sender,))
        child.start()

        # Repeated polls in between must leave the individual messages
        # intact and in order.
        self.assertEqual(receiver.recv_bytes(), b'a')
        self.assertEqual(receiver.poll(1.0), True)
        self.assertEqual(receiver.poll(1.0), True)
        self.assertEqual(receiver.recv_bytes(), b'b')
        self.assertEqual(receiver.poll(1.0), True)
        self.assertEqual(receiver.poll(1.0), True)
        self.assertEqual(receiver.poll(0.0), True)
        self.assertEqual(receiver.recv_bytes(), b'cd')

        child.join()
2484
Benjamin Petersone711caf2008-06-11 16:44:04 +00002485#
2486# Test of sending connection and socket objects between processes
2487#
Antoine Pitrou5438ed12012-04-24 22:56:57 +02002488
@unittest.skipUnless(HAS_REDUCTION, "test needs multiprocessing.reduction")
class _TestPicklingConnections(BaseTestCase):
    """Send connection and socket objects from one process to another."""

    ALLOWED_TYPES = ('processes',)

    @classmethod
    def tearDownClass(cls):
        # Stop the resource sharer, waiting at most five seconds.
        from multiprocessing.reduction import resource_sharer
        resource_sharer.stop(timeout=5)

    @classmethod
    def _listener(cls, conn, families):
        """Child body: accept one peer per family, then one plain socket.

        For each listener the address is sent over *conn* first, followed
        by the accepted connection object itself.
        """
        for family in families:
            listener = cls.connection.Listener(family=family)
            conn.send(listener.address)
            accepted = listener.accept()
            conn.send(accepted)
            accepted.close()
            listener.close()

        server = socket.socket()
        server.bind(('localhost', 0))
        server.listen(1)
        conn.send(server.getsockname())
        accepted, _ = server.accept()
        conn.send(accepted)
        accepted.close()
        server.close()

        # Wait for the parent's final message before returning.
        conn.recv()

    @classmethod
    def _remote(cls, conn):
        """Child body: connect to each received address, send msg.upper().

        ``(address, msg)`` pairs are served with ``Client`` until ``None``
        arrives; one further pair is then served over a plain socket.
        """
        while True:
            request = conn.recv()
            if request is None:
                break
            address, msg = request
            client = cls.connection.Client(address)
            client.send(msg.upper())
            client.close()

        address, msg = conn.recv()
        sock = socket.socket()
        sock.connect(address)
        sock.sendall(msg.upper())
        sock.close()

        conn.close()

    def test_pickling(self):
        families = self.connection.families

        listener_conn, listener_child_end = self.Pipe()
        listener_proc = self.Process(target=self._listener,
                                     args=(listener_child_end, families))
        listener_proc.daemon = True
        listener_proc.start()
        listener_child_end.close()

        remote_conn, remote_child_end = self.Pipe()
        remote_proc = self.Process(target=self._remote,
                                   args=(remote_child_end,))
        remote_proc.daemon = True
        remote_proc.start()
        remote_child_end.close()

        for family in families:
            msg = ('This connection uses family %s' % family).encode('ascii')
            address = listener_conn.recv()
            remote_conn.send((address, msg))
            received_conn = listener_conn.recv()
            self.assertEqual(received_conn.recv(), msg.upper())

        # Tell the remote process to switch to the plain socket round.
        remote_conn.send(None)

        msg = latin('This connection uses a normal socket')
        address = listener_conn.recv()
        remote_conn.send((address, msg))
        received_sock = listener_conn.recv()
        # Read until the peer closes, signalled by an empty chunk.
        data = b''.join(iter(lambda: received_sock.recv(100), b''))
        self.assertEqual(data, msg.upper())
        received_sock.close()

        listener_conn.send(None)

        remote_conn.close()
        listener_conn.close()

        listener_proc.join()
        remote_proc.join()

    @classmethod
    def child_access(cls, conn):
        """Child body: use a received write end, then a received read end."""
        writer = conn.recv()
        writer.send('all is well')
        writer.close()

        reader = conn.recv()
        text = reader.recv()
        conn.send(text * 2)

        conn.close()

    def test_access(self):
        # On Windows, if we do not specify a destination pid when
        # using DupHandle then we need to be careful to use the
        # correct access flags for DuplicateHandle(), or else
        # DupHandle.detach() will raise PermissionError.  For example,
        # for a read only pipe handle we should use
        # access=FILE_GENERIC_READ.  (Unfortunately
        # DUPLICATE_SAME_ACCESS does not work.)
        conn, child_conn = self.Pipe()
        child = self.Process(target=self.child_access, args=(child_conn,))
        child.daemon = True
        child.start()
        child_conn.close()

        # Hand the write end to the child and read what it sends.
        reader, writer = self.Pipe(duplex=False)
        conn.send(writer)
        writer.close()
        self.assertEqual(reader.recv(), 'all is well')
        reader.close()

        # Hand the read end to the child and check what it echoes back.
        reader, writer = self.Pipe(duplex=False)
        conn.send(reader)
        reader.close()
        writer.send('foobar')
        writer.close()
        self.assertEqual(conn.recv(), 'foobar'*2)
2619
Benjamin Petersone711caf2008-06-11 16:44:04 +00002620#
2621#
2622#
2623
class _TestHeap(BaseTestCase):
    """Tests for the allocator behind ``multiprocessing.heap.BufferWrapper``."""

    ALLOWED_TYPES = ('processes',)

    def test_heap(self):
        iterations = 5000
        maxblocks = 50
        live = []

        # create and destroy lots of blocks of different sizes
        for _ in range(iterations):
            size = int(random.lognormvariate(0, 1) * 1000)
            wrapper = multiprocessing.heap.BufferWrapper(size)
            live.append(wrapper)
            if len(live) > maxblocks:
                victim = random.randrange(maxblocks)
                del live[victim]

        # get the heap object
        heap = multiprocessing.heap.BufferWrapper._heap

        def describe(block, state):
            # (arena index, start, stop, length, state) for one block
            arena, start, stop = block
            return (heap._arenas.index(arena), start, stop,
                    stop - start, state)

        # verify the state of the heap
        layout = []
        occupied = 0
        heap._lock.acquire()
        self.addCleanup(heap._lock.release)
        for sequence in list(heap._len_to_seq.values()):
            layout.extend(describe(block, 'free') for block in sequence)
        for block in heap._allocated_blocks:
            layout.append(describe(block, 'occupied'))
            occupied += block[2] - block[1]

        layout.sort()

        # Neighbouring entries must either touch, or the later one must
        # start a different arena at offset zero.
        for current, following in zip(layout, layout[1:]):
            arena, start, stop = current[:3]
            narena, nstart = following[:2]
            self.assertTrue((arena != narena and nstart == 0) or
                            (stop == nstart))

    def test_free_from_gc(self):
        # Check that freeing of blocks by the garbage collector doesn't deadlock
        # (issue #12352).
        # Make sure the GC is enabled, and set lower collection thresholds to
        # make collections more frequent (and increase the probability of
        # deadlock).
        if not gc.isenabled():
            gc.enable()
            self.addCleanup(gc.disable)
        saved_thresholds = gc.get_threshold()
        self.addCleanup(gc.set_threshold, *saved_thresholds)
        gc.set_threshold(10)

        # perform numerous block allocations, with cyclic references to make
        # sure objects are collected asynchronously by the gc
        for _ in range(5000):
            first = multiprocessing.heap.BufferWrapper(1)
            second = multiprocessing.heap.BufferWrapper(1)
            # circular references
            first.buddy = second
            second.buddy = first
2688
Benjamin Petersone711caf2008-06-11 16:44:04 +00002689#
2690#
2691#
2692
class _Foo(Structure):
    """ctypes structure with a C int field ``x`` and a C double field ``y``."""

    _fields_ = [('x', c_int), ('y', c_double)]
2698
class _TestSharedCTypes(BaseTestCase):
    """Tests for shared ctypes values and arrays modified by a child."""

    ALLOWED_TYPES = ('processes',)

    def setUp(self):
        if HAS_SHAREDCTYPES:
            return
        self.skipTest("requires multiprocessing.sharedctypes")

    @classmethod
    def _double(cls, x, y, foo, arr, string):
        """Child body: double every shared object passed in, in place."""
        x.value *= 2
        y.value *= 2
        foo.x *= 2
        foo.y *= 2
        string.value *= 2
        for index in range(len(arr)):
            arr[index] *= 2

    def test_sharedctypes(self, lock=False):
        x = Value('i', 7, lock=lock)
        y = Value(c_double, 1.0/3.0, lock=lock)
        foo = Value(_Foo, 3, 2, lock=lock)
        arr = self.Array('d', list(range(10)), lock=lock)
        string = self.Array('c', 20, lock=lock)
        string.value = latin('hello')

        child = self.Process(target=self._double,
                             args=(x, y, foo, arr, string))
        child.daemon = True
        child.start()
        child.join()

        # Every object must show the doubling done by the child.
        self.assertEqual(x.value, 14)
        self.assertAlmostEqual(y.value, 2.0/3.0)
        self.assertEqual(foo.x, 6)
        self.assertAlmostEqual(foo.y, 4.0)
        for index in range(10):
            self.assertAlmostEqual(arr[index], index*2)
        self.assertEqual(string.value, latin('hellohello'))

    def test_synchronize(self):
        # Same scenario, with lock=True passed through.
        self.test_sharedctypes(lock=True)

    def test_copy(self):
        original = _Foo(2, 5.0)
        duplicate = copy(original)
        # Changing the original afterwards must not affect the copy.
        original.x = 0
        original.y = 0
        self.assertEqual(duplicate.x, 2)
        self.assertAlmostEqual(duplicate.y, 5.0)
2747 self.assertAlmostEqual(bar.y, 5.0)
2748
2749#
2750#
2751#
2752
class _TestFinalize(BaseTestCase):
    # Registers a series of util.Finalize callbacks in a child process and
    # checks, via messages sent over a pipe, which of them ran and in what
    # order.  The statement order and the set of locals that are still alive
    # at the end of _test_finalize are part of the test; do not reorder.

    ALLOWED_TYPES = ('processes',)

    @classmethod
    def _test_finalize(cls, conn):
        # Child-process target.  Every callback sends its tag through *conn*.
        class Foo(object):
            pass

        a = Foo()
        util.Finalize(a, conn.send, args=('a',))
        del a           # triggers callback for a

        b = Foo()
        close_b = util.Finalize(b, conn.send, args=('b',))
        close_b()       # triggers callback for b
        close_b()       # does nothing because callback has already been called
        del b           # does nothing because callback has already been called

        # No exitpriority is given for c; test_finalize expects 'c' never
        # to be received.
        c = Foo()
        util.Finalize(c, conn.send, args=('c',))

        d10 = Foo()
        util.Finalize(d10, conn.send, args=('d10',), exitpriority=1)

        # Three finalizers with equal priority; test_finalize expects them
        # to run in the reverse of their registration order.
        d01 = Foo()
        util.Finalize(d01, conn.send, args=('d01',), exitpriority=0)
        d02 = Foo()
        util.Finalize(d02, conn.send, args=('d02',), exitpriority=0)
        d03 = Foo()
        util.Finalize(d03, conn.send, args=('d03',), exitpriority=0)

        util.Finalize(None, conn.send, args=('e',), exitpriority=-10)

        # Lowest priority: the sentinel the parent uses to stop reading.
        util.Finalize(None, conn.send, args=('STOP',), exitpriority=-100)

        # call multiprocessing's cleanup function then exit process without
        # garbage collecting locals
        util._exit_function()
        conn.close()
        os._exit(0)

    def test_finalize(self):
        conn, child_conn = self.Pipe()

        p = self.Process(target=self._test_finalize, args=(child_conn,))
        p.daemon = True
        p.start()
        p.join()

        # Read messages until the 'STOP' sentinel sent by the last finalizer.
        result = [obj for obj in iter(conn.recv, 'STOP')]
        self.assertEqual(result, ['a', 'b', 'd10', 'd03', 'd02', 'd01', 'e'])
2805
2806#
2807# Test that from ... import * works for each module
2808#
2809
class _TestImportStar(BaseTestCase):
    """Every name a module lists in ``__all__`` must exist on that module."""

    ALLOWED_TYPES = ('processes',)

    def test_import(self):
        module_names = [
            'multiprocessing', 'multiprocessing.connection',
            'multiprocessing.heap', 'multiprocessing.managers',
            'multiprocessing.pool', 'multiprocessing.process',
            'multiprocessing.synchronize', 'multiprocessing.util'
            ]

        # Optional modules are only checked when they are usable here.
        if HAS_REDUCTION:
            module_names += ['multiprocessing.reduction']
        if c_int is not None:
            # This module requires _ctypes
            module_names += ['multiprocessing.sharedctypes']

        for module_name in module_names:
            __import__(module_name)
            module = sys.modules[module_name]
            exported = getattr(module, '__all__', ())
            for attribute in exported:
                message = '%r does not have attribute %r' % (module, attribute)
                self.assertTrue(hasattr(module, attribute), message)
2838
2839#
2840# Quick test that logging works -- does not test logging output
2841#
2842
class _TestLogging(BaseTestCase):
    """Quick checks of the multiprocessing logger; output is not verified."""

    ALLOWED_TYPES = ('processes',)

    def test_enable_logging(self):
        logger = multiprocessing.get_logger()
        # Check before first use, so that a missing logger is reported as a
        # test failure instead of an AttributeError from setLevel().
        self.assertIsNotNone(logger)
        logger.setLevel(util.SUBWARNING)
        try:
            logger.debug('this will not be printed')
            logger.info('nor will this')
        finally:
            # Restore the level used by the rest of the suite, even on error.
            logger.setLevel(LOG_LEVEL)

    @classmethod
    def _test_level(cls, conn):
        """Child target: send the logger's effective level to the parent."""
        logger = multiprocessing.get_logger()
        conn.send(logger.getEffectiveLevel())

    def _check_child_level(self, expected, reader, writer):
        """Start a child, compare the level it reports, and reap the child."""
        p = self.Process(target=self._test_level, args=(writer,))
        p.daemon = True
        p.start()
        try:
            self.assertEqual(expected, reader.recv())
        finally:
            # Join even when the assertion fails so no child is left behind.
            p.join()

    def test_level(self):
        LEVEL1 = 32
        LEVEL2 = 37

        logger = multiprocessing.get_logger()
        root_logger = logging.getLogger()
        root_level = root_logger.level

        reader, writer = multiprocessing.Pipe(duplex=False)
        try:
            # A level set on the multiprocessing logger is seen by the child.
            logger.setLevel(LEVEL1)
            self._check_child_level(LEVEL1, reader, writer)

            # With the logger at NOTSET the root logger's level is effective.
            logger.setLevel(logging.NOTSET)
            root_logger.setLevel(LEVEL2)
            self._check_child_level(LEVEL2, reader, writer)
        finally:
            # Undo every global change and release the pipe, pass or fail.
            root_logger.setLevel(root_level)
            logger.setLevel(level=LOG_LEVEL)
            reader.close()
            writer.close()
2885
Jesse Nollerb9a49b72009-11-21 18:09:38 +00002886
Jesse Nollerf4ae35f2009-11-24 14:22:24 +00002887# class _TestLoggingProcessName(BaseTestCase):
2888#
2889# def handle(self, record):
2890# assert record.processName == multiprocessing.current_process().name
2891# self.__handled = True
2892#
2893# def test_logging(self):
2894# handler = logging.Handler()
2895# handler.handle = self.handle
2896# self.__handled = False
2897# # Bypass getLogger() and side-effects
2898# logger = logging.getLoggerClass()(
2899# 'multiprocessing.test.TestLoggingProcessName')
2900# logger.addHandler(handler)
2901# logger.propagate = False
2902#
2903# logger.warn('foo')
2904# assert self.__handled
Jesse Nollerb9a49b72009-11-21 18:09:38 +00002905
Benjamin Petersone711caf2008-06-11 16:44:04 +00002906#
Richard Oudkerk7aaa1ef2013-02-26 12:39:57 +00002907# Check that Process.join() retries if os.waitpid() fails with EINTR
2908#
2909
class _TestPollEintr(BaseTestCase):
    """Process.join() must survive a signal arriving while it waits."""

    ALLOWED_TYPES = ('processes',)

    @classmethod
    def _killer(cls, pid):
        """Child target: wait briefly, then send SIGUSR1 to process *pid*."""
        time.sleep(0.5)
        os.kill(pid, signal.SIGUSR1)

    @unittest.skipUnless(hasattr(signal, 'SIGUSR1'), 'requires SIGUSR1')
    def test_poll_eintr(self):
        # A one-element list lets the nested handler record the delivery.
        got_signal = [False]
        def record(*args):
            got_signal[0] = True
        pid = os.getpid()
        oldhandler = signal.signal(signal.SIGUSR1, record)
        try:
            killer = self.Process(target=self._killer, args=(pid,))
            killer.start()
            try:
                # The signal is sent while this join() is in progress.
                p = self.Process(target=time.sleep, args=(1,))
                p.start()
                p.join()
            finally:
                # Reap the killer before asserting, so that a failed
                # assertion cannot leave it running.
                killer.join()
            self.assertTrue(got_signal[0])
            self.assertEqual(p.exitcode, 0)
        finally:
            signal.signal(signal.SIGUSR1, oldhandler)
2937
2938#
Jesse Noller6214edd2009-01-19 16:23:53 +00002939# Test to verify handle verification, see issue 3321
2940#
2941
class TestInvalidHandle(unittest.TestCase):
    """Connection objects must reject handles that are not valid."""

    @unittest.skipIf(WIN32, "skipped on Windows")
    def test_invalid_handles(self):
        rejected = (ValueError, OSError)

        # A handle number that does not refer to an open descriptor.
        conn = multiprocessing.connection.Connection(44977608)
        try:
            with self.assertRaises(rejected):
                conn.poll()
        finally:
            # Hack private attribute _handle to avoid printing an error
            # in conn.__del__
            conn._handle = None

        # A negative handle is refused by the constructor itself.
        with self.assertRaises(rejected):
            multiprocessing.connection.Connection(-1)
Florent Xiclunafd1b0932010-03-28 00:25:02 +00002955
Jesse Noller6214edd2009-01-19 16:23:53 +00002956#
Benjamin Petersone711caf2008-06-11 16:44:04 +00002957# Functions used to create test cases from the base ones in this module
2958#
2959
def create_test_cases(Mixin, type):
    """Build concrete TestCase classes from the ``_Test*`` bases.

    For every module-level name starting with ``_Test`` whose
    ``ALLOWED_TYPES`` contains *type*, a class deriving from that base,
    *Mixin* and ``unittest.TestCase`` is created.  Returns a dict mapping
    the generated class name (``'With' + Type + name[1:]``) to the class.
    """
    ALL_TYPES = {'processes', 'threads', 'manager'}
    prefix = 'With' + type.capitalize()
    cases = {}

    # Iterate over a snapshot of the module namespace.
    for name, base in list(globals().items()):
        if not name.startswith('_Test'):
            continue
        allowed = set(base.ALLOWED_TYPES)
        assert allowed <= ALL_TYPES, allowed
        if type not in base.ALLOWED_TYPES:
            continue

        class Temp(base, Mixin, unittest.TestCase):
            pass

        # Give the generated class a stable, descriptive identity.
        newname = prefix + name[1:]
        Temp.__name__ = Temp.__qualname__ = newname
        Temp.__module__ = Mixin.__module__
        cases[newname] = Temp
    return cases
2978
2979#
2980# Create test cases
2981#
2982
class ProcessesMixin(object):
    # Mixin binding the names used by the _Test* base classes (self.Process,
    # self.Queue, ...) to the real, process-based multiprocessing objects.
    TYPE = 'processes'
    Process = multiprocessing.Process
    connection = multiprocessing.connection
    # Wrapped in staticmethod so that access through a test instance
    # does not bind the instance as the first argument.
    current_process = staticmethod(multiprocessing.current_process)
    active_children = staticmethod(multiprocessing.active_children)
    Pool = staticmethod(multiprocessing.Pool)
    Pipe = staticmethod(multiprocessing.Pipe)
    Queue = staticmethod(multiprocessing.Queue)
    JoinableQueue = staticmethod(multiprocessing.JoinableQueue)
    Lock = staticmethod(multiprocessing.Lock)
    RLock = staticmethod(multiprocessing.RLock)
    Semaphore = staticmethod(multiprocessing.Semaphore)
    BoundedSemaphore = staticmethod(multiprocessing.BoundedSemaphore)
    Condition = staticmethod(multiprocessing.Condition)
    Event = staticmethod(multiprocessing.Event)
    Barrier = staticmethod(multiprocessing.Barrier)
    Value = staticmethod(multiprocessing.Value)
    Array = staticmethod(multiprocessing.Array)
    RawValue = staticmethod(multiprocessing.RawValue)
    RawArray = staticmethod(multiprocessing.RawArray)
Benjamin Petersone711caf2008-06-11 16:44:04 +00003004
# Generate the 'processes' flavour of every eligible _Test* base class and
# publish the generated classes as module globals.
testcases_processes = create_test_cases(ProcessesMixin, type='processes')
globals().update(testcases_processes)
3007
3008
class ManagerMixin(object):
    """Mixin resolving the shared-object factories through ``cls.manager``.

    The manager is created once per test class in setUpClass() and shut
    down in tearDownClass(); the properties below look the factories up on
    it each time they are accessed.
    """
    TYPE = 'manager'
    Process = multiprocessing.Process
    Queue = property(operator.attrgetter('manager.Queue'))
    JoinableQueue = property(operator.attrgetter('manager.JoinableQueue'))
    Lock = property(operator.attrgetter('manager.Lock'))
    RLock = property(operator.attrgetter('manager.RLock'))
    Semaphore = property(operator.attrgetter('manager.Semaphore'))
    BoundedSemaphore = property(operator.attrgetter('manager.BoundedSemaphore'))
    Condition = property(operator.attrgetter('manager.Condition'))
    Event = property(operator.attrgetter('manager.Event'))
    Barrier = property(operator.attrgetter('manager.Barrier'))
    Value = property(operator.attrgetter('manager.Value'))
    Array = property(operator.attrgetter('manager.Array'))
    list = property(operator.attrgetter('manager.list'))
    dict = property(operator.attrgetter('manager.dict'))
    Namespace = property(operator.attrgetter('manager.Namespace'))

    @classmethod
    def Pool(cls, *args, **kwds):
        """Create a pool through the class-wide manager."""
        manager = cls.manager
        return manager.Pool(*args, **kwds)

    @classmethod
    def setUpClass(cls):
        cls.manager = multiprocessing.Manager()

    @classmethod
    def tearDownClass(cls):
        # only the manager process should be returned by active_children()
        # but this can take a bit on slow machines, so wait a few seconds
        # if there are other children too (see #17395)
        delay = 0.01
        while True:
            only_manager_left = len(multiprocessing.active_children()) <= 1
            if only_manager_left or delay >= 5:
                break
            time.sleep(delay)
            delay *= 2

        gc.collect()                       # do garbage collection
        manager = cls.manager
        if manager._number_of_objects() != 0:
            # This is not really an error since some tests do not
            # ensure that all processes which hold a reference to a
            # managed object have been joined.
            print('Shared objects which still exist at manager shutdown:')
            print(manager._debug_info())
        manager.shutdown()
        manager.join()
        cls.manager = None
Benjamin Petersone711caf2008-06-11 16:44:04 +00003054
# Generate the 'manager' flavour of every eligible _Test* base class and
# publish the generated classes as module globals.
testcases_manager = create_test_cases(ManagerMixin, type='manager')
globals().update(testcases_manager)
3057
3058
class ThreadsMixin(object):
    # Mixin binding the names used by the _Test* base classes to the
    # thread-based multiprocessing.dummy equivalents.
    TYPE = 'threads'
    Process = multiprocessing.dummy.Process
    connection = multiprocessing.dummy.connection
    current_process = staticmethod(multiprocessing.dummy.current_process)
    active_children = staticmethod(multiprocessing.dummy.active_children)
    # NOTE(review): Pool is taken from multiprocessing, not from
    # multiprocessing.dummy like every other attribute of this mixin --
    # confirm whether that is intentional.
    Pool = staticmethod(multiprocessing.Pool)
    Pipe = staticmethod(multiprocessing.dummy.Pipe)
    Queue = staticmethod(multiprocessing.dummy.Queue)
    JoinableQueue = staticmethod(multiprocessing.dummy.JoinableQueue)
    Lock = staticmethod(multiprocessing.dummy.Lock)
    RLock = staticmethod(multiprocessing.dummy.RLock)
    Semaphore = staticmethod(multiprocessing.dummy.Semaphore)
    BoundedSemaphore = staticmethod(multiprocessing.dummy.BoundedSemaphore)
    Condition = staticmethod(multiprocessing.dummy.Condition)
    Event = staticmethod(multiprocessing.dummy.Event)
    Barrier = staticmethod(multiprocessing.dummy.Barrier)
    Value = staticmethod(multiprocessing.dummy.Value)
    Array = staticmethod(multiprocessing.dummy.Array)
Benjamin Petersone711caf2008-06-11 16:44:04 +00003078
# Generate the 'threads' flavour of every eligible _Test* base class and
# publish the generated classes as module globals.
testcases_threads = create_test_cases(ThreadsMixin, type='threads')
globals().update(testcases_threads)
3081
Richard Oudkerkfc7b0ec2012-10-08 14:56:24 +01003082
class OtherTest(unittest.TestCase):
    """Authentication handshake failures, driven by fake connections."""

    # TODO: add more tests for deliver/answer challenge.
    def test_deliver_challenge_auth_failure(self):
        class _FakeConnection(object):
            # Answers every challenge with a reply that cannot be valid.
            def recv_bytes(self, size):
                return b'something bogus'
            def send_bytes(self, data):
                pass

        with self.assertRaises(multiprocessing.AuthenticationError):
            multiprocessing.connection.deliver_challenge(
                _FakeConnection(), b'abc')

    def test_answer_challenge_auth_failure(self):
        class _FakeConnection(object):
            # First read yields the challenge marker, the second a bogus
            # verdict, any later read an empty message.
            def __init__(self):
                self.count = 0
            def recv_bytes(self, size):
                self.count += 1
                if self.count == 1:
                    return multiprocessing.connection.CHALLENGE
                if self.count == 2:
                    return b'something bogus'
                return b''
            def send_bytes(self, data):
                pass

        with self.assertRaises(multiprocessing.AuthenticationError):
            multiprocessing.connection.answer_challenge(
                _FakeConnection(), b'abc')
3111
Benjamin Petersonf47ed4a2009-04-11 20:45:40 +00003112#
3113# Test Manager.start()/Pool.__init__() initializer feature - see issue 5585
3114#
3115
def initializer(ns):
    """Initializer callback used by the tests: bump ``ns.test`` by one."""
    ns.test = ns.test + 1
3118
class TestInitializers(unittest.TestCase):
    """Manager.start() and Pool() must run a user-supplied initializer."""

    def setUp(self):
        # A managed namespace lets the initializer, which runs in another
        # process, report back to the test.
        self.mgr = multiprocessing.Manager()
        self.ns = self.mgr.Namespace()
        self.ns.test = 0

    def tearDown(self):
        self.mgr.shutdown()
        self.mgr.join()

    def test_manager_initializer(self):
        m = multiprocessing.managers.SyncManager()
        # A non-callable initializer is rejected.
        self.assertRaises(TypeError, m.start, 1)
        m.start(initializer, (self.ns,))
        try:
            self.assertEqual(self.ns.test, 1)
        finally:
            # Stop the second manager even when the assertion fails.
            m.shutdown()
            m.join()

    def test_pool_initializer(self):
        # A non-callable initializer is rejected.
        self.assertRaises(TypeError, multiprocessing.Pool, initializer=1)
        p = multiprocessing.Pool(1, initializer, (self.ns,))
        p.close()
        p.join()
        self.assertEqual(self.ns.test, 1)
3143
Alexandre Vassalottic57a84f2009-07-17 12:07:01 +00003144#
3145# Issue 5155, 5313, 5331: Test process in processes
3146# Verifies os.close(sys.stdin.fileno) vs. sys.stdin.close() behavior
3147#
3148
3149def _ThisSubProcess(q):
3150 try:
3151 item = q.get(block=False)
3152 except pyqueue.Empty:
3153 pass
3154
def _TestProcess(q):
    """Run _ThisSubProcess in a daemonic child process and wait for it.

    The *q* argument is not used; the child gets a queue created here.
    """
    inner_queue = multiprocessing.Queue()
    child = multiprocessing.Process(target=_ThisSubProcess,
                                    args=(inner_queue,))
    child.daemon = True
    child.start()
    child.join()
3161
3162def _afunc(x):
3163 return x*x
3164
def pool_in_process():
    """Create a four-worker pool, map _afunc over 1..7, then shut it down."""
    workers = multiprocessing.Pool(processes=4)
    # The results are not inspected; only completing the map matters here.
    workers.map(_afunc, [1, 2, 3, 4, 5, 6, 7])
    workers.close()
    workers.join()
Alexandre Vassalottic57a84f2009-07-17 12:07:01 +00003170
3171class _file_like(object):
3172 def __init__(self, delegate):
3173 self._delegate = delegate
3174 self._pid = None
3175
3176 @property
3177 def cache(self):
3178 pid = os.getpid()
3179 # There are no race conditions since fork keeps only the running thread
3180 if pid != self._pid:
3181 self._pid = pid
3182 self._cache = []
3183 return self._cache
3184
3185 def write(self, data):
3186 self.cache.append(data)
3187
3188 def flush(self):
3189 self._delegate.write(''.join(self.cache))
3190 self._cache = []
3191
class TestStdinBadfiledescriptor(unittest.TestCase):
    """Queues, pools and buffered output used from inside a child process."""

    def test_queue_in_process(self):
        # A process that itself creates a queue and a subprocess.
        queue = multiprocessing.Queue()
        proc = multiprocessing.Process(target=_TestProcess, args=(queue,))
        proc.start()
        proc.join()

    def test_pool_in_process(self):
        # A process that itself creates and uses a pool.
        p = multiprocessing.Process(target=pool_in_process)
        p.start()
        p.join()

    def test_flushing(self):
        sio = io.StringIO()
        flike = _file_like(sio)
        flike.write('foo')
        # NOTE(review): this process object is created but never started.
        proc = multiprocessing.Process(target=lambda: flike.flush())
        flike.flush()
        # assertEqual instead of a bare assert: the check survives -O and
        # reports both values on failure.
        self.assertEqual(sio.getvalue(), 'foo')
3212
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01003213
class TestWait(unittest.TestCase):
    """Tests for multiprocessing.connection.wait()."""

    @classmethod
    def _child_test_wait(cls, w, slow):
        """Child target: send ten ``(index, pid)`` messages, then close *w*."""
        for i in range(10):
            if slow:
                time.sleep(random.random()*0.1)
            w.send((i, os.getpid()))
        w.close()

    def test_wait(self, slow=False):
        from multiprocessing.connection import wait
        readers = []
        procs = []
        messages = []

        for i in range(4):
            r, w = multiprocessing.Pipe(duplex=False)
            # Closing an already closed connection is harmless, so this is
            # safe alongside the explicit close() calls below.
            self.addCleanup(r.close)
            p = multiprocessing.Process(target=self._child_test_wait, args=(w, slow))
            p.daemon = True
            p.start()
            # The parent keeps only the read end; EOF is then seen as soon
            # as the child closes its write end.
            w.close()
            readers.append(r)
            procs.append(p)
            self.addCleanup(p.join)

        while readers:
            for r in wait(readers):
                try:
                    msg = r.recv()
                except EOFError:
                    readers.remove(r)
                    r.close()
                else:
                    messages.append(msg)

        messages.sort()
        expected = sorted((i, p.pid) for i in range(10) for p in procs)
        self.assertEqual(messages, expected)

    @classmethod
    def _child_test_wait_socket(cls, address, slow):
        """Child target: connect to *address* and send ten numbered lines."""
        s = socket.socket()
        s.connect(address)
        for i in range(10):
            if slow:
                time.sleep(random.random()*0.1)
            s.sendall(('%s\n' % i).encode('ascii'))
        s.close()

    def test_wait_socket(self, slow=False):
        from multiprocessing.connection import wait
        l = socket.socket()
        self.addCleanup(l.close)
        # Listen on the same host name the children connect to instead of
        # on every interface.
        l.bind(('localhost', 0))
        l.listen(4)
        addr = ('localhost', l.getsockname()[1])
        readers = []
        procs = []
        dic = {}

        for i in range(4):
            p = multiprocessing.Process(target=self._child_test_wait_socket,
                                        args=(addr, slow))
            p.daemon = True
            p.start()
            procs.append(p)
            self.addCleanup(p.join)

        for i in range(4):
            r, _ = l.accept()
            self.addCleanup(r.close)
            readers.append(r)
            dic[r] = []
        l.close()

        while readers:
            for r in wait(readers):
                msg = r.recv(32)
                if not msg:
                    # An empty read means the peer closed its socket.
                    readers.remove(r)
                    r.close()
                else:
                    dic[r].append(msg)

        expected = ''.join('%s\n' % i for i in range(10)).encode('ascii')
        for v in dic.values():
            self.assertEqual(b''.join(v), expected)

    def test_wait_slow(self):
        self.test_wait(True)

    def test_wait_socket_slow(self):
        self.test_wait_socket(True)

    def test_wait_timeout(self):
        from multiprocessing.connection import wait

        expected = 5
        a, b = multiprocessing.Pipe()
        self.addCleanup(a.close)
        self.addCleanup(b.close)

        # Nothing is readable: wait() must give up after about *expected*
        # seconds.
        start = time.time()
        res = wait([a, b], expected)
        delta = time.time() - start

        self.assertEqual(res, [])
        self.assertLess(delta, expected * 2)
        self.assertGreater(delta, expected * 0.5)

        b.send(None)

        # Data is pending on a: wait() must return it well before timeout.
        start = time.time()
        res = wait([a, b], 20)
        delta = time.time() - start

        self.assertEqual(res, [a])
        self.assertLess(delta, 0.4)

    @classmethod
    def signal_and_sleep(cls, sem, period):
        """Child target: release *sem*, then sleep for *period* seconds."""
        sem.release()
        time.sleep(period)

    def test_wait_integer(self):
        from multiprocessing.connection import wait

        expected = 3
        # Order results by identity so that lists mixing connections and
        # integers can be compared.
        sorted_ = lambda l: sorted(l, key=id)
        sem = multiprocessing.Semaphore(0)
        a, b = multiprocessing.Pipe()
        self.addCleanup(a.close)
        self.addCleanup(b.close)
        p = multiprocessing.Process(target=self.signal_and_sleep,
                                    args=(sem, expected))

        p.start()
        # Cleanups run last-in first-out: terminate, then join.  This
        # reaps the child even when an assertion below fails.
        self.addCleanup(p.join)
        self.addCleanup(p.terminate)
        self.assertIsInstance(p.sentinel, int)
        self.assertTrue(sem.acquire(timeout=20))

        # Only the sentinel becomes ready, once the child exits.
        start = time.time()
        res = wait([a, p.sentinel, b], expected + 20)
        delta = time.time() - start

        self.assertEqual(res, [p.sentinel])
        self.assertLess(delta, expected + 2)
        self.assertGreater(delta, expected - 2)

        a.send(None)

        start = time.time()
        res = wait([a, p.sentinel, b], 20)
        delta = time.time() - start

        self.assertEqual(sorted_(res), sorted_([p.sentinel, b]))
        self.assertLess(delta, 0.4)

        b.send(None)

        start = time.time()
        res = wait([a, p.sentinel, b], 20)
        delta = time.time() - start

        self.assertEqual(sorted_(res), sorted_([a, p.sentinel, b]))
        self.assertLess(delta, 0.4)

    def test_neg_timeout(self):
        from multiprocessing.connection import wait
        a, b = multiprocessing.Pipe()
        self.addCleanup(a.close)
        self.addCleanup(b.close)
        # A negative timeout must return promptly with nothing ready.
        t = time.time()
        res = wait([a], timeout=-1)
        t = time.time() - t
        self.assertEqual(res, [])
        self.assertLess(t, 1)
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01003388
Antoine Pitrou709176f2012-04-01 17:19:09 +02003389#
3390# Issue 14151: Test invalid family on invalid environment
3391#
3392
class TestInvalidFamily(unittest.TestCase):
    """Listener must reject an address that belongs to the other platform."""

    @unittest.skipIf(WIN32, "skipped on Windows")
    def test_invalid_family(self):
        # A Windows named-pipe address is not valid here.
        self.assertRaises(ValueError,
                          multiprocessing.connection.Listener, r'\\.\test')

    @unittest.skipUnless(WIN32, "skipped on non-Windows platforms")
    def test_invalid_family_win32(self):
        # A filesystem socket path is not valid on Windows.
        self.assertRaises(ValueError,
                          multiprocessing.connection.Listener, '/var/test.pipe')
Antoine Pitrou93bba8f2012-04-01 17:25:49 +02003404
Richard Oudkerk77c84f22012-05-18 14:28:02 +01003405#
3406# Issue 12098: check sys.flags of child matches that for parent
3407#
3408
class TestFlags(unittest.TestCase):
    """sys.flags of a child process must match those of its parent."""

    @classmethod
    def run_in_grandchild(cls, conn):
        """Grandchild target: send this interpreter's flags over *conn*."""
        own_flags = tuple(sys.flags)
        conn.send(own_flags)

    @classmethod
    def run_in_child(cls):
        """Print, as JSON, this process's flags and those of a child of it."""
        import json
        reader, writer = multiprocessing.Pipe(duplex=False)
        grandchild = multiprocessing.Process(target=cls.run_in_grandchild,
                                             args=(writer,))
        grandchild.start()
        grandchild_flags = reader.recv()
        grandchild.join()
        reader.close()
        writer.close()
        print(json.dumps((tuple(sys.flags), grandchild_flags)))

    def test_flags(self):
        import json
        import subprocess
        # start child process using unusual flags
        prog = ('from test.test_multiprocessing import TestFlags; '
                'TestFlags.run_in_child()')
        command = [sys.executable, '-E', '-S', '-O', '-c', prog]
        data = subprocess.check_output(command)
        child_flags, grandchild_flags = json.loads(data.decode('ascii'))
        self.assertEqual(child_flags, grandchild_flags)
3436
Richard Oudkerkb15e6222012-07-27 14:19:00 +01003437#
3438# Test interaction with socket timeouts - see Issue #6056
3439#
3440
class TestTimeouts(unittest.TestCase):
    """Connections must work while a default socket timeout is set."""

    @classmethod
    def _test_timeout(cls, child, address):
        """Child target: after a delay, send 123 on the pipe and 456 to *address*."""
        time.sleep(1)
        child.send(123)
        child.close()
        client = multiprocessing.connection.Client(address)
        client.send(456)
        client.close()

    def test_timeout(self):
        saved_timeout = socket.getdefaulttimeout()
        try:
            # Shorter than the one-second delay in the child.
            socket.setdefaulttimeout(0.1)
            parent, child = multiprocessing.Pipe(duplex=True)
            listener = multiprocessing.connection.Listener(family='AF_INET')
            worker = multiprocessing.Process(target=self._test_timeout,
                                             args=(child, listener.address))
            worker.start()
            child.close()

            self.assertEqual(parent.recv(), 123)
            parent.close()

            accepted = listener.accept()
            self.assertEqual(accepted.recv(), 456)
            accepted.close()
            listener.close()
            worker.join(10)
        finally:
            socket.setdefaulttimeout(saved_timeout)
3470
Richard Oudkerke88a2442012-08-14 11:41:32 +01003471#
3472# Test what happens with no "if __name__ == '__main__'"
3473#
3474
class TestNoForkBomb(unittest.TestCase):
    """Run a script that has no ``if __name__ == '__main__'`` guard."""

    def test_noforkbomb(self):
        script_dir = os.path.dirname(__file__)
        script = os.path.join(script_dir, 'mp_fork_bomb.py')
        if not WIN32:
            # Elsewhere the script is expected to succeed and print 123.
            rc, out, err = test.script_helper.assert_python_ok(script)
            self.assertEqual('123', out.decode('ascii').rstrip())
            self.assertEqual('', err.decode('ascii'))
        else:
            # On Windows the script is expected to fail with a RuntimeError.
            rc, out, err = test.script_helper.assert_python_failure(script)
            self.assertEqual('', out.decode('ascii'))
            self.assertIn('RuntimeError', err.decode('ascii'))
3486
3487#
Richard Oudkerk409c3132013-04-17 20:58:00 +01003488# Issue #17555: ForkAwareThreadLock
3489#
3490
class TestForkAwareThreadLock(unittest.TestCase):
    # We recursively start processes.  Issue #17555 meant that the
    # after fork registry would get duplicate entries for the same
    # lock.  The size of the registry at generation n was ~2**n.

    @classmethod
    def child(cls, n, conn):
        """Spawn *n* - 1 further generations; the last reports the registry size."""
        if n <= 1:
            # Last generation: report how large the registry has become.
            conn.send(len(util._afterfork_registry))
        else:
            descendant = multiprocessing.Process(target=cls.child,
                                                 args=(n-1, conn))
            descendant.start()
            descendant.join()
        conn.close()

    def test_lock(self):
        reader, writer = multiprocessing.Pipe(False)
        # Created before the registry is measured and kept referenced
        # for the whole test.
        l = util.ForkAwareThreadLock()
        size_before = len(util._afterfork_registry)
        first = multiprocessing.Process(target=self.child, args=(5, writer))
        first.start()
        size_after = reader.recv()
        first.join()
        self.assertLessEqual(size_after, size_before)
3515
3516#
Richard Oudkerke88a2442012-08-14 11:41:32 +01003517#
3518#
3519
# Stand-alone TestCase classes (not generated by create_test_cases);
# test_main() appends them after the generated ones.
testcases_other = [OtherTest, TestInvalidHandle, TestInitializers,
                   TestStdinBadfiledescriptor, TestWait, TestInvalidFamily,
                   TestFlags, TestTimeouts, TestNoForkBomb,
                   TestForkAwareThreadLock]
Neal Norwitz5d6415e2008-08-25 01:53:32 +00003524
Benjamin Petersone711caf2008-06-11 16:44:04 +00003525#
3526#
3527#
3528
def test_main(run=None):
    """Collect every test case of this module and hand the suite to *run*.

    *run* is a callable taking a TestSuite; when omitted,
    test.support.run_unittest is used.
    """
    if sys.platform.startswith("linux"):
        try:
            lock = multiprocessing.RLock()
        except OSError:
            raise unittest.SkipTest("OSError raises on RLock creation, see issue 3111!")

    check_enough_semaphores()

    if run is None:
        run = test.support.run_unittest

    util.get_temp_dir()     # creates temp directory for use by all processes

    multiprocessing.get_logger().setLevel(LOG_LEVEL)

    # Generated classes are ordered by name within each flavour; the
    # stand-alone classes follow in their declared order.
    by_name = operator.attrgetter('__name__')
    testcases = []
    for generated in (testcases_processes, testcases_threads,
                      testcases_manager):
        testcases += sorted(generated.values(), key=by_name)
    testcases += testcases_other

    load = unittest.defaultTestLoader.loadTestsFromTestCase
    suite = unittest.TestSuite(load(case) for case in testcases)
    run(suite)
Benjamin Petersone711caf2008-06-11 16:44:04 +00003555
def main():
    """Run the whole module with a verbose text test runner."""
    runner = unittest.TextTestRunner(verbosity=2)
    test_main(runner.run)

if __name__ == '__main__':
    main()