blob: 6cda4fa3c808e9843cc546d1d4ebdf12be430036 [file] [log] [blame]
Benjamin Peterson90f5ba52010-03-11 22:53:45 +00001#!/usr/bin/env python3
Georg Brandl86b2fb92008-07-16 03:43:04 +00002
Benjamin Petersone711caf2008-06-11 16:44:04 +00003#
4# Unit tests for the multiprocessing package
5#
6
7import unittest
Benjamin Petersone711caf2008-06-11 16:44:04 +00008import queue as pyqueue
9import time
Alexandre Vassalottic57a84f2009-07-17 12:07:01 +000010import io
Antoine Pitroude911b22011-12-21 11:03:24 +010011import itertools
Benjamin Petersone711caf2008-06-11 16:44:04 +000012import sys
13import os
14import gc
Antoine Pitrouc824e9a2011-04-05 18:11:33 +020015import errno
Benjamin Petersone711caf2008-06-11 16:44:04 +000016import signal
17import array
Benjamin Petersone711caf2008-06-11 16:44:04 +000018import socket
19import random
20import logging
Richard Oudkerk3730a172012-06-15 18:26:07 +010021import struct
Richard Oudkerkfc7b0ec2012-10-08 14:56:24 +010022import operator
R. David Murraya21e4ca2009-03-31 23:16:50 +000023import test.support
Richard Oudkerke88a2442012-08-14 11:41:32 +010024import test.script_helper
Benjamin Petersone711caf2008-06-11 16:44:04 +000025
Benjamin Petersone5384b02008-10-04 22:00:42 +000026
R. David Murraya21e4ca2009-03-31 23:16:50 +000027# Skip tests if _multiprocessing wasn't built.
28_multiprocessing = test.support.import_module('_multiprocessing')
29# Skip tests if sem_open implementation is broken.
30test.support.import_module('multiprocessing.synchronize')
Victor Stinner45df8202010-04-28 22:31:17 +000031# import threading after _multiprocessing to raise a more revelant error
32# message: "No module named _multiprocessing". _multiprocessing is not compiled
33# without thread support.
34import threading
Benjamin Petersone5384b02008-10-04 22:00:42 +000035
Benjamin Petersone711caf2008-06-11 16:44:04 +000036import multiprocessing.dummy
37import multiprocessing.connection
38import multiprocessing.managers
39import multiprocessing.heap
Benjamin Petersone711caf2008-06-11 16:44:04 +000040import multiprocessing.pool
Benjamin Petersone711caf2008-06-11 16:44:04 +000041
Charles-François Natalibc8f0822011-09-20 20:36:51 +020042from multiprocessing import util
43
44try:
45 from multiprocessing import reduction
46 HAS_REDUCTION = True
47except ImportError:
48 HAS_REDUCTION = False
Benjamin Petersone711caf2008-06-11 16:44:04 +000049
Brian Curtinafa88b52010-10-07 01:12:19 +000050try:
51 from multiprocessing.sharedctypes import Value, copy
52 HAS_SHAREDCTYPES = True
53except ImportError:
54 HAS_SHAREDCTYPES = False
55
Antoine Pitroubcb39d42011-08-23 19:46:22 +020056try:
57 import msvcrt
58except ImportError:
59 msvcrt = None
60
Benjamin Petersone711caf2008-06-11 16:44:04 +000061#
62#
63#
64
Benjamin Peterson2bc91df2008-07-13 18:45:30 +000065def latin(s):
66 return s.encode('latin')
Benjamin Petersone711caf2008-06-11 16:44:04 +000067
Benjamin Petersone711caf2008-06-11 16:44:04 +000068#
69# Constants
70#
71
72LOG_LEVEL = util.SUBWARNING
Jesse Noller1f0b6582010-01-27 03:36:01 +000073#LOG_LEVEL = logging.DEBUG
Benjamin Petersone711caf2008-06-11 16:44:04 +000074
75DELTA = 0.1
76CHECK_TIMINGS = False # making true makes tests take a lot longer
77 # and can sometimes cause some non-serious
78 # failures because some calls block a bit
79 # longer than expected
80if CHECK_TIMINGS:
81 TIMEOUT1, TIMEOUT2, TIMEOUT3 = 0.82, 0.35, 1.4
82else:
83 TIMEOUT1, TIMEOUT2, TIMEOUT3 = 0.1, 0.1, 0.1
84
85HAVE_GETVALUE = not getattr(_multiprocessing,
86 'HAVE_BROKEN_SEM_GETVALUE', False)
87
Jesse Noller6214edd2009-01-19 16:23:53 +000088WIN32 = (sys.platform == "win32")
Antoine Pitrou176f07d2011-06-06 19:35:31 +020089
Richard Oudkerk59d54042012-05-10 16:11:12 +010090from multiprocessing.connection import wait
Antoine Pitrou176f07d2011-06-06 19:35:31 +020091
Richard Oudkerk59d54042012-05-10 16:11:12 +010092def wait_for_handle(handle, timeout):
93 if timeout is not None and timeout < 0.0:
94 timeout = None
95 return wait([handle], timeout)
Jesse Noller6214edd2009-01-19 16:23:53 +000096
Antoine Pitroubcb39d42011-08-23 19:46:22 +020097try:
98 MAXFD = os.sysconf("SC_OPEN_MAX")
99except:
100 MAXFD = 256
101
Benjamin Petersone711caf2008-06-11 16:44:04 +0000102#
Florent Xiclunafd1b0932010-03-28 00:25:02 +0000103# Some tests require ctypes
104#
105
106try:
Florent Xiclunaaa171062010-08-14 15:56:42 +0000107 from ctypes import Structure, c_int, c_double
Florent Xiclunafd1b0932010-03-28 00:25:02 +0000108except ImportError:
109 Structure = object
110 c_int = c_double = None
111
Charles-François Natali221ef672011-11-22 18:55:22 +0100112
113def check_enough_semaphores():
114 """Check that the system supports enough semaphores to run the test."""
115 # minimum number of semaphores available according to POSIX
116 nsems_min = 256
117 try:
118 nsems = os.sysconf("SC_SEM_NSEMS_MAX")
119 except (AttributeError, ValueError):
120 # sysconf not available or setting not available
121 return
122 if nsems == -1 or nsems >= nsems_min:
123 return
124 raise unittest.SkipTest("The OS doesn't support enough semaphores "
125 "to run the test (required: %d)." % nsems_min)
126
127
Florent Xiclunafd1b0932010-03-28 00:25:02 +0000128#
Benjamin Petersone711caf2008-06-11 16:44:04 +0000129# Creates a wrapper for a function which records the time it takes to finish
130#
131
132class TimingWrapper(object):
133
134 def __init__(self, func):
135 self.func = func
136 self.elapsed = None
137
138 def __call__(self, *args, **kwds):
139 t = time.time()
140 try:
141 return self.func(*args, **kwds)
142 finally:
143 self.elapsed = time.time() - t
144
145#
146# Base class for test cases
147#
148
149class BaseTestCase(object):
150
151 ALLOWED_TYPES = ('processes', 'manager', 'threads')
152
153 def assertTimingAlmostEqual(self, a, b):
154 if CHECK_TIMINGS:
155 self.assertAlmostEqual(a, b, 1)
156
157 def assertReturnsIfImplemented(self, value, func, *args):
158 try:
159 res = func(*args)
160 except NotImplementedError:
161 pass
162 else:
163 return self.assertEqual(value, res)
164
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +0000165 # For the sanity of Windows users, rather than crashing or freezing in
166 # multiple ways.
167 def __reduce__(self, *args):
168 raise NotImplementedError("shouldn't try to pickle a test case")
169
170 __reduce_ex__ = __reduce__
171
Benjamin Petersone711caf2008-06-11 16:44:04 +0000172#
173# Return the value of a semaphore
174#
175
176def get_value(self):
177 try:
178 return self.get_value()
179 except AttributeError:
180 try:
181 return self._Semaphore__value
182 except AttributeError:
183 try:
184 return self._value
185 except AttributeError:
186 raise NotImplementedError
187
188#
189# Testcases
190#
191
192class _TestProcess(BaseTestCase):
193
194 ALLOWED_TYPES = ('processes', 'threads')
195
196 def test_current(self):
197 if self.TYPE == 'threads':
198 return
199
200 current = self.current_process()
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000201 authkey = current.authkey
Benjamin Petersone711caf2008-06-11 16:44:04 +0000202
203 self.assertTrue(current.is_alive())
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000204 self.assertTrue(not current.daemon)
Ezio Melottie9615932010-01-24 19:26:24 +0000205 self.assertIsInstance(authkey, bytes)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000206 self.assertTrue(len(authkey) > 0)
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000207 self.assertEqual(current.ident, os.getpid())
208 self.assertEqual(current.exitcode, None)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000209
Antoine Pitrou0bd4deb2011-02-25 22:07:43 +0000210 def test_daemon_argument(self):
211 if self.TYPE == "threads":
212 return
213
214 # By default uses the current process's daemon flag.
215 proc0 = self.Process(target=self._test)
Antoine Pitrouec785222011-03-02 00:15:44 +0000216 self.assertEqual(proc0.daemon, self.current_process().daemon)
Antoine Pitrou0bd4deb2011-02-25 22:07:43 +0000217 proc1 = self.Process(target=self._test, daemon=True)
218 self.assertTrue(proc1.daemon)
219 proc2 = self.Process(target=self._test, daemon=False)
220 self.assertFalse(proc2.daemon)
221
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +0000222 @classmethod
223 def _test(cls, q, *args, **kwds):
224 current = cls.current_process()
Benjamin Petersone711caf2008-06-11 16:44:04 +0000225 q.put(args)
226 q.put(kwds)
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000227 q.put(current.name)
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +0000228 if cls.TYPE != 'threads':
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000229 q.put(bytes(current.authkey))
Benjamin Petersone711caf2008-06-11 16:44:04 +0000230 q.put(current.pid)
231
232 def test_process(self):
233 q = self.Queue(1)
234 e = self.Event()
235 args = (q, 1, 2)
236 kwargs = {'hello':23, 'bye':2.54}
237 name = 'SomeProcess'
238 p = self.Process(
239 target=self._test, args=args, kwargs=kwargs, name=name
240 )
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000241 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000242 current = self.current_process()
243
244 if self.TYPE != 'threads':
Ezio Melottib3aedd42010-11-20 19:04:17 +0000245 self.assertEqual(p.authkey, current.authkey)
246 self.assertEqual(p.is_alive(), False)
247 self.assertEqual(p.daemon, True)
Benjamin Peterson577473f2010-01-19 00:09:57 +0000248 self.assertNotIn(p, self.active_children())
Benjamin Petersone711caf2008-06-11 16:44:04 +0000249 self.assertTrue(type(self.active_children()) is list)
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000250 self.assertEqual(p.exitcode, None)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000251
252 p.start()
253
Ezio Melottib3aedd42010-11-20 19:04:17 +0000254 self.assertEqual(p.exitcode, None)
255 self.assertEqual(p.is_alive(), True)
Benjamin Peterson577473f2010-01-19 00:09:57 +0000256 self.assertIn(p, self.active_children())
Benjamin Petersone711caf2008-06-11 16:44:04 +0000257
Ezio Melottib3aedd42010-11-20 19:04:17 +0000258 self.assertEqual(q.get(), args[1:])
259 self.assertEqual(q.get(), kwargs)
260 self.assertEqual(q.get(), p.name)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000261 if self.TYPE != 'threads':
Ezio Melottib3aedd42010-11-20 19:04:17 +0000262 self.assertEqual(q.get(), current.authkey)
263 self.assertEqual(q.get(), p.pid)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000264
265 p.join()
266
Ezio Melottib3aedd42010-11-20 19:04:17 +0000267 self.assertEqual(p.exitcode, 0)
268 self.assertEqual(p.is_alive(), False)
Benjamin Peterson577473f2010-01-19 00:09:57 +0000269 self.assertNotIn(p, self.active_children())
Benjamin Petersone711caf2008-06-11 16:44:04 +0000270
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +0000271 @classmethod
272 def _test_terminate(cls):
Benjamin Petersone711caf2008-06-11 16:44:04 +0000273 time.sleep(1000)
274
275 def test_terminate(self):
276 if self.TYPE == 'threads':
277 return
278
279 p = self.Process(target=self._test_terminate)
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000280 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000281 p.start()
282
283 self.assertEqual(p.is_alive(), True)
Benjamin Peterson577473f2010-01-19 00:09:57 +0000284 self.assertIn(p, self.active_children())
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000285 self.assertEqual(p.exitcode, None)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000286
Richard Oudkerk59d54042012-05-10 16:11:12 +0100287 join = TimingWrapper(p.join)
288
289 self.assertEqual(join(0), None)
290 self.assertTimingAlmostEqual(join.elapsed, 0.0)
291 self.assertEqual(p.is_alive(), True)
292
293 self.assertEqual(join(-1), None)
294 self.assertTimingAlmostEqual(join.elapsed, 0.0)
295 self.assertEqual(p.is_alive(), True)
296
Benjamin Petersone711caf2008-06-11 16:44:04 +0000297 p.terminate()
298
Benjamin Petersone711caf2008-06-11 16:44:04 +0000299 self.assertEqual(join(), None)
300 self.assertTimingAlmostEqual(join.elapsed, 0.0)
301
302 self.assertEqual(p.is_alive(), False)
Benjamin Peterson577473f2010-01-19 00:09:57 +0000303 self.assertNotIn(p, self.active_children())
Benjamin Petersone711caf2008-06-11 16:44:04 +0000304
305 p.join()
306
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000307 # XXX sometimes get p.exitcode == 0 on Windows ...
308 #self.assertEqual(p.exitcode, -signal.SIGTERM)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000309
310 def test_cpu_count(self):
311 try:
312 cpus = multiprocessing.cpu_count()
313 except NotImplementedError:
314 cpus = 1
315 self.assertTrue(type(cpus) is int)
316 self.assertTrue(cpus >= 1)
317
318 def test_active_children(self):
319 self.assertEqual(type(self.active_children()), list)
320
321 p = self.Process(target=time.sleep, args=(DELTA,))
Benjamin Peterson577473f2010-01-19 00:09:57 +0000322 self.assertNotIn(p, self.active_children())
Benjamin Petersone711caf2008-06-11 16:44:04 +0000323
Jesus Cea94f964f2011-09-09 20:26:57 +0200324 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000325 p.start()
Benjamin Peterson577473f2010-01-19 00:09:57 +0000326 self.assertIn(p, self.active_children())
Benjamin Petersone711caf2008-06-11 16:44:04 +0000327
328 p.join()
Benjamin Peterson577473f2010-01-19 00:09:57 +0000329 self.assertNotIn(p, self.active_children())
Benjamin Petersone711caf2008-06-11 16:44:04 +0000330
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +0000331 @classmethod
332 def _test_recursion(cls, wconn, id):
Benjamin Petersone711caf2008-06-11 16:44:04 +0000333 from multiprocessing import forking
334 wconn.send(id)
335 if len(id) < 2:
336 for i in range(2):
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +0000337 p = cls.Process(
338 target=cls._test_recursion, args=(wconn, id+[i])
Benjamin Petersone711caf2008-06-11 16:44:04 +0000339 )
340 p.start()
341 p.join()
342
343 def test_recursion(self):
344 rconn, wconn = self.Pipe(duplex=False)
345 self._test_recursion(wconn, [])
346
347 time.sleep(DELTA)
348 result = []
349 while rconn.poll():
350 result.append(rconn.recv())
351
352 expected = [
353 [],
354 [0],
355 [0, 0],
356 [0, 1],
357 [1],
358 [1, 0],
359 [1, 1]
360 ]
361 self.assertEqual(result, expected)
362
Antoine Pitrou176f07d2011-06-06 19:35:31 +0200363 @classmethod
364 def _test_sentinel(cls, event):
365 event.wait(10.0)
366
367 def test_sentinel(self):
368 if self.TYPE == "threads":
369 return
370 event = self.Event()
371 p = self.Process(target=self._test_sentinel, args=(event,))
372 with self.assertRaises(ValueError):
373 p.sentinel
374 p.start()
375 self.addCleanup(p.join)
376 sentinel = p.sentinel
377 self.assertIsInstance(sentinel, int)
378 self.assertFalse(wait_for_handle(sentinel, timeout=0.0))
379 event.set()
380 p.join()
381 self.assertTrue(wait_for_handle(sentinel, timeout=DELTA))
382
Benjamin Petersone711caf2008-06-11 16:44:04 +0000383#
384#
385#
386
387class _UpperCaser(multiprocessing.Process):
388
389 def __init__(self):
390 multiprocessing.Process.__init__(self)
391 self.child_conn, self.parent_conn = multiprocessing.Pipe()
392
393 def run(self):
394 self.parent_conn.close()
395 for s in iter(self.child_conn.recv, None):
396 self.child_conn.send(s.upper())
397 self.child_conn.close()
398
399 def submit(self, s):
400 assert type(s) is str
401 self.parent_conn.send(s)
402 return self.parent_conn.recv()
403
404 def stop(self):
405 self.parent_conn.send(None)
406 self.parent_conn.close()
407 self.child_conn.close()
408
409class _TestSubclassingProcess(BaseTestCase):
410
411 ALLOWED_TYPES = ('processes',)
412
413 def test_subclassing(self):
414 uppercaser = _UpperCaser()
Jesus Cea94f964f2011-09-09 20:26:57 +0200415 uppercaser.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000416 uppercaser.start()
417 self.assertEqual(uppercaser.submit('hello'), 'HELLO')
418 self.assertEqual(uppercaser.submit('world'), 'WORLD')
419 uppercaser.stop()
420 uppercaser.join()
421
Antoine Pitrou84a0fbf2012-01-27 10:52:37 +0100422 def test_stderr_flush(self):
423 # sys.stderr is flushed at process shutdown (issue #13812)
424 if self.TYPE == "threads":
425 return
426
427 testfn = test.support.TESTFN
428 self.addCleanup(test.support.unlink, testfn)
429 proc = self.Process(target=self._test_stderr_flush, args=(testfn,))
430 proc.start()
431 proc.join()
432 with open(testfn, 'r') as f:
433 err = f.read()
434 # The whole traceback was printed
435 self.assertIn("ZeroDivisionError", err)
436 self.assertIn("test_multiprocessing.py", err)
437 self.assertIn("1/0 # MARKER", err)
438
439 @classmethod
440 def _test_stderr_flush(cls, testfn):
441 sys.stderr = open(testfn, 'w')
442 1/0 # MARKER
443
444
Richard Oudkerk29471de2012-06-06 19:04:57 +0100445 @classmethod
446 def _test_sys_exit(cls, reason, testfn):
447 sys.stderr = open(testfn, 'w')
448 sys.exit(reason)
449
450 def test_sys_exit(self):
451 # See Issue 13854
452 if self.TYPE == 'threads':
453 return
454
455 testfn = test.support.TESTFN
456 self.addCleanup(test.support.unlink, testfn)
457
458 for reason, code in (([1, 2, 3], 1), ('ignore this', 0)):
459 p = self.Process(target=self._test_sys_exit, args=(reason, testfn))
460 p.daemon = True
461 p.start()
462 p.join(5)
463 self.assertEqual(p.exitcode, code)
464
465 with open(testfn, 'r') as f:
466 self.assertEqual(f.read().rstrip(), str(reason))
467
468 for reason in (True, False, 8):
469 p = self.Process(target=sys.exit, args=(reason,))
470 p.daemon = True
471 p.start()
472 p.join(5)
473 self.assertEqual(p.exitcode, reason)
474
Benjamin Petersone711caf2008-06-11 16:44:04 +0000475#
476#
477#
478
479def queue_empty(q):
480 if hasattr(q, 'empty'):
481 return q.empty()
482 else:
483 return q.qsize() == 0
484
485def queue_full(q, maxsize):
486 if hasattr(q, 'full'):
487 return q.full()
488 else:
489 return q.qsize() == maxsize
490
491
492class _TestQueue(BaseTestCase):
493
494
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +0000495 @classmethod
496 def _test_put(cls, queue, child_can_start, parent_can_continue):
Benjamin Petersone711caf2008-06-11 16:44:04 +0000497 child_can_start.wait()
498 for i in range(6):
499 queue.get()
500 parent_can_continue.set()
501
502 def test_put(self):
503 MAXSIZE = 6
504 queue = self.Queue(maxsize=MAXSIZE)
505 child_can_start = self.Event()
506 parent_can_continue = self.Event()
507
508 proc = self.Process(
509 target=self._test_put,
510 args=(queue, child_can_start, parent_can_continue)
511 )
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000512 proc.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000513 proc.start()
514
515 self.assertEqual(queue_empty(queue), True)
516 self.assertEqual(queue_full(queue, MAXSIZE), False)
517
518 queue.put(1)
519 queue.put(2, True)
520 queue.put(3, True, None)
521 queue.put(4, False)
522 queue.put(5, False, None)
523 queue.put_nowait(6)
524
525 # the values may be in buffer but not yet in pipe so sleep a bit
526 time.sleep(DELTA)
527
528 self.assertEqual(queue_empty(queue), False)
529 self.assertEqual(queue_full(queue, MAXSIZE), True)
530
531 put = TimingWrapper(queue.put)
532 put_nowait = TimingWrapper(queue.put_nowait)
533
534 self.assertRaises(pyqueue.Full, put, 7, False)
535 self.assertTimingAlmostEqual(put.elapsed, 0)
536
537 self.assertRaises(pyqueue.Full, put, 7, False, None)
538 self.assertTimingAlmostEqual(put.elapsed, 0)
539
540 self.assertRaises(pyqueue.Full, put_nowait, 7)
541 self.assertTimingAlmostEqual(put_nowait.elapsed, 0)
542
543 self.assertRaises(pyqueue.Full, put, 7, True, TIMEOUT1)
544 self.assertTimingAlmostEqual(put.elapsed, TIMEOUT1)
545
546 self.assertRaises(pyqueue.Full, put, 7, False, TIMEOUT2)
547 self.assertTimingAlmostEqual(put.elapsed, 0)
548
549 self.assertRaises(pyqueue.Full, put, 7, True, timeout=TIMEOUT3)
550 self.assertTimingAlmostEqual(put.elapsed, TIMEOUT3)
551
552 child_can_start.set()
553 parent_can_continue.wait()
554
555 self.assertEqual(queue_empty(queue), True)
556 self.assertEqual(queue_full(queue, MAXSIZE), False)
557
558 proc.join()
559
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +0000560 @classmethod
561 def _test_get(cls, queue, child_can_start, parent_can_continue):
Benjamin Petersone711caf2008-06-11 16:44:04 +0000562 child_can_start.wait()
Benjamin Petersonb29cbbc2008-06-16 20:57:14 +0000563 #queue.put(1)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000564 queue.put(2)
565 queue.put(3)
566 queue.put(4)
567 queue.put(5)
568 parent_can_continue.set()
569
570 def test_get(self):
571 queue = self.Queue()
572 child_can_start = self.Event()
573 parent_can_continue = self.Event()
574
575 proc = self.Process(
576 target=self._test_get,
577 args=(queue, child_can_start, parent_can_continue)
578 )
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000579 proc.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000580 proc.start()
581
582 self.assertEqual(queue_empty(queue), True)
583
584 child_can_start.set()
585 parent_can_continue.wait()
586
587 time.sleep(DELTA)
588 self.assertEqual(queue_empty(queue), False)
589
Benjamin Petersonb29cbbc2008-06-16 20:57:14 +0000590 # Hangs unexpectedly, remove for now
591 #self.assertEqual(queue.get(), 1)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000592 self.assertEqual(queue.get(True, None), 2)
593 self.assertEqual(queue.get(True), 3)
594 self.assertEqual(queue.get(timeout=1), 4)
595 self.assertEqual(queue.get_nowait(), 5)
596
597 self.assertEqual(queue_empty(queue), True)
598
599 get = TimingWrapper(queue.get)
600 get_nowait = TimingWrapper(queue.get_nowait)
601
602 self.assertRaises(pyqueue.Empty, get, False)
603 self.assertTimingAlmostEqual(get.elapsed, 0)
604
605 self.assertRaises(pyqueue.Empty, get, False, None)
606 self.assertTimingAlmostEqual(get.elapsed, 0)
607
608 self.assertRaises(pyqueue.Empty, get_nowait)
609 self.assertTimingAlmostEqual(get_nowait.elapsed, 0)
610
611 self.assertRaises(pyqueue.Empty, get, True, TIMEOUT1)
612 self.assertTimingAlmostEqual(get.elapsed, TIMEOUT1)
613
614 self.assertRaises(pyqueue.Empty, get, False, TIMEOUT2)
615 self.assertTimingAlmostEqual(get.elapsed, 0)
616
617 self.assertRaises(pyqueue.Empty, get, timeout=TIMEOUT3)
618 self.assertTimingAlmostEqual(get.elapsed, TIMEOUT3)
619
620 proc.join()
621
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +0000622 @classmethod
623 def _test_fork(cls, queue):
Benjamin Petersone711caf2008-06-11 16:44:04 +0000624 for i in range(10, 20):
625 queue.put(i)
626 # note that at this point the items may only be buffered, so the
627 # process cannot shutdown until the feeder thread has finished
628 # pushing items onto the pipe.
629
630 def test_fork(self):
631 # Old versions of Queue would fail to create a new feeder
632 # thread for a forked process if the original process had its
633 # own feeder thread. This test checks that this no longer
634 # happens.
635
636 queue = self.Queue()
637
638 # put items on queue so that main process starts a feeder thread
639 for i in range(10):
640 queue.put(i)
641
642 # wait to make sure thread starts before we fork a new process
643 time.sleep(DELTA)
644
645 # fork process
646 p = self.Process(target=self._test_fork, args=(queue,))
Jesus Cea94f964f2011-09-09 20:26:57 +0200647 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000648 p.start()
649
650 # check that all expected items are in the queue
651 for i in range(20):
652 self.assertEqual(queue.get(), i)
653 self.assertRaises(pyqueue.Empty, queue.get, False)
654
655 p.join()
656
657 def test_qsize(self):
658 q = self.Queue()
659 try:
660 self.assertEqual(q.qsize(), 0)
661 except NotImplementedError:
662 return
663 q.put(1)
664 self.assertEqual(q.qsize(), 1)
665 q.put(5)
666 self.assertEqual(q.qsize(), 2)
667 q.get()
668 self.assertEqual(q.qsize(), 1)
669 q.get()
670 self.assertEqual(q.qsize(), 0)
671
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +0000672 @classmethod
673 def _test_task_done(cls, q):
Benjamin Petersone711caf2008-06-11 16:44:04 +0000674 for obj in iter(q.get, None):
675 time.sleep(DELTA)
676 q.task_done()
677
678 def test_task_done(self):
679 queue = self.JoinableQueue()
680
681 if sys.version_info < (2, 5) and not hasattr(queue, 'task_done'):
Florent Xiclunafd1b0932010-03-28 00:25:02 +0000682 self.skipTest("requires 'queue.task_done()' method")
Benjamin Petersone711caf2008-06-11 16:44:04 +0000683
684 workers = [self.Process(target=self._test_task_done, args=(queue,))
685 for i in range(4)]
686
687 for p in workers:
Jesus Cea94f964f2011-09-09 20:26:57 +0200688 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000689 p.start()
690
691 for i in range(10):
692 queue.put(i)
693
694 queue.join()
695
696 for p in workers:
697 queue.put(None)
698
699 for p in workers:
700 p.join()
701
702#
703#
704#
705
706class _TestLock(BaseTestCase):
707
708 def test_lock(self):
709 lock = self.Lock()
710 self.assertEqual(lock.acquire(), True)
711 self.assertEqual(lock.acquire(False), False)
712 self.assertEqual(lock.release(), None)
713 self.assertRaises((ValueError, threading.ThreadError), lock.release)
714
715 def test_rlock(self):
716 lock = self.RLock()
717 self.assertEqual(lock.acquire(), True)
718 self.assertEqual(lock.acquire(), True)
719 self.assertEqual(lock.acquire(), True)
720 self.assertEqual(lock.release(), None)
721 self.assertEqual(lock.release(), None)
722 self.assertEqual(lock.release(), None)
723 self.assertRaises((AssertionError, RuntimeError), lock.release)
724
Jesse Nollerf8d00852009-03-31 03:25:07 +0000725 def test_lock_context(self):
726 with self.Lock():
727 pass
728
Benjamin Petersone711caf2008-06-11 16:44:04 +0000729
730class _TestSemaphore(BaseTestCase):
731
732 def _test_semaphore(self, sem):
733 self.assertReturnsIfImplemented(2, get_value, sem)
734 self.assertEqual(sem.acquire(), True)
735 self.assertReturnsIfImplemented(1, get_value, sem)
736 self.assertEqual(sem.acquire(), True)
737 self.assertReturnsIfImplemented(0, get_value, sem)
738 self.assertEqual(sem.acquire(False), False)
739 self.assertReturnsIfImplemented(0, get_value, sem)
740 self.assertEqual(sem.release(), None)
741 self.assertReturnsIfImplemented(1, get_value, sem)
742 self.assertEqual(sem.release(), None)
743 self.assertReturnsIfImplemented(2, get_value, sem)
744
745 def test_semaphore(self):
746 sem = self.Semaphore(2)
747 self._test_semaphore(sem)
748 self.assertEqual(sem.release(), None)
749 self.assertReturnsIfImplemented(3, get_value, sem)
750 self.assertEqual(sem.release(), None)
751 self.assertReturnsIfImplemented(4, get_value, sem)
752
753 def test_bounded_semaphore(self):
754 sem = self.BoundedSemaphore(2)
755 self._test_semaphore(sem)
756 # Currently fails on OS/X
757 #if HAVE_GETVALUE:
758 # self.assertRaises(ValueError, sem.release)
759 # self.assertReturnsIfImplemented(2, get_value, sem)
760
761 def test_timeout(self):
762 if self.TYPE != 'processes':
763 return
764
765 sem = self.Semaphore(0)
766 acquire = TimingWrapper(sem.acquire)
767
768 self.assertEqual(acquire(False), False)
769 self.assertTimingAlmostEqual(acquire.elapsed, 0.0)
770
771 self.assertEqual(acquire(False, None), False)
772 self.assertTimingAlmostEqual(acquire.elapsed, 0.0)
773
774 self.assertEqual(acquire(False, TIMEOUT1), False)
775 self.assertTimingAlmostEqual(acquire.elapsed, 0)
776
777 self.assertEqual(acquire(True, TIMEOUT2), False)
778 self.assertTimingAlmostEqual(acquire.elapsed, TIMEOUT2)
779
780 self.assertEqual(acquire(timeout=TIMEOUT3), False)
781 self.assertTimingAlmostEqual(acquire.elapsed, TIMEOUT3)
782
783
784class _TestCondition(BaseTestCase):
785
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +0000786 @classmethod
787 def f(cls, cond, sleeping, woken, timeout=None):
Benjamin Petersone711caf2008-06-11 16:44:04 +0000788 cond.acquire()
789 sleeping.release()
790 cond.wait(timeout)
791 woken.release()
792 cond.release()
793
794 def check_invariant(self, cond):
795 # this is only supposed to succeed when there are no sleepers
796 if self.TYPE == 'processes':
797 try:
798 sleepers = (cond._sleeping_count.get_value() -
799 cond._woken_count.get_value())
800 self.assertEqual(sleepers, 0)
801 self.assertEqual(cond._wait_semaphore.get_value(), 0)
802 except NotImplementedError:
803 pass
804
805 def test_notify(self):
806 cond = self.Condition()
807 sleeping = self.Semaphore(0)
808 woken = self.Semaphore(0)
809
810 p = self.Process(target=self.f, args=(cond, sleeping, woken))
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000811 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000812 p.start()
813
814 p = threading.Thread(target=self.f, args=(cond, sleeping, woken))
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000815 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000816 p.start()
817
818 # wait for both children to start sleeping
819 sleeping.acquire()
820 sleeping.acquire()
821
822 # check no process/thread has woken up
823 time.sleep(DELTA)
824 self.assertReturnsIfImplemented(0, get_value, woken)
825
826 # wake up one process/thread
827 cond.acquire()
828 cond.notify()
829 cond.release()
830
831 # check one process/thread has woken up
832 time.sleep(DELTA)
833 self.assertReturnsIfImplemented(1, get_value, woken)
834
835 # wake up another
836 cond.acquire()
837 cond.notify()
838 cond.release()
839
840 # check other has woken up
841 time.sleep(DELTA)
842 self.assertReturnsIfImplemented(2, get_value, woken)
843
844 # check state is not mucked up
845 self.check_invariant(cond)
846 p.join()
847
848 def test_notify_all(self):
849 cond = self.Condition()
850 sleeping = self.Semaphore(0)
851 woken = self.Semaphore(0)
852
853 # start some threads/processes which will timeout
854 for i in range(3):
855 p = self.Process(target=self.f,
856 args=(cond, sleeping, woken, TIMEOUT1))
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000857 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000858 p.start()
859
860 t = threading.Thread(target=self.f,
861 args=(cond, sleeping, woken, TIMEOUT1))
Benjamin Peterson72753702008-08-18 18:09:21 +0000862 t.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000863 t.start()
864
865 # wait for them all to sleep
866 for i in range(6):
867 sleeping.acquire()
868
869 # check they have all timed out
870 for i in range(6):
871 woken.acquire()
872 self.assertReturnsIfImplemented(0, get_value, woken)
873
874 # check state is not mucked up
875 self.check_invariant(cond)
876
877 # start some more threads/processes
878 for i in range(3):
879 p = self.Process(target=self.f, args=(cond, sleeping, woken))
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000880 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000881 p.start()
882
883 t = threading.Thread(target=self.f, args=(cond, sleeping, woken))
Benjamin Peterson72753702008-08-18 18:09:21 +0000884 t.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000885 t.start()
886
887 # wait for them to all sleep
888 for i in range(6):
889 sleeping.acquire()
890
891 # check no process/thread has woken up
892 time.sleep(DELTA)
893 self.assertReturnsIfImplemented(0, get_value, woken)
894
895 # wake them all up
896 cond.acquire()
897 cond.notify_all()
898 cond.release()
899
900 # check they have all woken
Antoine Pitrouf25a8de2011-04-16 21:02:01 +0200901 for i in range(10):
902 try:
903 if get_value(woken) == 6:
904 break
905 except NotImplementedError:
906 break
907 time.sleep(DELTA)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000908 self.assertReturnsIfImplemented(6, get_value, woken)
909
910 # check state is not mucked up
911 self.check_invariant(cond)
912
913 def test_timeout(self):
914 cond = self.Condition()
915 wait = TimingWrapper(cond.wait)
916 cond.acquire()
917 res = wait(TIMEOUT1)
918 cond.release()
Georg Brandl65ffae02010-10-28 09:24:56 +0000919 self.assertEqual(res, False)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000920 self.assertTimingAlmostEqual(wait.elapsed, TIMEOUT1)
921
Charles-François Natalic8ce7152012-04-17 18:45:57 +0200922 @classmethod
923 def _test_waitfor_f(cls, cond, state):
924 with cond:
925 state.value = 0
926 cond.notify()
927 result = cond.wait_for(lambda : state.value==4)
928 if not result or state.value != 4:
929 sys.exit(1)
930
931 @unittest.skipUnless(HAS_SHAREDCTYPES, 'needs sharedctypes')
932 def test_waitfor(self):
933 # based on test in test/lock_tests.py
934 cond = self.Condition()
935 state = self.Value('i', -1)
936
937 p = self.Process(target=self._test_waitfor_f, args=(cond, state))
938 p.daemon = True
939 p.start()
940
941 with cond:
942 result = cond.wait_for(lambda : state.value==0)
943 self.assertTrue(result)
944 self.assertEqual(state.value, 0)
945
946 for i in range(4):
947 time.sleep(0.01)
948 with cond:
949 state.value += 1
950 cond.notify()
951
952 p.join(5)
953 self.assertFalse(p.is_alive())
954 self.assertEqual(p.exitcode, 0)
955
956 @classmethod
Richard Oudkerk6dbca362012-05-06 16:46:36 +0100957 def _test_waitfor_timeout_f(cls, cond, state, success, sem):
958 sem.release()
Charles-François Natalic8ce7152012-04-17 18:45:57 +0200959 with cond:
960 expected = 0.1
961 dt = time.time()
962 result = cond.wait_for(lambda : state.value==4, timeout=expected)
963 dt = time.time() - dt
964 # borrow logic in assertTimeout() from test/lock_tests.py
965 if not result and expected * 0.6 < dt < expected * 10.0:
966 success.value = True
967
968 @unittest.skipUnless(HAS_SHAREDCTYPES, 'needs sharedctypes')
969 def test_waitfor_timeout(self):
970 # based on test in test/lock_tests.py
971 cond = self.Condition()
972 state = self.Value('i', 0)
973 success = self.Value('i', False)
Richard Oudkerk6dbca362012-05-06 16:46:36 +0100974 sem = self.Semaphore(0)
Charles-François Natalic8ce7152012-04-17 18:45:57 +0200975
976 p = self.Process(target=self._test_waitfor_timeout_f,
Richard Oudkerk6dbca362012-05-06 16:46:36 +0100977 args=(cond, state, success, sem))
Charles-François Natalic8ce7152012-04-17 18:45:57 +0200978 p.daemon = True
979 p.start()
Richard Oudkerk6dbca362012-05-06 16:46:36 +0100980 self.assertTrue(sem.acquire(timeout=10))
Charles-François Natalic8ce7152012-04-17 18:45:57 +0200981
982 # Only increment 3 times, so state == 4 is never reached.
983 for i in range(3):
984 time.sleep(0.01)
985 with cond:
986 state.value += 1
987 cond.notify()
988
989 p.join(5)
990 self.assertTrue(success.value)
991
Richard Oudkerk98449932012-06-05 13:15:29 +0100992 @classmethod
993 def _test_wait_result(cls, c, pid):
994 with c:
995 c.notify()
996 time.sleep(1)
997 if pid is not None:
998 os.kill(pid, signal.SIGINT)
999
1000 def test_wait_result(self):
1001 if isinstance(self, ProcessesMixin) and sys.platform != 'win32':
1002 pid = os.getpid()
1003 else:
1004 pid = None
1005
1006 c = self.Condition()
1007 with c:
1008 self.assertFalse(c.wait(0))
1009 self.assertFalse(c.wait(0.1))
1010
1011 p = self.Process(target=self._test_wait_result, args=(c, pid))
1012 p.start()
1013
1014 self.assertTrue(c.wait(10))
1015 if pid is not None:
1016 self.assertRaises(KeyboardInterrupt, c.wait, 10)
1017
1018 p.join()
1019
Benjamin Petersone711caf2008-06-11 16:44:04 +00001020
1021class _TestEvent(BaseTestCase):
1022
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +00001023 @classmethod
1024 def _test_event(cls, event):
Benjamin Petersone711caf2008-06-11 16:44:04 +00001025 time.sleep(TIMEOUT2)
1026 event.set()
1027
1028 def test_event(self):
1029 event = self.Event()
1030 wait = TimingWrapper(event.wait)
1031
Ezio Melotti13925002011-03-16 11:05:33 +02001032 # Removed temporarily, due to API shear, this does not
Benjamin Petersone711caf2008-06-11 16:44:04 +00001033 # work with threading._Event objects. is_set == isSet
Benjamin Peterson965ce872009-04-05 21:24:58 +00001034 self.assertEqual(event.is_set(), False)
Benjamin Petersone711caf2008-06-11 16:44:04 +00001035
Benjamin Peterson965ce872009-04-05 21:24:58 +00001036 # Removed, threading.Event.wait() will return the value of the __flag
1037 # instead of None. API Shear with the semaphore backed mp.Event
1038 self.assertEqual(wait(0.0), False)
Benjamin Petersone711caf2008-06-11 16:44:04 +00001039 self.assertTimingAlmostEqual(wait.elapsed, 0.0)
Benjamin Peterson965ce872009-04-05 21:24:58 +00001040 self.assertEqual(wait(TIMEOUT1), False)
Benjamin Petersone711caf2008-06-11 16:44:04 +00001041 self.assertTimingAlmostEqual(wait.elapsed, TIMEOUT1)
1042
1043 event.set()
1044
1045 # See note above on the API differences
Benjamin Peterson965ce872009-04-05 21:24:58 +00001046 self.assertEqual(event.is_set(), True)
1047 self.assertEqual(wait(), True)
Benjamin Petersone711caf2008-06-11 16:44:04 +00001048 self.assertTimingAlmostEqual(wait.elapsed, 0.0)
Benjamin Peterson965ce872009-04-05 21:24:58 +00001049 self.assertEqual(wait(TIMEOUT1), True)
Benjamin Petersone711caf2008-06-11 16:44:04 +00001050 self.assertTimingAlmostEqual(wait.elapsed, 0.0)
1051 # self.assertEqual(event.is_set(), True)
1052
1053 event.clear()
1054
1055 #self.assertEqual(event.is_set(), False)
1056
Jesus Cea94f964f2011-09-09 20:26:57 +02001057 p = self.Process(target=self._test_event, args=(event,))
1058 p.daemon = True
1059 p.start()
Benjamin Peterson965ce872009-04-05 21:24:58 +00001060 self.assertEqual(wait(), True)
Benjamin Petersone711caf2008-06-11 16:44:04 +00001061
1062#
Richard Oudkerk3730a172012-06-15 18:26:07 +01001063# Tests for Barrier - adapted from tests in test/lock_tests.py
1064#
1065
1066# Many of the tests for threading.Barrier use a list as an atomic
1067# counter: a value is appended to increment the counter, and the
1068# length of the list gives the value. We use the class DummyList
1069# for the same purpose.
1070
1071class _DummyList(object):
1072
1073 def __init__(self):
1074 wrapper = multiprocessing.heap.BufferWrapper(struct.calcsize('i'))
1075 lock = multiprocessing.Lock()
1076 self.__setstate__((wrapper, lock))
1077 self._lengthbuf[0] = 0
1078
1079 def __setstate__(self, state):
1080 (self._wrapper, self._lock) = state
1081 self._lengthbuf = self._wrapper.create_memoryview().cast('i')
1082
1083 def __getstate__(self):
1084 return (self._wrapper, self._lock)
1085
1086 def append(self, _):
1087 with self._lock:
1088 self._lengthbuf[0] += 1
1089
1090 def __len__(self):
1091 with self._lock:
1092 return self._lengthbuf[0]
1093
1094def _wait():
1095 # A crude wait/yield function not relying on synchronization primitives.
1096 time.sleep(0.01)
1097
1098
1099class Bunch(object):
1100 """
1101 A bunch of threads.
1102 """
1103 def __init__(self, namespace, f, args, n, wait_before_exit=False):
1104 """
1105 Construct a bunch of `n` threads running the same function `f`.
1106 If `wait_before_exit` is True, the threads won't terminate until
1107 do_finish() is called.
1108 """
1109 self.f = f
1110 self.args = args
1111 self.n = n
1112 self.started = namespace.DummyList()
1113 self.finished = namespace.DummyList()
Richard Oudkerk0f523462012-06-15 19:18:30 +01001114 self._can_exit = namespace.Event()
1115 if not wait_before_exit:
1116 self._can_exit.set()
Richard Oudkerk3730a172012-06-15 18:26:07 +01001117 for i in range(n):
Richard Oudkerk0f523462012-06-15 19:18:30 +01001118 p = namespace.Process(target=self.task)
1119 p.daemon = True
1120 p.start()
Richard Oudkerk3730a172012-06-15 18:26:07 +01001121
1122 def task(self):
1123 pid = os.getpid()
1124 self.started.append(pid)
1125 try:
1126 self.f(*self.args)
1127 finally:
1128 self.finished.append(pid)
Richard Oudkerk0f523462012-06-15 19:18:30 +01001129 self._can_exit.wait(30)
1130 assert self._can_exit.is_set()
Richard Oudkerk3730a172012-06-15 18:26:07 +01001131
1132 def wait_for_started(self):
1133 while len(self.started) < self.n:
1134 _wait()
1135
1136 def wait_for_finished(self):
1137 while len(self.finished) < self.n:
1138 _wait()
1139
1140 def do_finish(self):
Richard Oudkerk0f523462012-06-15 19:18:30 +01001141 self._can_exit.set()
Richard Oudkerk3730a172012-06-15 18:26:07 +01001142
1143
1144class AppendTrue(object):
1145 def __init__(self, obj):
1146 self.obj = obj
1147 def __call__(self):
1148 self.obj.append(True)
1149
1150
1151class _TestBarrier(BaseTestCase):
1152 """
1153 Tests for Barrier objects.
1154 """
1155 N = 5
Richard Oudkerk13758842012-06-18 14:11:10 +01001156 defaultTimeout = 30.0 # XXX Slow Windows buildbots need generous timeout
Richard Oudkerk3730a172012-06-15 18:26:07 +01001157
1158 def setUp(self):
1159 self.barrier = self.Barrier(self.N, timeout=self.defaultTimeout)
1160
1161 def tearDown(self):
1162 self.barrier.abort()
1163 self.barrier = None
1164
1165 def DummyList(self):
1166 if self.TYPE == 'threads':
1167 return []
1168 elif self.TYPE == 'manager':
1169 return self.manager.list()
1170 else:
1171 return _DummyList()
1172
1173 def run_threads(self, f, args):
1174 b = Bunch(self, f, args, self.N-1)
1175 f(*args)
1176 b.wait_for_finished()
1177
1178 @classmethod
1179 def multipass(cls, barrier, results, n):
1180 m = barrier.parties
1181 assert m == cls.N
1182 for i in range(n):
1183 results[0].append(True)
1184 assert len(results[1]) == i * m
1185 barrier.wait()
1186 results[1].append(True)
1187 assert len(results[0]) == (i + 1) * m
1188 barrier.wait()
1189 try:
1190 assert barrier.n_waiting == 0
1191 except NotImplementedError:
1192 pass
1193 assert not barrier.broken
1194
1195 def test_barrier(self, passes=1):
1196 """
1197 Test that a barrier is passed in lockstep
1198 """
1199 results = [self.DummyList(), self.DummyList()]
1200 self.run_threads(self.multipass, (self.barrier, results, passes))
1201
1202 def test_barrier_10(self):
1203 """
1204 Test that a barrier works for 10 consecutive runs
1205 """
1206 return self.test_barrier(10)
1207
1208 @classmethod
1209 def _test_wait_return_f(cls, barrier, queue):
1210 res = barrier.wait()
1211 queue.put(res)
1212
1213 def test_wait_return(self):
1214 """
1215 test the return value from barrier.wait
1216 """
1217 queue = self.Queue()
1218 self.run_threads(self._test_wait_return_f, (self.barrier, queue))
1219 results = [queue.get() for i in range(self.N)]
1220 self.assertEqual(results.count(0), 1)
1221
1222 @classmethod
1223 def _test_action_f(cls, barrier, results):
1224 barrier.wait()
1225 if len(results) != 1:
1226 raise RuntimeError
1227
1228 def test_action(self):
1229 """
1230 Test the 'action' callback
1231 """
1232 results = self.DummyList()
1233 barrier = self.Barrier(self.N, action=AppendTrue(results))
1234 self.run_threads(self._test_action_f, (barrier, results))
1235 self.assertEqual(len(results), 1)
1236
1237 @classmethod
1238 def _test_abort_f(cls, barrier, results1, results2):
1239 try:
1240 i = barrier.wait()
1241 if i == cls.N//2:
1242 raise RuntimeError
1243 barrier.wait()
1244 results1.append(True)
1245 except threading.BrokenBarrierError:
1246 results2.append(True)
1247 except RuntimeError:
1248 barrier.abort()
1249
1250 def test_abort(self):
1251 """
1252 Test that an abort will put the barrier in a broken state
1253 """
1254 results1 = self.DummyList()
1255 results2 = self.DummyList()
1256 self.run_threads(self._test_abort_f,
1257 (self.barrier, results1, results2))
1258 self.assertEqual(len(results1), 0)
1259 self.assertEqual(len(results2), self.N-1)
1260 self.assertTrue(self.barrier.broken)
1261
1262 @classmethod
1263 def _test_reset_f(cls, barrier, results1, results2, results3):
1264 i = barrier.wait()
1265 if i == cls.N//2:
1266 # Wait until the other threads are all in the barrier.
1267 while barrier.n_waiting < cls.N-1:
1268 time.sleep(0.001)
1269 barrier.reset()
1270 else:
1271 try:
1272 barrier.wait()
1273 results1.append(True)
1274 except threading.BrokenBarrierError:
1275 results2.append(True)
1276 # Now, pass the barrier again
1277 barrier.wait()
1278 results3.append(True)
1279
1280 def test_reset(self):
1281 """
1282 Test that a 'reset' on a barrier frees the waiting threads
1283 """
1284 results1 = self.DummyList()
1285 results2 = self.DummyList()
1286 results3 = self.DummyList()
1287 self.run_threads(self._test_reset_f,
1288 (self.barrier, results1, results2, results3))
1289 self.assertEqual(len(results1), 0)
1290 self.assertEqual(len(results2), self.N-1)
1291 self.assertEqual(len(results3), self.N)
1292
1293 @classmethod
1294 def _test_abort_and_reset_f(cls, barrier, barrier2,
1295 results1, results2, results3):
1296 try:
1297 i = barrier.wait()
1298 if i == cls.N//2:
1299 raise RuntimeError
1300 barrier.wait()
1301 results1.append(True)
1302 except threading.BrokenBarrierError:
1303 results2.append(True)
1304 except RuntimeError:
1305 barrier.abort()
1306 # Synchronize and reset the barrier. Must synchronize first so
1307 # that everyone has left it when we reset, and after so that no
1308 # one enters it before the reset.
1309 if barrier2.wait() == cls.N//2:
1310 barrier.reset()
1311 barrier2.wait()
1312 barrier.wait()
1313 results3.append(True)
1314
1315 def test_abort_and_reset(self):
1316 """
1317 Test that a barrier can be reset after being broken.
1318 """
1319 results1 = self.DummyList()
1320 results2 = self.DummyList()
1321 results3 = self.DummyList()
1322 barrier2 = self.Barrier(self.N)
1323
1324 self.run_threads(self._test_abort_and_reset_f,
1325 (self.barrier, barrier2, results1, results2, results3))
1326 self.assertEqual(len(results1), 0)
1327 self.assertEqual(len(results2), self.N-1)
1328 self.assertEqual(len(results3), self.N)
1329
1330 @classmethod
1331 def _test_timeout_f(cls, barrier, results):
Richard Oudkerk13758842012-06-18 14:11:10 +01001332 i = barrier.wait()
Richard Oudkerk3730a172012-06-15 18:26:07 +01001333 if i == cls.N//2:
1334 # One thread is late!
Richard Oudkerk13758842012-06-18 14:11:10 +01001335 time.sleep(1.0)
Richard Oudkerk3730a172012-06-15 18:26:07 +01001336 try:
1337 barrier.wait(0.5)
1338 except threading.BrokenBarrierError:
1339 results.append(True)
1340
1341 def test_timeout(self):
1342 """
1343 Test wait(timeout)
1344 """
1345 results = self.DummyList()
1346 self.run_threads(self._test_timeout_f, (self.barrier, results))
1347 self.assertEqual(len(results), self.barrier.parties)
1348
1349 @classmethod
1350 def _test_default_timeout_f(cls, barrier, results):
Richard Oudkerk13758842012-06-18 14:11:10 +01001351 i = barrier.wait(cls.defaultTimeout)
Richard Oudkerk3730a172012-06-15 18:26:07 +01001352 if i == cls.N//2:
1353 # One thread is later than the default timeout
Richard Oudkerk13758842012-06-18 14:11:10 +01001354 time.sleep(1.0)
Richard Oudkerk3730a172012-06-15 18:26:07 +01001355 try:
1356 barrier.wait()
1357 except threading.BrokenBarrierError:
1358 results.append(True)
1359
1360 def test_default_timeout(self):
1361 """
1362 Test the barrier's default timeout
1363 """
Richard Oudkerk13758842012-06-18 14:11:10 +01001364 barrier = self.Barrier(self.N, timeout=0.5)
Richard Oudkerk3730a172012-06-15 18:26:07 +01001365 results = self.DummyList()
1366 self.run_threads(self._test_default_timeout_f, (barrier, results))
1367 self.assertEqual(len(results), barrier.parties)
1368
1369 def test_single_thread(self):
1370 b = self.Barrier(1)
1371 b.wait()
1372 b.wait()
1373
1374 @classmethod
1375 def _test_thousand_f(cls, barrier, passes, conn, lock):
1376 for i in range(passes):
1377 barrier.wait()
1378 with lock:
1379 conn.send(i)
1380
1381 def test_thousand(self):
1382 if self.TYPE == 'manager':
1383 return
1384 passes = 1000
1385 lock = self.Lock()
1386 conn, child_conn = self.Pipe(False)
1387 for j in range(self.N):
1388 p = self.Process(target=self._test_thousand_f,
1389 args=(self.barrier, passes, child_conn, lock))
1390 p.start()
1391
1392 for i in range(passes):
1393 for j in range(self.N):
1394 self.assertEqual(conn.recv(), i)
1395
1396#
Benjamin Petersone711caf2008-06-11 16:44:04 +00001397#
1398#
1399
1400class _TestValue(BaseTestCase):
1401
Florent Xiclunafd1b0932010-03-28 00:25:02 +00001402 ALLOWED_TYPES = ('processes',)
1403
Benjamin Petersone711caf2008-06-11 16:44:04 +00001404 codes_values = [
1405 ('i', 4343, 24234),
1406 ('d', 3.625, -4.25),
1407 ('h', -232, 234),
1408 ('c', latin('x'), latin('y'))
1409 ]
1410
Antoine Pitrou7744e2a2010-11-22 16:26:21 +00001411 def setUp(self):
1412 if not HAS_SHAREDCTYPES:
1413 self.skipTest("requires multiprocessing.sharedctypes")
1414
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +00001415 @classmethod
1416 def _test(cls, values):
1417 for sv, cv in zip(values, cls.codes_values):
Benjamin Petersone711caf2008-06-11 16:44:04 +00001418 sv.value = cv[2]
1419
1420
1421 def test_value(self, raw=False):
Benjamin Petersone711caf2008-06-11 16:44:04 +00001422 if raw:
1423 values = [self.RawValue(code, value)
1424 for code, value, _ in self.codes_values]
1425 else:
1426 values = [self.Value(code, value)
1427 for code, value, _ in self.codes_values]
1428
1429 for sv, cv in zip(values, self.codes_values):
1430 self.assertEqual(sv.value, cv[1])
1431
1432 proc = self.Process(target=self._test, args=(values,))
Jesus Cea94f964f2011-09-09 20:26:57 +02001433 proc.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +00001434 proc.start()
1435 proc.join()
1436
1437 for sv, cv in zip(values, self.codes_values):
1438 self.assertEqual(sv.value, cv[2])
1439
1440 def test_rawvalue(self):
1441 self.test_value(raw=True)
1442
1443 def test_getobj_getlock(self):
Benjamin Petersone711caf2008-06-11 16:44:04 +00001444 val1 = self.Value('i', 5)
1445 lock1 = val1.get_lock()
1446 obj1 = val1.get_obj()
1447
1448 val2 = self.Value('i', 5, lock=None)
1449 lock2 = val2.get_lock()
1450 obj2 = val2.get_obj()
1451
1452 lock = self.Lock()
1453 val3 = self.Value('i', 5, lock=lock)
1454 lock3 = val3.get_lock()
1455 obj3 = val3.get_obj()
1456 self.assertEqual(lock, lock3)
1457
Jesse Nollerb0516a62009-01-18 03:11:38 +00001458 arr4 = self.Value('i', 5, lock=False)
Benjamin Petersone711caf2008-06-11 16:44:04 +00001459 self.assertFalse(hasattr(arr4, 'get_lock'))
1460 self.assertFalse(hasattr(arr4, 'get_obj'))
1461
Jesse Nollerb0516a62009-01-18 03:11:38 +00001462 self.assertRaises(AttributeError, self.Value, 'i', 5, lock='navalue')
1463
1464 arr5 = self.RawValue('i', 5)
1465 self.assertFalse(hasattr(arr5, 'get_lock'))
1466 self.assertFalse(hasattr(arr5, 'get_obj'))
1467
Benjamin Petersone711caf2008-06-11 16:44:04 +00001468
1469class _TestArray(BaseTestCase):
1470
Florent Xiclunafd1b0932010-03-28 00:25:02 +00001471 ALLOWED_TYPES = ('processes',)
1472
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +00001473 @classmethod
1474 def f(cls, seq):
Benjamin Petersone711caf2008-06-11 16:44:04 +00001475 for i in range(1, len(seq)):
1476 seq[i] += seq[i-1]
1477
Florent Xiclunafd1b0932010-03-28 00:25:02 +00001478 @unittest.skipIf(c_int is None, "requires _ctypes")
Benjamin Petersone711caf2008-06-11 16:44:04 +00001479 def test_array(self, raw=False):
Benjamin Petersone711caf2008-06-11 16:44:04 +00001480 seq = [680, 626, 934, 821, 150, 233, 548, 982, 714, 831]
1481 if raw:
1482 arr = self.RawArray('i', seq)
1483 else:
1484 arr = self.Array('i', seq)
1485
1486 self.assertEqual(len(arr), len(seq))
1487 self.assertEqual(arr[3], seq[3])
1488 self.assertEqual(list(arr[2:7]), list(seq[2:7]))
1489
1490 arr[4:8] = seq[4:8] = array.array('i', [1, 2, 3, 4])
1491
1492 self.assertEqual(list(arr[:]), seq)
1493
1494 self.f(seq)
1495
1496 p = self.Process(target=self.f, args=(arr,))
Jesus Cea94f964f2011-09-09 20:26:57 +02001497 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +00001498 p.start()
1499 p.join()
1500
1501 self.assertEqual(list(arr[:]), seq)
1502
Florent Xiclunafd1b0932010-03-28 00:25:02 +00001503 @unittest.skipIf(c_int is None, "requires _ctypes")
Mark Dickinson89461ef2011-03-26 10:19:03 +00001504 def test_array_from_size(self):
1505 size = 10
1506 # Test for zeroing (see issue #11675).
1507 # The repetition below strengthens the test by increasing the chances
1508 # of previously allocated non-zero memory being used for the new array
1509 # on the 2nd and 3rd loops.
1510 for _ in range(3):
1511 arr = self.Array('i', size)
1512 self.assertEqual(len(arr), size)
1513 self.assertEqual(list(arr), [0] * size)
1514 arr[:] = range(10)
1515 self.assertEqual(list(arr), list(range(10)))
1516 del arr
1517
1518 @unittest.skipIf(c_int is None, "requires _ctypes")
Benjamin Petersone711caf2008-06-11 16:44:04 +00001519 def test_rawarray(self):
1520 self.test_array(raw=True)
1521
Florent Xiclunafd1b0932010-03-28 00:25:02 +00001522 @unittest.skipIf(c_int is None, "requires _ctypes")
Benjamin Petersone711caf2008-06-11 16:44:04 +00001523 def test_getobj_getlock_obj(self):
Benjamin Petersone711caf2008-06-11 16:44:04 +00001524 arr1 = self.Array('i', list(range(10)))
1525 lock1 = arr1.get_lock()
1526 obj1 = arr1.get_obj()
1527
1528 arr2 = self.Array('i', list(range(10)), lock=None)
1529 lock2 = arr2.get_lock()
1530 obj2 = arr2.get_obj()
1531
1532 lock = self.Lock()
1533 arr3 = self.Array('i', list(range(10)), lock=lock)
1534 lock3 = arr3.get_lock()
1535 obj3 = arr3.get_obj()
1536 self.assertEqual(lock, lock3)
1537
Jesse Nollerb0516a62009-01-18 03:11:38 +00001538 arr4 = self.Array('i', range(10), lock=False)
Benjamin Petersone711caf2008-06-11 16:44:04 +00001539 self.assertFalse(hasattr(arr4, 'get_lock'))
1540 self.assertFalse(hasattr(arr4, 'get_obj'))
Jesse Nollerb0516a62009-01-18 03:11:38 +00001541 self.assertRaises(AttributeError,
1542 self.Array, 'i', range(10), lock='notalock')
1543
1544 arr5 = self.RawArray('i', range(10))
1545 self.assertFalse(hasattr(arr5, 'get_lock'))
1546 self.assertFalse(hasattr(arr5, 'get_obj'))
Benjamin Petersone711caf2008-06-11 16:44:04 +00001547
1548#
1549#
1550#
1551
1552class _TestContainers(BaseTestCase):
1553
1554 ALLOWED_TYPES = ('manager',)
1555
1556 def test_list(self):
1557 a = self.list(list(range(10)))
1558 self.assertEqual(a[:], list(range(10)))
1559
1560 b = self.list()
1561 self.assertEqual(b[:], [])
1562
1563 b.extend(list(range(5)))
1564 self.assertEqual(b[:], list(range(5)))
1565
1566 self.assertEqual(b[2], 2)
1567 self.assertEqual(b[2:10], [2,3,4])
1568
1569 b *= 2
1570 self.assertEqual(b[:], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4])
1571
1572 self.assertEqual(b + [5, 6], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4, 5, 6])
1573
1574 self.assertEqual(a[:], list(range(10)))
1575
1576 d = [a, b]
1577 e = self.list(d)
1578 self.assertEqual(
1579 e[:],
1580 [[0, 1, 2, 3, 4, 5, 6, 7, 8, 9], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4]]
1581 )
1582
1583 f = self.list([a])
1584 a.append('hello')
1585 self.assertEqual(f[:], [[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 'hello']])
1586
1587 def test_dict(self):
1588 d = self.dict()
1589 indices = list(range(65, 70))
1590 for i in indices:
1591 d[i] = chr(i)
1592 self.assertEqual(d.copy(), dict((i, chr(i)) for i in indices))
1593 self.assertEqual(sorted(d.keys()), indices)
1594 self.assertEqual(sorted(d.values()), [chr(i) for i in indices])
1595 self.assertEqual(sorted(d.items()), [(i, chr(i)) for i in indices])
1596
1597 def test_namespace(self):
1598 n = self.Namespace()
1599 n.name = 'Bob'
1600 n.job = 'Builder'
1601 n._hidden = 'hidden'
1602 self.assertEqual((n.name, n.job), ('Bob', 'Builder'))
1603 del n.job
1604 self.assertEqual(str(n), "Namespace(name='Bob')")
1605 self.assertTrue(hasattr(n, 'name'))
1606 self.assertTrue(not hasattr(n, 'job'))
1607
1608#
1609#
1610#
1611
1612def sqr(x, wait=0.0):
1613 time.sleep(wait)
1614 return x*x
Ask Solem2afcbf22010-11-09 20:55:52 +00001615
Antoine Pitroude911b22011-12-21 11:03:24 +01001616def mul(x, y):
1617 return x*y
1618
Benjamin Petersone711caf2008-06-11 16:44:04 +00001619class _TestPool(BaseTestCase):
1620
Richard Oudkerkfc7b0ec2012-10-08 14:56:24 +01001621 @classmethod
1622 def setUpClass(cls):
1623 super().setUpClass()
1624 cls.pool = cls.Pool(4)
1625
1626 @classmethod
1627 def tearDownClass(cls):
1628 cls.pool.terminate()
1629 cls.pool.join()
1630 cls.pool = None
1631 super().tearDownClass()
1632
Benjamin Petersone711caf2008-06-11 16:44:04 +00001633 def test_apply(self):
1634 papply = self.pool.apply
1635 self.assertEqual(papply(sqr, (5,)), sqr(5))
1636 self.assertEqual(papply(sqr, (), {'x':3}), sqr(x=3))
1637
1638 def test_map(self):
1639 pmap = self.pool.map
1640 self.assertEqual(pmap(sqr, list(range(10))), list(map(sqr, list(range(10)))))
1641 self.assertEqual(pmap(sqr, list(range(100)), chunksize=20),
1642 list(map(sqr, list(range(100)))))
1643
Antoine Pitroude911b22011-12-21 11:03:24 +01001644 def test_starmap(self):
1645 psmap = self.pool.starmap
1646 tuples = list(zip(range(10), range(9,-1, -1)))
1647 self.assertEqual(psmap(mul, tuples),
1648 list(itertools.starmap(mul, tuples)))
1649 tuples = list(zip(range(100), range(99,-1, -1)))
1650 self.assertEqual(psmap(mul, tuples, chunksize=20),
1651 list(itertools.starmap(mul, tuples)))
1652
1653 def test_starmap_async(self):
1654 tuples = list(zip(range(100), range(99,-1, -1)))
1655 self.assertEqual(self.pool.starmap_async(mul, tuples).get(),
1656 list(itertools.starmap(mul, tuples)))
1657
Hynek Schlawack254af262012-10-27 12:53:02 +02001658 def test_map_async(self):
1659 self.assertEqual(self.pool.map_async(sqr, list(range(10))).get(),
1660 list(map(sqr, list(range(10)))))
1661
1662 def test_map_async_callbacks(self):
1663 call_args = self.manager.list() if self.TYPE == 'manager' else []
1664 self.pool.map_async(int, ['1'],
1665 callback=call_args.append,
1666 error_callback=call_args.append).wait()
1667 self.assertEqual(1, len(call_args))
1668 self.assertEqual([1], call_args[0])
1669 self.pool.map_async(int, ['a'],
1670 callback=call_args.append,
1671 error_callback=call_args.append).wait()
1672 self.assertEqual(2, len(call_args))
1673 self.assertIsInstance(call_args[1], ValueError)
1674
Alexandre Vassalottie52e3782009-07-17 09:18:18 +00001675 def test_map_chunksize(self):
1676 try:
1677 self.pool.map_async(sqr, [], chunksize=1).get(timeout=TIMEOUT1)
1678 except multiprocessing.TimeoutError:
1679 self.fail("pool.map_async with chunksize stalled on null list")
1680
Benjamin Petersone711caf2008-06-11 16:44:04 +00001681 def test_async(self):
1682 res = self.pool.apply_async(sqr, (7, TIMEOUT1,))
1683 get = TimingWrapper(res.get)
1684 self.assertEqual(get(), 49)
1685 self.assertTimingAlmostEqual(get.elapsed, TIMEOUT1)
1686
1687 def test_async_timeout(self):
1688 res = self.pool.apply_async(sqr, (6, TIMEOUT2 + 0.2))
1689 get = TimingWrapper(res.get)
1690 self.assertRaises(multiprocessing.TimeoutError, get, timeout=TIMEOUT2)
1691 self.assertTimingAlmostEqual(get.elapsed, TIMEOUT2)
1692
1693 def test_imap(self):
1694 it = self.pool.imap(sqr, list(range(10)))
1695 self.assertEqual(list(it), list(map(sqr, list(range(10)))))
1696
1697 it = self.pool.imap(sqr, list(range(10)))
1698 for i in range(10):
1699 self.assertEqual(next(it), i*i)
1700 self.assertRaises(StopIteration, it.__next__)
1701
1702 it = self.pool.imap(sqr, list(range(1000)), chunksize=100)
1703 for i in range(1000):
1704 self.assertEqual(next(it), i*i)
1705 self.assertRaises(StopIteration, it.__next__)
1706
1707 def test_imap_unordered(self):
1708 it = self.pool.imap_unordered(sqr, list(range(1000)))
1709 self.assertEqual(sorted(it), list(map(sqr, list(range(1000)))))
1710
1711 it = self.pool.imap_unordered(sqr, list(range(1000)), chunksize=53)
1712 self.assertEqual(sorted(it), list(map(sqr, list(range(1000)))))
1713
1714 def test_make_pool(self):
Victor Stinner2fae27b2011-06-20 17:53:35 +02001715 self.assertRaises(ValueError, multiprocessing.Pool, -1)
1716 self.assertRaises(ValueError, multiprocessing.Pool, 0)
1717
Benjamin Petersone711caf2008-06-11 16:44:04 +00001718 p = multiprocessing.Pool(3)
1719 self.assertEqual(3, len(p._pool))
1720 p.close()
1721 p.join()
1722
1723 def test_terminate(self):
Benjamin Petersone711caf2008-06-11 16:44:04 +00001724 result = self.pool.map_async(
1725 time.sleep, [0.1 for i in range(10000)], chunksize=1
1726 )
1727 self.pool.terminate()
1728 join = TimingWrapper(self.pool.join)
1729 join()
Victor Stinner900189b2011-03-24 16:39:07 +01001730 self.assertLess(join.elapsed, 0.5)
Jesse Noller1f0b6582010-01-27 03:36:01 +00001731
Richard Oudkerke41682b2012-06-06 19:04:57 +01001732 def test_empty_iterable(self):
1733 # See Issue 12157
1734 p = self.Pool(1)
1735
1736 self.assertEqual(p.map(sqr, []), [])
1737 self.assertEqual(list(p.imap(sqr, [])), [])
1738 self.assertEqual(list(p.imap_unordered(sqr, [])), [])
1739 self.assertEqual(p.map_async(sqr, []).get(), [])
1740
1741 p.close()
1742 p.join()
1743
Richard Oudkerkd69cfe82012-06-18 17:47:52 +01001744 def test_context(self):
1745 if self.TYPE == 'processes':
1746 L = list(range(10))
1747 expected = [sqr(i) for i in L]
1748 with multiprocessing.Pool(2) as p:
1749 r = p.map_async(sqr, L)
1750 self.assertEqual(r.get(), expected)
Benjamin Peterson3095f472012-09-25 12:45:42 -04001751 self.assertRaises(ValueError, p.map_async, sqr, L)
Richard Oudkerkd69cfe82012-06-18 17:47:52 +01001752
Ask Solem2afcbf22010-11-09 20:55:52 +00001753def raising():
1754 raise KeyError("key")
Jesse Noller1f0b6582010-01-27 03:36:01 +00001755
Ask Solem2afcbf22010-11-09 20:55:52 +00001756def unpickleable_result():
1757 return lambda: 42
1758
1759class _TestPoolWorkerErrors(BaseTestCase):
Jesse Noller1f0b6582010-01-27 03:36:01 +00001760 ALLOWED_TYPES = ('processes', )
Ask Solem2afcbf22010-11-09 20:55:52 +00001761
1762 def test_async_error_callback(self):
1763 p = multiprocessing.Pool(2)
1764
1765 scratchpad = [None]
1766 def errback(exc):
1767 scratchpad[0] = exc
1768
1769 res = p.apply_async(raising, error_callback=errback)
1770 self.assertRaises(KeyError, res.get)
1771 self.assertTrue(scratchpad[0])
1772 self.assertIsInstance(scratchpad[0], KeyError)
1773
1774 p.close()
1775 p.join()
1776
1777 def test_unpickleable_result(self):
1778 from multiprocessing.pool import MaybeEncodingError
1779 p = multiprocessing.Pool(2)
1780
1781 # Make sure we don't lose pool processes because of encoding errors.
1782 for iteration in range(20):
1783
1784 scratchpad = [None]
1785 def errback(exc):
1786 scratchpad[0] = exc
1787
1788 res = p.apply_async(unpickleable_result, error_callback=errback)
1789 self.assertRaises(MaybeEncodingError, res.get)
1790 wrapped = scratchpad[0]
1791 self.assertTrue(wrapped)
1792 self.assertIsInstance(scratchpad[0], MaybeEncodingError)
1793 self.assertIsNotNone(wrapped.exc)
1794 self.assertIsNotNone(wrapped.value)
1795
1796 p.close()
1797 p.join()
1798
1799class _TestPoolWorkerLifetime(BaseTestCase):
1800 ALLOWED_TYPES = ('processes', )
1801
Jesse Noller1f0b6582010-01-27 03:36:01 +00001802 def test_pool_worker_lifetime(self):
1803 p = multiprocessing.Pool(3, maxtasksperchild=10)
1804 self.assertEqual(3, len(p._pool))
1805 origworkerpids = [w.pid for w in p._pool]
1806 # Run many tasks so each worker gets replaced (hopefully)
1807 results = []
1808 for i in range(100):
1809 results.append(p.apply_async(sqr, (i, )))
1810 # Fetch the results and verify we got the right answers,
1811 # also ensuring all the tasks have completed.
1812 for (j, res) in enumerate(results):
1813 self.assertEqual(res.get(), sqr(j))
1814 # Refill the pool
1815 p._repopulate_pool()
Florent Xiclunafb190f62010-03-04 16:10:10 +00001816 # Wait until all workers are alive
Antoine Pitrou540ab062011-04-06 22:51:17 +02001817 # (countdown * DELTA = 5 seconds max startup process time)
1818 countdown = 50
Florent Xiclunafb190f62010-03-04 16:10:10 +00001819 while countdown and not all(w.is_alive() for w in p._pool):
1820 countdown -= 1
1821 time.sleep(DELTA)
Jesse Noller1f0b6582010-01-27 03:36:01 +00001822 finalworkerpids = [w.pid for w in p._pool]
Florent Xiclunafb190f62010-03-04 16:10:10 +00001823 # All pids should be assigned. See issue #7805.
1824 self.assertNotIn(None, origworkerpids)
1825 self.assertNotIn(None, finalworkerpids)
1826 # Finally, check that the worker pids have changed
Jesse Noller1f0b6582010-01-27 03:36:01 +00001827 self.assertNotEqual(sorted(origworkerpids), sorted(finalworkerpids))
1828 p.close()
1829 p.join()
1830
Charles-François Natalif8859e12011-10-24 18:45:29 +02001831 def test_pool_worker_lifetime_early_close(self):
1832 # Issue #10332: closing a pool whose workers have limited lifetimes
1833 # before all the tasks completed would make join() hang.
1834 p = multiprocessing.Pool(3, maxtasksperchild=1)
1835 results = []
1836 for i in range(6):
1837 results.append(p.apply_async(sqr, (i, 0.3)))
1838 p.close()
1839 p.join()
1840 # check the results
1841 for (j, res) in enumerate(results):
1842 self.assertEqual(res.get(), sqr(j))
1843
Benjamin Petersone711caf2008-06-11 16:44:04 +00001844#
1845# Test of creating a customized manager class
1846#
1847
1848from multiprocessing.managers import BaseManager, BaseProxy, RemoteError
1849
1850class FooBar(object):
1851 def f(self):
1852 return 'f()'
1853 def g(self):
1854 raise ValueError
1855 def _h(self):
1856 return '_h()'
1857
1858def baz():
1859 for i in range(10):
1860 yield i*i
1861
1862class IteratorProxy(BaseProxy):
Florent Xiclunaaa171062010-08-14 15:56:42 +00001863 _exposed_ = ('__next__',)
Benjamin Petersone711caf2008-06-11 16:44:04 +00001864 def __iter__(self):
1865 return self
1866 def __next__(self):
Benjamin Petersone711caf2008-06-11 16:44:04 +00001867 return self._callmethod('__next__')
1868
1869class MyManager(BaseManager):
1870 pass
1871
1872MyManager.register('Foo', callable=FooBar)
1873MyManager.register('Bar', callable=FooBar, exposed=('f', '_h'))
1874MyManager.register('baz', callable=baz, proxytype=IteratorProxy)
1875
1876
1877class _TestMyManager(BaseTestCase):
1878
1879 ALLOWED_TYPES = ('manager',)
1880
1881 def test_mymanager(self):
1882 manager = MyManager()
1883 manager.start()
Richard Oudkerkac385712012-06-18 21:29:30 +01001884 self.common(manager)
1885 manager.shutdown()
Benjamin Petersone711caf2008-06-11 16:44:04 +00001886
Richard Oudkerkac385712012-06-18 21:29:30 +01001887 # If the manager process exited cleanly then the exitcode
1888 # will be zero. Otherwise (after a short timeout)
1889 # terminate() is used, resulting in an exitcode of -SIGTERM.
1890 self.assertEqual(manager._process.exitcode, 0)
1891
1892 def test_mymanager_context(self):
1893 with MyManager() as manager:
1894 self.common(manager)
1895 self.assertEqual(manager._process.exitcode, 0)
1896
1897 def test_mymanager_context_prestarted(self):
1898 manager = MyManager()
1899 manager.start()
1900 with manager:
1901 self.common(manager)
1902 self.assertEqual(manager._process.exitcode, 0)
1903
1904 def common(self, manager):
Benjamin Petersone711caf2008-06-11 16:44:04 +00001905 foo = manager.Foo()
1906 bar = manager.Bar()
1907 baz = manager.baz()
1908
1909 foo_methods = [name for name in ('f', 'g', '_h') if hasattr(foo, name)]
1910 bar_methods = [name for name in ('f', 'g', '_h') if hasattr(bar, name)]
1911
1912 self.assertEqual(foo_methods, ['f', 'g'])
1913 self.assertEqual(bar_methods, ['f', '_h'])
1914
1915 self.assertEqual(foo.f(), 'f()')
1916 self.assertRaises(ValueError, foo.g)
1917 self.assertEqual(foo._callmethod('f'), 'f()')
1918 self.assertRaises(RemoteError, foo._callmethod, '_h')
1919
1920 self.assertEqual(bar.f(), 'f()')
1921 self.assertEqual(bar._h(), '_h()')
1922 self.assertEqual(bar._callmethod('f'), 'f()')
1923 self.assertEqual(bar._callmethod('_h'), '_h()')
1924
1925 self.assertEqual(list(baz), [i*i for i in range(10)])
1926
Richard Oudkerk73d9a292012-06-14 15:30:10 +01001927
Benjamin Petersone711caf2008-06-11 16:44:04 +00001928#
1929# Test of connecting to a remote server and using xmlrpclib for serialization
1930#
1931
1932_queue = pyqueue.Queue()
1933def get_queue():
1934 return _queue
1935
1936class QueueManager(BaseManager):
1937 '''manager class used by server process'''
1938QueueManager.register('get_queue', callable=get_queue)
1939
1940class QueueManager2(BaseManager):
1941 '''manager class which specifies the same interface as QueueManager'''
1942QueueManager2.register('get_queue')
1943
1944
1945SERIALIZER = 'xmlrpclib'
1946
1947class _TestRemoteManager(BaseTestCase):
1948
1949 ALLOWED_TYPES = ('manager',)
1950
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +00001951 @classmethod
1952 def _putter(cls, address, authkey):
Benjamin Petersone711caf2008-06-11 16:44:04 +00001953 manager = QueueManager2(
1954 address=address, authkey=authkey, serializer=SERIALIZER
1955 )
1956 manager.connect()
1957 queue = manager.get_queue()
1958 queue.put(('hello world', None, True, 2.25))
1959
1960 def test_remote(self):
1961 authkey = os.urandom(32)
1962
1963 manager = QueueManager(
1964 address=('localhost', 0), authkey=authkey, serializer=SERIALIZER
1965 )
1966 manager.start()
1967
1968 p = self.Process(target=self._putter, args=(manager.address, authkey))
Jesus Cea94f964f2011-09-09 20:26:57 +02001969 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +00001970 p.start()
1971
1972 manager2 = QueueManager2(
1973 address=manager.address, authkey=authkey, serializer=SERIALIZER
1974 )
1975 manager2.connect()
1976 queue = manager2.get_queue()
1977
1978 # Note that xmlrpclib will deserialize object as a list not a tuple
1979 self.assertEqual(queue.get(), ['hello world', None, True, 2.25])
1980
1981 # Because we are using xmlrpclib for serialization instead of
1982 # pickle this will cause a serialization error.
1983 self.assertRaises(Exception, queue.put, time.sleep)
1984
1985 # Make queue finalizer run before the server is stopped
1986 del queue
1987 manager.shutdown()
1988
Jesse Nollerc5d28a02009-03-30 16:37:36 +00001989class _TestManagerRestart(BaseTestCase):
1990
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +00001991 @classmethod
1992 def _putter(cls, address, authkey):
Jesse Nollerc5d28a02009-03-30 16:37:36 +00001993 manager = QueueManager(
1994 address=address, authkey=authkey, serializer=SERIALIZER)
1995 manager.connect()
1996 queue = manager.get_queue()
1997 queue.put('hello world')
1998
1999 def test_rapid_restart(self):
2000 authkey = os.urandom(32)
2001 manager = QueueManager(
Antoine Pitrou043bad02010-04-30 23:20:15 +00002002 address=('localhost', 0), authkey=authkey, serializer=SERIALIZER)
Brian Curtin50be1ca2010-11-01 05:10:44 +00002003 srvr = manager.get_server()
2004 addr = srvr.address
2005 # Close the connection.Listener socket which gets opened as a part
2006 # of manager.get_server(). It's not needed for the test.
2007 srvr.listener.close()
Jesse Nollerc5d28a02009-03-30 16:37:36 +00002008 manager.start()
2009
2010 p = self.Process(target=self._putter, args=(manager.address, authkey))
Jesus Cea94f964f2011-09-09 20:26:57 +02002011 p.daemon = True
Jesse Nollerc5d28a02009-03-30 16:37:36 +00002012 p.start()
2013 queue = manager.get_queue()
2014 self.assertEqual(queue.get(), 'hello world')
Jesse Noller35d1f002009-03-30 22:59:27 +00002015 del queue
Jesse Nollerc5d28a02009-03-30 16:37:36 +00002016 manager.shutdown()
2017 manager = QueueManager(
Antoine Pitrou043bad02010-04-30 23:20:15 +00002018 address=addr, authkey=authkey, serializer=SERIALIZER)
Antoine Pitrouc824e9a2011-04-05 18:11:33 +02002019 try:
2020 manager.start()
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02002021 except OSError as e:
Antoine Pitrouc824e9a2011-04-05 18:11:33 +02002022 if e.errno != errno.EADDRINUSE:
2023 raise
2024 # Retry after some time, in case the old socket was lingering
2025 # (sporadic failure on buildbots)
2026 time.sleep(1.0)
2027 manager = QueueManager(
2028 address=addr, authkey=authkey, serializer=SERIALIZER)
Jesse Noller35d1f002009-03-30 22:59:27 +00002029 manager.shutdown()
Jesse Nollerc5d28a02009-03-30 16:37:36 +00002030
Benjamin Petersone711caf2008-06-11 16:44:04 +00002031#
2032#
2033#
2034
2035SENTINEL = latin('')
2036
2037class _TestConnection(BaseTestCase):
2038
2039 ALLOWED_TYPES = ('processes', 'threads')
2040
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +00002041 @classmethod
2042 def _echo(cls, conn):
Benjamin Petersone711caf2008-06-11 16:44:04 +00002043 for msg in iter(conn.recv_bytes, SENTINEL):
2044 conn.send_bytes(msg)
2045 conn.close()
2046
2047 def test_connection(self):
2048 conn, child_conn = self.Pipe()
2049
2050 p = self.Process(target=self._echo, args=(child_conn,))
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +00002051 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +00002052 p.start()
2053
2054 seq = [1, 2.25, None]
2055 msg = latin('hello world')
2056 longmsg = msg * 10
2057 arr = array.array('i', list(range(4)))
2058
2059 if self.TYPE == 'processes':
2060 self.assertEqual(type(conn.fileno()), int)
2061
2062 self.assertEqual(conn.send(seq), None)
2063 self.assertEqual(conn.recv(), seq)
2064
2065 self.assertEqual(conn.send_bytes(msg), None)
2066 self.assertEqual(conn.recv_bytes(), msg)
2067
2068 if self.TYPE == 'processes':
2069 buffer = array.array('i', [0]*10)
2070 expected = list(arr) + [0] * (10 - len(arr))
2071 self.assertEqual(conn.send_bytes(arr), None)
2072 self.assertEqual(conn.recv_bytes_into(buffer),
2073 len(arr) * buffer.itemsize)
2074 self.assertEqual(list(buffer), expected)
2075
2076 buffer = array.array('i', [0]*10)
2077 expected = [0] * 3 + list(arr) + [0] * (10 - 3 - len(arr))
2078 self.assertEqual(conn.send_bytes(arr), None)
2079 self.assertEqual(conn.recv_bytes_into(buffer, 3 * buffer.itemsize),
2080 len(arr) * buffer.itemsize)
2081 self.assertEqual(list(buffer), expected)
2082
2083 buffer = bytearray(latin(' ' * 40))
2084 self.assertEqual(conn.send_bytes(longmsg), None)
2085 try:
2086 res = conn.recv_bytes_into(buffer)
2087 except multiprocessing.BufferTooShort as e:
2088 self.assertEqual(e.args, (longmsg,))
2089 else:
2090 self.fail('expected BufferTooShort, got %s' % res)
2091
2092 poll = TimingWrapper(conn.poll)
2093
2094 self.assertEqual(poll(), False)
2095 self.assertTimingAlmostEqual(poll.elapsed, 0)
2096
Richard Oudkerk59d54042012-05-10 16:11:12 +01002097 self.assertEqual(poll(-1), False)
2098 self.assertTimingAlmostEqual(poll.elapsed, 0)
2099
Benjamin Petersone711caf2008-06-11 16:44:04 +00002100 self.assertEqual(poll(TIMEOUT1), False)
2101 self.assertTimingAlmostEqual(poll.elapsed, TIMEOUT1)
2102
2103 conn.send(None)
Giampaolo Rodola'5e844c82012-12-31 17:23:09 +01002104 time.sleep(.1)
Benjamin Petersone711caf2008-06-11 16:44:04 +00002105
2106 self.assertEqual(poll(TIMEOUT1), True)
2107 self.assertTimingAlmostEqual(poll.elapsed, 0)
2108
2109 self.assertEqual(conn.recv(), None)
2110
2111 really_big_msg = latin('X') * (1024 * 1024 * 16) # 16Mb
2112 conn.send_bytes(really_big_msg)
2113 self.assertEqual(conn.recv_bytes(), really_big_msg)
2114
2115 conn.send_bytes(SENTINEL) # tell child to quit
2116 child_conn.close()
2117
2118 if self.TYPE == 'processes':
2119 self.assertEqual(conn.readable, True)
2120 self.assertEqual(conn.writable, True)
2121 self.assertRaises(EOFError, conn.recv)
2122 self.assertRaises(EOFError, conn.recv_bytes)
2123
2124 p.join()
2125
2126 def test_duplex_false(self):
2127 reader, writer = self.Pipe(duplex=False)
2128 self.assertEqual(writer.send(1), None)
2129 self.assertEqual(reader.recv(), 1)
2130 if self.TYPE == 'processes':
2131 self.assertEqual(reader.readable, True)
2132 self.assertEqual(reader.writable, False)
2133 self.assertEqual(writer.readable, False)
2134 self.assertEqual(writer.writable, True)
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02002135 self.assertRaises(OSError, reader.send, 2)
2136 self.assertRaises(OSError, writer.recv)
2137 self.assertRaises(OSError, writer.poll)
Benjamin Petersone711caf2008-06-11 16:44:04 +00002138
2139 def test_spawn_close(self):
2140 # We test that a pipe connection can be closed by parent
2141 # process immediately after child is spawned. On Windows this
2142 # would have sometimes failed on old versions because
2143 # child_conn would be closed before the child got a chance to
2144 # duplicate it.
2145 conn, child_conn = self.Pipe()
2146
2147 p = self.Process(target=self._echo, args=(child_conn,))
Jesus Cea94f964f2011-09-09 20:26:57 +02002148 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +00002149 p.start()
2150 child_conn.close() # this might complete before child initializes
2151
2152 msg = latin('hello')
2153 conn.send_bytes(msg)
2154 self.assertEqual(conn.recv_bytes(), msg)
2155
2156 conn.send_bytes(SENTINEL)
2157 conn.close()
2158 p.join()
2159
2160 def test_sendbytes(self):
2161 if self.TYPE != 'processes':
2162 return
2163
2164 msg = latin('abcdefghijklmnopqrstuvwxyz')
2165 a, b = self.Pipe()
2166
2167 a.send_bytes(msg)
2168 self.assertEqual(b.recv_bytes(), msg)
2169
2170 a.send_bytes(msg, 5)
2171 self.assertEqual(b.recv_bytes(), msg[5:])
2172
2173 a.send_bytes(msg, 7, 8)
2174 self.assertEqual(b.recv_bytes(), msg[7:7+8])
2175
2176 a.send_bytes(msg, 26)
2177 self.assertEqual(b.recv_bytes(), latin(''))
2178
2179 a.send_bytes(msg, 26, 0)
2180 self.assertEqual(b.recv_bytes(), latin(''))
2181
2182 self.assertRaises(ValueError, a.send_bytes, msg, 27)
2183
2184 self.assertRaises(ValueError, a.send_bytes, msg, 22, 5)
2185
2186 self.assertRaises(ValueError, a.send_bytes, msg, 26, 1)
2187
2188 self.assertRaises(ValueError, a.send_bytes, msg, -1)
2189
2190 self.assertRaises(ValueError, a.send_bytes, msg, 4, -1)
2191
Antoine Pitroubcb39d42011-08-23 19:46:22 +02002192 @classmethod
2193 def _is_fd_assigned(cls, fd):
2194 try:
2195 os.fstat(fd)
2196 except OSError as e:
2197 if e.errno == errno.EBADF:
2198 return False
2199 raise
2200 else:
2201 return True
2202
2203 @classmethod
2204 def _writefd(cls, conn, data, create_dummy_fds=False):
2205 if create_dummy_fds:
2206 for i in range(0, 256):
2207 if not cls._is_fd_assigned(i):
2208 os.dup2(conn.fileno(), i)
2209 fd = reduction.recv_handle(conn)
2210 if msvcrt:
2211 fd = msvcrt.open_osfhandle(fd, os.O_WRONLY)
2212 os.write(fd, data)
2213 os.close(fd)
2214
Charles-François Natalibc8f0822011-09-20 20:36:51 +02002215 @unittest.skipUnless(HAS_REDUCTION, "test needs multiprocessing.reduction")
Antoine Pitroubcb39d42011-08-23 19:46:22 +02002216 def test_fd_transfer(self):
2217 if self.TYPE != 'processes':
2218 self.skipTest("only makes sense with processes")
2219 conn, child_conn = self.Pipe(duplex=True)
2220
2221 p = self.Process(target=self._writefd, args=(child_conn, b"foo"))
Jesus Cea94f964f2011-09-09 20:26:57 +02002222 p.daemon = True
Antoine Pitroubcb39d42011-08-23 19:46:22 +02002223 p.start()
Victor Stinnerd0b10a62011-09-21 01:10:29 +02002224 self.addCleanup(test.support.unlink, test.support.TESTFN)
Antoine Pitroubcb39d42011-08-23 19:46:22 +02002225 with open(test.support.TESTFN, "wb") as f:
2226 fd = f.fileno()
2227 if msvcrt:
2228 fd = msvcrt.get_osfhandle(fd)
2229 reduction.send_handle(conn, fd, p.pid)
2230 p.join()
2231 with open(test.support.TESTFN, "rb") as f:
2232 self.assertEqual(f.read(), b"foo")
2233
Charles-François Natalibc8f0822011-09-20 20:36:51 +02002234 @unittest.skipUnless(HAS_REDUCTION, "test needs multiprocessing.reduction")
Antoine Pitroubcb39d42011-08-23 19:46:22 +02002235 @unittest.skipIf(sys.platform == "win32",
2236 "test semantics don't make sense on Windows")
2237 @unittest.skipIf(MAXFD <= 256,
2238 "largest assignable fd number is too small")
2239 @unittest.skipUnless(hasattr(os, "dup2"),
2240 "test needs os.dup2()")
2241 def test_large_fd_transfer(self):
2242 # With fd > 256 (issue #11657)
2243 if self.TYPE != 'processes':
2244 self.skipTest("only makes sense with processes")
2245 conn, child_conn = self.Pipe(duplex=True)
2246
2247 p = self.Process(target=self._writefd, args=(child_conn, b"bar", True))
Jesus Cea94f964f2011-09-09 20:26:57 +02002248 p.daemon = True
Antoine Pitroubcb39d42011-08-23 19:46:22 +02002249 p.start()
Victor Stinnerd0b10a62011-09-21 01:10:29 +02002250 self.addCleanup(test.support.unlink, test.support.TESTFN)
Antoine Pitroubcb39d42011-08-23 19:46:22 +02002251 with open(test.support.TESTFN, "wb") as f:
2252 fd = f.fileno()
2253 for newfd in range(256, MAXFD):
2254 if not self._is_fd_assigned(newfd):
2255 break
2256 else:
2257 self.fail("could not find an unassigned large file descriptor")
2258 os.dup2(fd, newfd)
2259 try:
2260 reduction.send_handle(conn, newfd, p.pid)
2261 finally:
2262 os.close(newfd)
2263 p.join()
2264 with open(test.support.TESTFN, "rb") as f:
2265 self.assertEqual(f.read(), b"bar")
2266
Jesus Cea4507e642011-09-21 03:53:25 +02002267 @classmethod
2268 def _send_data_without_fd(self, conn):
2269 os.write(conn.fileno(), b"\0")
2270
Charles-François Natalie51c8da2011-09-21 18:48:21 +02002271 @unittest.skipUnless(HAS_REDUCTION, "test needs multiprocessing.reduction")
Jesus Cea4507e642011-09-21 03:53:25 +02002272 @unittest.skipIf(sys.platform == "win32", "doesn't make sense on Windows")
2273 def test_missing_fd_transfer(self):
2274 # Check that exception is raised when received data is not
2275 # accompanied by a file descriptor in ancillary data.
2276 if self.TYPE != 'processes':
2277 self.skipTest("only makes sense with processes")
2278 conn, child_conn = self.Pipe(duplex=True)
2279
2280 p = self.Process(target=self._send_data_without_fd, args=(child_conn,))
2281 p.daemon = True
2282 p.start()
2283 self.assertRaises(RuntimeError, reduction.recv_handle, conn)
2284 p.join()
Antoine Pitroubcb39d42011-08-23 19:46:22 +02002285
Richard Oudkerkd69cfe82012-06-18 17:47:52 +01002286 def test_context(self):
2287 a, b = self.Pipe()
2288
2289 with a, b:
2290 a.send(1729)
2291 self.assertEqual(b.recv(), 1729)
2292 if self.TYPE == 'processes':
2293 self.assertFalse(a.closed)
2294 self.assertFalse(b.closed)
2295
2296 if self.TYPE == 'processes':
2297 self.assertTrue(a.closed)
2298 self.assertTrue(b.closed)
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02002299 self.assertRaises(OSError, a.recv)
2300 self.assertRaises(OSError, b.recv)
Richard Oudkerkd69cfe82012-06-18 17:47:52 +01002301
Charles-François Natalied4a8fc2012-02-08 21:15:58 +01002302class _TestListener(BaseTestCase):
2303
Richard Oudkerk91257752012-06-15 21:53:34 +01002304 ALLOWED_TYPES = ('processes',)
Charles-François Natalied4a8fc2012-02-08 21:15:58 +01002305
2306 def test_multiple_bind(self):
2307 for family in self.connection.families:
2308 l = self.connection.Listener(family=family)
2309 self.addCleanup(l.close)
2310 self.assertRaises(OSError, self.connection.Listener,
2311 l.address, family)
2312
Richard Oudkerkd69cfe82012-06-18 17:47:52 +01002313 def test_context(self):
2314 with self.connection.Listener() as l:
2315 with self.connection.Client(l.address) as c:
2316 with l.accept() as d:
2317 c.send(1729)
2318 self.assertEqual(d.recv(), 1729)
2319
2320 if self.TYPE == 'processes':
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02002321 self.assertRaises(OSError, l.accept)
Richard Oudkerkd69cfe82012-06-18 17:47:52 +01002322
Benjamin Petersone711caf2008-06-11 16:44:04 +00002323class _TestListenerClient(BaseTestCase):
2324
2325 ALLOWED_TYPES = ('processes', 'threads')
2326
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +00002327 @classmethod
2328 def _test(cls, address):
2329 conn = cls.connection.Client(address)
Benjamin Petersone711caf2008-06-11 16:44:04 +00002330 conn.send('hello')
2331 conn.close()
2332
2333 def test_listener_client(self):
2334 for family in self.connection.families:
2335 l = self.connection.Listener(family=family)
2336 p = self.Process(target=self._test, args=(l.address,))
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +00002337 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +00002338 p.start()
2339 conn = l.accept()
2340 self.assertEqual(conn.recv(), 'hello')
2341 p.join()
2342 l.close()
Charles-François Natalied4a8fc2012-02-08 21:15:58 +01002343
Richard Oudkerkfdb8dcf2012-05-05 19:45:37 +01002344 def test_issue14725(self):
2345 l = self.connection.Listener()
2346 p = self.Process(target=self._test, args=(l.address,))
2347 p.daemon = True
2348 p.start()
2349 time.sleep(1)
2350 # On Windows the client process should by now have connected,
2351 # written data and closed the pipe handle by now. This causes
2352 # ConnectNamdedPipe() to fail with ERROR_NO_DATA. See Issue
2353 # 14725.
2354 conn = l.accept()
2355 self.assertEqual(conn.recv(), 'hello')
2356 conn.close()
2357 p.join()
2358 l.close()
2359
Richard Oudkerked9e06c2013-01-13 22:46:48 +00002360 def test_issue16955(self):
2361 for fam in self.connection.families:
2362 l = self.connection.Listener(family=fam)
2363 c = self.connection.Client(l.address)
2364 a = l.accept()
2365 a.send_bytes(b"hello")
2366 self.assertTrue(c.poll(1))
2367 a.close()
2368 c.close()
2369 l.close()
2370
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01002371class _TestPoll(unittest.TestCase):
2372
2373 ALLOWED_TYPES = ('processes', 'threads')
2374
2375 def test_empty_string(self):
2376 a, b = self.Pipe()
2377 self.assertEqual(a.poll(), False)
2378 b.send_bytes(b'')
2379 self.assertEqual(a.poll(), True)
2380 self.assertEqual(a.poll(), True)
2381
2382 @classmethod
2383 def _child_strings(cls, conn, strings):
2384 for s in strings:
2385 time.sleep(0.1)
2386 conn.send_bytes(s)
2387 conn.close()
2388
2389 def test_strings(self):
2390 strings = (b'hello', b'', b'a', b'b', b'', b'bye', b'', b'lop')
2391 a, b = self.Pipe()
2392 p = self.Process(target=self._child_strings, args=(b, strings))
2393 p.start()
2394
2395 for s in strings:
2396 for i in range(200):
2397 if a.poll(0.01):
2398 break
2399 x = a.recv_bytes()
2400 self.assertEqual(s, x)
2401
2402 p.join()
2403
2404 @classmethod
2405 def _child_boundaries(cls, r):
2406 # Polling may "pull" a message in to the child process, but we
2407 # don't want it to pull only part of a message, as that would
2408 # corrupt the pipe for any other processes which might later
2409 # read from it.
2410 r.poll(5)
2411
2412 def test_boundaries(self):
2413 r, w = self.Pipe(False)
2414 p = self.Process(target=self._child_boundaries, args=(r,))
2415 p.start()
2416 time.sleep(2)
2417 L = [b"first", b"second"]
2418 for obj in L:
2419 w.send_bytes(obj)
2420 w.close()
2421 p.join()
2422 self.assertIn(r.recv_bytes(), L)
2423
2424 @classmethod
2425 def _child_dont_merge(cls, b):
2426 b.send_bytes(b'a')
2427 b.send_bytes(b'b')
2428 b.send_bytes(b'cd')
2429
2430 def test_dont_merge(self):
2431 a, b = self.Pipe()
2432 self.assertEqual(a.poll(0.0), False)
2433 self.assertEqual(a.poll(0.1), False)
2434
2435 p = self.Process(target=self._child_dont_merge, args=(b,))
2436 p.start()
2437
2438 self.assertEqual(a.recv_bytes(), b'a')
2439 self.assertEqual(a.poll(1.0), True)
2440 self.assertEqual(a.poll(1.0), True)
2441 self.assertEqual(a.recv_bytes(), b'b')
2442 self.assertEqual(a.poll(1.0), True)
2443 self.assertEqual(a.poll(1.0), True)
2444 self.assertEqual(a.poll(0.0), True)
2445 self.assertEqual(a.recv_bytes(), b'cd')
2446
2447 p.join()
2448
Benjamin Petersone711caf2008-06-11 16:44:04 +00002449#
2450# Test of sending connection and socket objects between processes
2451#
Antoine Pitrou5438ed12012-04-24 22:56:57 +02002452
2453@unittest.skipUnless(HAS_REDUCTION, "test needs multiprocessing.reduction")
Benjamin Petersone711caf2008-06-11 16:44:04 +00002454class _TestPicklingConnections(BaseTestCase):
2455
2456 ALLOWED_TYPES = ('processes',)
2457
Antoine Pitrou5438ed12012-04-24 22:56:57 +02002458 @classmethod
Antoine Pitrou92ff4e12012-04-27 23:51:03 +02002459 def tearDownClass(cls):
2460 from multiprocessing.reduction import resource_sharer
2461 resource_sharer.stop(timeout=5)
2462
2463 @classmethod
Antoine Pitrou5438ed12012-04-24 22:56:57 +02002464 def _listener(cls, conn, families):
Benjamin Petersone711caf2008-06-11 16:44:04 +00002465 for fam in families:
Antoine Pitrou5438ed12012-04-24 22:56:57 +02002466 l = cls.connection.Listener(family=fam)
Benjamin Petersone711caf2008-06-11 16:44:04 +00002467 conn.send(l.address)
2468 new_conn = l.accept()
2469 conn.send(new_conn)
Antoine Pitrou5438ed12012-04-24 22:56:57 +02002470 new_conn.close()
2471 l.close()
Benjamin Petersone711caf2008-06-11 16:44:04 +00002472
Antoine Pitrou5438ed12012-04-24 22:56:57 +02002473 l = socket.socket()
2474 l.bind(('localhost', 0))
Antoine Pitrou5438ed12012-04-24 22:56:57 +02002475 l.listen(1)
Richard Oudkerk5d73c172012-05-08 22:24:47 +01002476 conn.send(l.getsockname())
Antoine Pitrou5438ed12012-04-24 22:56:57 +02002477 new_conn, addr = l.accept()
2478 conn.send(new_conn)
2479 new_conn.close()
2480 l.close()
Benjamin Petersone711caf2008-06-11 16:44:04 +00002481
2482 conn.recv()
2483
Antoine Pitrou5438ed12012-04-24 22:56:57 +02002484 @classmethod
2485 def _remote(cls, conn):
Benjamin Petersone711caf2008-06-11 16:44:04 +00002486 for (address, msg) in iter(conn.recv, None):
Antoine Pitrou5438ed12012-04-24 22:56:57 +02002487 client = cls.connection.Client(address)
Benjamin Petersone711caf2008-06-11 16:44:04 +00002488 client.send(msg.upper())
2489 client.close()
2490
Antoine Pitrou5438ed12012-04-24 22:56:57 +02002491 address, msg = conn.recv()
2492 client = socket.socket()
2493 client.connect(address)
2494 client.sendall(msg.upper())
2495 client.close()
Benjamin Petersone711caf2008-06-11 16:44:04 +00002496
2497 conn.close()
2498
2499 def test_pickling(self):
Benjamin Petersone711caf2008-06-11 16:44:04 +00002500 families = self.connection.families
2501
2502 lconn, lconn0 = self.Pipe()
2503 lp = self.Process(target=self._listener, args=(lconn0, families))
Jesus Cea94f964f2011-09-09 20:26:57 +02002504 lp.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +00002505 lp.start()
2506 lconn0.close()
2507
2508 rconn, rconn0 = self.Pipe()
2509 rp = self.Process(target=self._remote, args=(rconn0,))
Jesus Cea94f964f2011-09-09 20:26:57 +02002510 rp.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +00002511 rp.start()
2512 rconn0.close()
2513
2514 for fam in families:
2515 msg = ('This connection uses family %s' % fam).encode('ascii')
2516 address = lconn.recv()
2517 rconn.send((address, msg))
2518 new_conn = lconn.recv()
2519 self.assertEqual(new_conn.recv(), msg.upper())
2520
2521 rconn.send(None)
2522
Antoine Pitrou5438ed12012-04-24 22:56:57 +02002523 msg = latin('This connection uses a normal socket')
2524 address = lconn.recv()
2525 rconn.send((address, msg))
2526 new_conn = lconn.recv()
Richard Oudkerk4460c342012-04-30 14:48:50 +01002527 buf = []
2528 while True:
2529 s = new_conn.recv(100)
2530 if not s:
2531 break
2532 buf.append(s)
2533 buf = b''.join(buf)
2534 self.assertEqual(buf, msg.upper())
Antoine Pitrou5438ed12012-04-24 22:56:57 +02002535 new_conn.close()
Benjamin Petersone711caf2008-06-11 16:44:04 +00002536
2537 lconn.send(None)
2538
2539 rconn.close()
2540 lconn.close()
2541
2542 lp.join()
2543 rp.join()
Antoine Pitrou5438ed12012-04-24 22:56:57 +02002544
2545 @classmethod
2546 def child_access(cls, conn):
2547 w = conn.recv()
2548 w.send('all is well')
2549 w.close()
2550
2551 r = conn.recv()
2552 msg = r.recv()
2553 conn.send(msg*2)
2554
2555 conn.close()
2556
2557 def test_access(self):
2558 # On Windows, if we do not specify a destination pid when
2559 # using DupHandle then we need to be careful to use the
2560 # correct access flags for DuplicateHandle(), or else
2561 # DupHandle.detach() will raise PermissionError. For example,
2562 # for a read only pipe handle we should use
2563 # access=FILE_GENERIC_READ. (Unfortunately
2564 # DUPLICATE_SAME_ACCESS does not work.)
2565 conn, child_conn = self.Pipe()
2566 p = self.Process(target=self.child_access, args=(child_conn,))
2567 p.daemon = True
2568 p.start()
2569 child_conn.close()
2570
2571 r, w = self.Pipe(duplex=False)
2572 conn.send(w)
2573 w.close()
2574 self.assertEqual(r.recv(), 'all is well')
2575 r.close()
2576
2577 r, w = self.Pipe(duplex=False)
2578 conn.send(r)
2579 r.close()
2580 w.send('foobar')
2581 w.close()
2582 self.assertEqual(conn.recv(), 'foobar'*2)
2583
Benjamin Petersone711caf2008-06-11 16:44:04 +00002584#
2585#
2586#
2587
2588class _TestHeap(BaseTestCase):
2589
2590 ALLOWED_TYPES = ('processes',)
2591
2592 def test_heap(self):
2593 iterations = 5000
2594 maxblocks = 50
2595 blocks = []
2596
2597 # create and destroy lots of blocks of different sizes
2598 for i in range(iterations):
2599 size = int(random.lognormvariate(0, 1) * 1000)
2600 b = multiprocessing.heap.BufferWrapper(size)
2601 blocks.append(b)
2602 if len(blocks) > maxblocks:
2603 i = random.randrange(maxblocks)
2604 del blocks[i]
2605
2606 # get the heap object
2607 heap = multiprocessing.heap.BufferWrapper._heap
2608
2609 # verify the state of the heap
2610 all = []
2611 occupied = 0
Charles-François Natali778db492011-07-02 14:35:49 +02002612 heap._lock.acquire()
2613 self.addCleanup(heap._lock.release)
Benjamin Petersone711caf2008-06-11 16:44:04 +00002614 for L in list(heap._len_to_seq.values()):
2615 for arena, start, stop in L:
2616 all.append((heap._arenas.index(arena), start, stop,
2617 stop-start, 'free'))
2618 for arena, start, stop in heap._allocated_blocks:
2619 all.append((heap._arenas.index(arena), start, stop,
2620 stop-start, 'occupied'))
2621 occupied += (stop-start)
2622
2623 all.sort()
2624
2625 for i in range(len(all)-1):
2626 (arena, start, stop) = all[i][:3]
2627 (narena, nstart, nstop) = all[i+1][:3]
2628 self.assertTrue((arena != narena and nstart == 0) or
2629 (stop == nstart))
2630
Charles-François Natali778db492011-07-02 14:35:49 +02002631 def test_free_from_gc(self):
2632 # Check that freeing of blocks by the garbage collector doesn't deadlock
2633 # (issue #12352).
2634 # Make sure the GC is enabled, and set lower collection thresholds to
2635 # make collections more frequent (and increase the probability of
2636 # deadlock).
2637 if not gc.isenabled():
2638 gc.enable()
2639 self.addCleanup(gc.disable)
2640 thresholds = gc.get_threshold()
2641 self.addCleanup(gc.set_threshold, *thresholds)
2642 gc.set_threshold(10)
2643
2644 # perform numerous block allocations, with cyclic references to make
2645 # sure objects are collected asynchronously by the gc
2646 for i in range(5000):
2647 a = multiprocessing.heap.BufferWrapper(1)
2648 b = multiprocessing.heap.BufferWrapper(1)
2649 # circular references
2650 a.buddy = b
2651 b.buddy = a
2652
Benjamin Petersone711caf2008-06-11 16:44:04 +00002653#
2654#
2655#
2656
Benjamin Petersone711caf2008-06-11 16:44:04 +00002657class _Foo(Structure):
2658 _fields_ = [
2659 ('x', c_int),
2660 ('y', c_double)
2661 ]
2662
2663class _TestSharedCTypes(BaseTestCase):
2664
2665 ALLOWED_TYPES = ('processes',)
2666
Antoine Pitrou7744e2a2010-11-22 16:26:21 +00002667 def setUp(self):
2668 if not HAS_SHAREDCTYPES:
2669 self.skipTest("requires multiprocessing.sharedctypes")
2670
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +00002671 @classmethod
2672 def _double(cls, x, y, foo, arr, string):
Benjamin Petersone711caf2008-06-11 16:44:04 +00002673 x.value *= 2
2674 y.value *= 2
2675 foo.x *= 2
2676 foo.y *= 2
2677 string.value *= 2
2678 for i in range(len(arr)):
2679 arr[i] *= 2
2680
2681 def test_sharedctypes(self, lock=False):
Benjamin Petersone711caf2008-06-11 16:44:04 +00002682 x = Value('i', 7, lock=lock)
Georg Brandl89fad142010-03-14 10:23:39 +00002683 y = Value(c_double, 1.0/3.0, lock=lock)
Benjamin Petersone711caf2008-06-11 16:44:04 +00002684 foo = Value(_Foo, 3, 2, lock=lock)
Georg Brandl89fad142010-03-14 10:23:39 +00002685 arr = self.Array('d', list(range(10)), lock=lock)
2686 string = self.Array('c', 20, lock=lock)
Brian Curtinafa88b52010-10-07 01:12:19 +00002687 string.value = latin('hello')
Benjamin Petersone711caf2008-06-11 16:44:04 +00002688
2689 p = self.Process(target=self._double, args=(x, y, foo, arr, string))
Jesus Cea94f964f2011-09-09 20:26:57 +02002690 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +00002691 p.start()
2692 p.join()
2693
2694 self.assertEqual(x.value, 14)
2695 self.assertAlmostEqual(y.value, 2.0/3.0)
2696 self.assertEqual(foo.x, 6)
2697 self.assertAlmostEqual(foo.y, 4.0)
2698 for i in range(10):
2699 self.assertAlmostEqual(arr[i], i*2)
2700 self.assertEqual(string.value, latin('hellohello'))
2701
2702 def test_synchronize(self):
2703 self.test_sharedctypes(lock=True)
2704
2705 def test_copy(self):
Benjamin Petersone711caf2008-06-11 16:44:04 +00002706 foo = _Foo(2, 5.0)
Brian Curtinafa88b52010-10-07 01:12:19 +00002707 bar = copy(foo)
Benjamin Petersone711caf2008-06-11 16:44:04 +00002708 foo.x = 0
2709 foo.y = 0
2710 self.assertEqual(bar.x, 2)
2711 self.assertAlmostEqual(bar.y, 5.0)
2712
2713#
2714#
2715#
2716
2717class _TestFinalize(BaseTestCase):
2718
2719 ALLOWED_TYPES = ('processes',)
2720
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +00002721 @classmethod
2722 def _test_finalize(cls, conn):
Benjamin Petersone711caf2008-06-11 16:44:04 +00002723 class Foo(object):
2724 pass
2725
2726 a = Foo()
2727 util.Finalize(a, conn.send, args=('a',))
2728 del a # triggers callback for a
2729
2730 b = Foo()
2731 close_b = util.Finalize(b, conn.send, args=('b',))
2732 close_b() # triggers callback for b
2733 close_b() # does nothing because callback has already been called
2734 del b # does nothing because callback has already been called
2735
2736 c = Foo()
2737 util.Finalize(c, conn.send, args=('c',))
2738
2739 d10 = Foo()
2740 util.Finalize(d10, conn.send, args=('d10',), exitpriority=1)
2741
2742 d01 = Foo()
2743 util.Finalize(d01, conn.send, args=('d01',), exitpriority=0)
2744 d02 = Foo()
2745 util.Finalize(d02, conn.send, args=('d02',), exitpriority=0)
2746 d03 = Foo()
2747 util.Finalize(d03, conn.send, args=('d03',), exitpriority=0)
2748
2749 util.Finalize(None, conn.send, args=('e',), exitpriority=-10)
2750
2751 util.Finalize(None, conn.send, args=('STOP',), exitpriority=-100)
2752
Ezio Melotti13925002011-03-16 11:05:33 +02002753 # call multiprocessing's cleanup function then exit process without
Benjamin Petersone711caf2008-06-11 16:44:04 +00002754 # garbage collecting locals
2755 util._exit_function()
2756 conn.close()
2757 os._exit(0)
2758
2759 def test_finalize(self):
2760 conn, child_conn = self.Pipe()
2761
2762 p = self.Process(target=self._test_finalize, args=(child_conn,))
Jesus Cea94f964f2011-09-09 20:26:57 +02002763 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +00002764 p.start()
2765 p.join()
2766
2767 result = [obj for obj in iter(conn.recv, 'STOP')]
2768 self.assertEqual(result, ['a', 'b', 'd10', 'd03', 'd02', 'd01', 'e'])
2769
2770#
2771# Test that from ... import * works for each module
2772#
2773
2774class _TestImportStar(BaseTestCase):
2775
2776 ALLOWED_TYPES = ('processes',)
2777
2778 def test_import(self):
Florent Xiclunafd1b0932010-03-28 00:25:02 +00002779 modules = [
Benjamin Petersone711caf2008-06-11 16:44:04 +00002780 'multiprocessing', 'multiprocessing.connection',
2781 'multiprocessing.heap', 'multiprocessing.managers',
2782 'multiprocessing.pool', 'multiprocessing.process',
Benjamin Petersone711caf2008-06-11 16:44:04 +00002783 'multiprocessing.synchronize', 'multiprocessing.util'
Florent Xiclunafd1b0932010-03-28 00:25:02 +00002784 ]
2785
Charles-François Natalibc8f0822011-09-20 20:36:51 +02002786 if HAS_REDUCTION:
2787 modules.append('multiprocessing.reduction')
2788
Florent Xiclunafd1b0932010-03-28 00:25:02 +00002789 if c_int is not None:
2790 # This module requires _ctypes
2791 modules.append('multiprocessing.sharedctypes')
Benjamin Petersone711caf2008-06-11 16:44:04 +00002792
2793 for name in modules:
2794 __import__(name)
2795 mod = sys.modules[name]
2796
2797 for attr in getattr(mod, '__all__', ()):
2798 self.assertTrue(
2799 hasattr(mod, attr),
2800 '%r does not have attribute %r' % (mod, attr)
2801 )
2802
2803#
2804# Quick test that logging works -- does not test logging output
2805#
2806
2807class _TestLogging(BaseTestCase):
2808
2809 ALLOWED_TYPES = ('processes',)
2810
2811 def test_enable_logging(self):
2812 logger = multiprocessing.get_logger()
2813 logger.setLevel(util.SUBWARNING)
2814 self.assertTrue(logger is not None)
2815 logger.debug('this will not be printed')
2816 logger.info('nor will this')
2817 logger.setLevel(LOG_LEVEL)
2818
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +00002819 @classmethod
2820 def _test_level(cls, conn):
Benjamin Petersone711caf2008-06-11 16:44:04 +00002821 logger = multiprocessing.get_logger()
2822 conn.send(logger.getEffectiveLevel())
2823
2824 def test_level(self):
2825 LEVEL1 = 32
2826 LEVEL2 = 37
2827
2828 logger = multiprocessing.get_logger()
2829 root_logger = logging.getLogger()
2830 root_level = root_logger.level
2831
2832 reader, writer = multiprocessing.Pipe(duplex=False)
2833
2834 logger.setLevel(LEVEL1)
Jesus Cea94f964f2011-09-09 20:26:57 +02002835 p = self.Process(target=self._test_level, args=(writer,))
2836 p.daemon = True
2837 p.start()
Benjamin Petersone711caf2008-06-11 16:44:04 +00002838 self.assertEqual(LEVEL1, reader.recv())
2839
2840 logger.setLevel(logging.NOTSET)
2841 root_logger.setLevel(LEVEL2)
Jesus Cea94f964f2011-09-09 20:26:57 +02002842 p = self.Process(target=self._test_level, args=(writer,))
2843 p.daemon = True
2844 p.start()
Benjamin Petersone711caf2008-06-11 16:44:04 +00002845 self.assertEqual(LEVEL2, reader.recv())
2846
2847 root_logger.setLevel(root_level)
2848 logger.setLevel(level=LOG_LEVEL)
2849
Jesse Nollerb9a49b72009-11-21 18:09:38 +00002850
Jesse Nollerf4ae35f2009-11-24 14:22:24 +00002851# class _TestLoggingProcessName(BaseTestCase):
2852#
2853# def handle(self, record):
2854# assert record.processName == multiprocessing.current_process().name
2855# self.__handled = True
2856#
2857# def test_logging(self):
2858# handler = logging.Handler()
2859# handler.handle = self.handle
2860# self.__handled = False
2861# # Bypass getLogger() and side-effects
2862# logger = logging.getLoggerClass()(
2863# 'multiprocessing.test.TestLoggingProcessName')
2864# logger.addHandler(handler)
2865# logger.propagate = False
2866#
2867# logger.warn('foo')
2868# assert self.__handled
Jesse Nollerb9a49b72009-11-21 18:09:38 +00002869
Benjamin Petersone711caf2008-06-11 16:44:04 +00002870#
Richard Oudkerk7aaa1ef2013-02-26 12:39:57 +00002871# Check that Process.join() retries if os.waitpid() fails with EINTR
2872#
2873
2874class _TestPollEintr(BaseTestCase):
2875
2876 ALLOWED_TYPES = ('processes',)
2877
2878 @classmethod
2879 def _killer(cls, pid):
2880 time.sleep(0.5)
2881 os.kill(pid, signal.SIGUSR1)
2882
2883 @unittest.skipUnless(hasattr(signal, 'SIGUSR1'), 'requires SIGUSR1')
2884 def test_poll_eintr(self):
2885 got_signal = [False]
2886 def record(*args):
2887 got_signal[0] = True
2888 pid = os.getpid()
2889 oldhandler = signal.signal(signal.SIGUSR1, record)
2890 try:
2891 killer = self.Process(target=self._killer, args=(pid,))
2892 killer.start()
2893 p = self.Process(target=time.sleep, args=(1,))
2894 p.start()
2895 p.join()
2896 self.assertTrue(got_signal[0])
2897 self.assertEqual(p.exitcode, 0)
2898 killer.join()
2899 finally:
2900 signal.signal(signal.SIGUSR1, oldhandler)
2901
2902#
Jesse Noller6214edd2009-01-19 16:23:53 +00002903# Test to verify handle verification, see issue 3321
2904#
2905
2906class TestInvalidHandle(unittest.TestCase):
2907
Florent Xiclunafd1b0932010-03-28 00:25:02 +00002908 @unittest.skipIf(WIN32, "skipped on Windows")
Jesse Noller6214edd2009-01-19 16:23:53 +00002909 def test_invalid_handles(self):
Antoine Pitrou87cf2202011-05-09 17:04:27 +02002910 conn = multiprocessing.connection.Connection(44977608)
2911 try:
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02002912 self.assertRaises((ValueError, OSError), conn.poll)
Antoine Pitrou87cf2202011-05-09 17:04:27 +02002913 finally:
2914 # Hack private attribute _handle to avoid printing an error
2915 # in conn.__del__
2916 conn._handle = None
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02002917 self.assertRaises((ValueError, OSError),
Antoine Pitrou87cf2202011-05-09 17:04:27 +02002918 multiprocessing.connection.Connection, -1)
Florent Xiclunafd1b0932010-03-28 00:25:02 +00002919
Jesse Noller6214edd2009-01-19 16:23:53 +00002920#
Benjamin Petersone711caf2008-06-11 16:44:04 +00002921# Functions used to create test cases from the base ones in this module
2922#
2923
Benjamin Petersone711caf2008-06-11 16:44:04 +00002924def create_test_cases(Mixin, type):
2925 result = {}
2926 glob = globals()
Florent Xiclunafd1b0932010-03-28 00:25:02 +00002927 Type = type.capitalize()
Richard Oudkerk91257752012-06-15 21:53:34 +01002928 ALL_TYPES = {'processes', 'threads', 'manager'}
Benjamin Petersone711caf2008-06-11 16:44:04 +00002929
2930 for name in list(glob.keys()):
2931 if name.startswith('_Test'):
2932 base = glob[name]
Richard Oudkerk91257752012-06-15 21:53:34 +01002933 assert set(base.ALLOWED_TYPES) <= ALL_TYPES, set(base.ALLOWED_TYPES)
Benjamin Petersone711caf2008-06-11 16:44:04 +00002934 if type in base.ALLOWED_TYPES:
2935 newname = 'With' + Type + name[1:]
Richard Oudkerkfc7b0ec2012-10-08 14:56:24 +01002936 class Temp(base, Mixin, unittest.TestCase):
Benjamin Petersone711caf2008-06-11 16:44:04 +00002937 pass
2938 result[newname] = Temp
Richard Oudkerkfc7b0ec2012-10-08 14:56:24 +01002939 Temp.__name__ = Temp.__qualname__ = newname
Benjamin Petersone711caf2008-06-11 16:44:04 +00002940 Temp.__module__ = Mixin.__module__
2941 return result
2942
2943#
2944# Create test cases
2945#
2946
2947class ProcessesMixin(object):
2948 TYPE = 'processes'
2949 Process = multiprocessing.Process
Richard Oudkerkfc7b0ec2012-10-08 14:56:24 +01002950 connection = multiprocessing.connection
2951 current_process = staticmethod(multiprocessing.current_process)
2952 active_children = staticmethod(multiprocessing.active_children)
2953 Pool = staticmethod(multiprocessing.Pool)
2954 Pipe = staticmethod(multiprocessing.Pipe)
2955 Queue = staticmethod(multiprocessing.Queue)
2956 JoinableQueue = staticmethod(multiprocessing.JoinableQueue)
2957 Lock = staticmethod(multiprocessing.Lock)
2958 RLock = staticmethod(multiprocessing.RLock)
2959 Semaphore = staticmethod(multiprocessing.Semaphore)
2960 BoundedSemaphore = staticmethod(multiprocessing.BoundedSemaphore)
2961 Condition = staticmethod(multiprocessing.Condition)
2962 Event = staticmethod(multiprocessing.Event)
2963 Barrier = staticmethod(multiprocessing.Barrier)
2964 Value = staticmethod(multiprocessing.Value)
2965 Array = staticmethod(multiprocessing.Array)
2966 RawValue = staticmethod(multiprocessing.RawValue)
2967 RawArray = staticmethod(multiprocessing.RawArray)
Benjamin Petersone711caf2008-06-11 16:44:04 +00002968
2969testcases_processes = create_test_cases(ProcessesMixin, type='processes')
2970globals().update(testcases_processes)
2971
2972
2973class ManagerMixin(object):
2974 TYPE = 'manager'
2975 Process = multiprocessing.Process
Richard Oudkerkfc7b0ec2012-10-08 14:56:24 +01002976 Queue = property(operator.attrgetter('manager.Queue'))
2977 JoinableQueue = property(operator.attrgetter('manager.JoinableQueue'))
2978 Lock = property(operator.attrgetter('manager.Lock'))
2979 RLock = property(operator.attrgetter('manager.RLock'))
2980 Semaphore = property(operator.attrgetter('manager.Semaphore'))
2981 BoundedSemaphore = property(operator.attrgetter('manager.BoundedSemaphore'))
2982 Condition = property(operator.attrgetter('manager.Condition'))
2983 Event = property(operator.attrgetter('manager.Event'))
2984 Barrier = property(operator.attrgetter('manager.Barrier'))
2985 Value = property(operator.attrgetter('manager.Value'))
2986 Array = property(operator.attrgetter('manager.Array'))
2987 list = property(operator.attrgetter('manager.list'))
2988 dict = property(operator.attrgetter('manager.dict'))
2989 Namespace = property(operator.attrgetter('manager.Namespace'))
2990
2991 @classmethod
2992 def Pool(cls, *args, **kwds):
2993 return cls.manager.Pool(*args, **kwds)
2994
2995 @classmethod
2996 def setUpClass(cls):
2997 cls.manager = multiprocessing.Manager()
2998
2999 @classmethod
3000 def tearDownClass(cls):
Ezio Melottidc6763b2013-03-11 21:39:18 +02003001 # only the manager process should be returned by active_children()
3002 # but this can take a bit on slow machines, so wait a few seconds
3003 # if there are other children too (see #17395)
3004 t = 0.01
3005 while len(multiprocessing.active_children()) > 1 and t < 5:
3006 time.sleep(t)
3007 t *= 2
Richard Oudkerkfc7b0ec2012-10-08 14:56:24 +01003008 gc.collect() # do garbage collection
3009 if cls.manager._number_of_objects() != 0:
3010 # This is not really an error since some tests do not
3011 # ensure that all processes which hold a reference to a
3012 # managed object have been joined.
3013 print('Shared objects which still exist at manager shutdown:')
3014 print(cls.manager._debug_info())
3015 cls.manager.shutdown()
3016 cls.manager.join()
3017 cls.manager = None
Benjamin Petersone711caf2008-06-11 16:44:04 +00003018
3019testcases_manager = create_test_cases(ManagerMixin, type='manager')
3020globals().update(testcases_manager)
3021
3022
3023class ThreadsMixin(object):
3024 TYPE = 'threads'
3025 Process = multiprocessing.dummy.Process
Richard Oudkerkfc7b0ec2012-10-08 14:56:24 +01003026 connection = multiprocessing.dummy.connection
3027 current_process = staticmethod(multiprocessing.dummy.current_process)
3028 active_children = staticmethod(multiprocessing.dummy.active_children)
3029 Pool = staticmethod(multiprocessing.Pool)
3030 Pipe = staticmethod(multiprocessing.dummy.Pipe)
3031 Queue = staticmethod(multiprocessing.dummy.Queue)
3032 JoinableQueue = staticmethod(multiprocessing.dummy.JoinableQueue)
3033 Lock = staticmethod(multiprocessing.dummy.Lock)
3034 RLock = staticmethod(multiprocessing.dummy.RLock)
3035 Semaphore = staticmethod(multiprocessing.dummy.Semaphore)
3036 BoundedSemaphore = staticmethod(multiprocessing.dummy.BoundedSemaphore)
3037 Condition = staticmethod(multiprocessing.dummy.Condition)
3038 Event = staticmethod(multiprocessing.dummy.Event)
3039 Barrier = staticmethod(multiprocessing.dummy.Barrier)
3040 Value = staticmethod(multiprocessing.dummy.Value)
3041 Array = staticmethod(multiprocessing.dummy.Array)
Benjamin Petersone711caf2008-06-11 16:44:04 +00003042
3043testcases_threads = create_test_cases(ThreadsMixin, type='threads')
3044globals().update(testcases_threads)
3045
Richard Oudkerkfc7b0ec2012-10-08 14:56:24 +01003046
Neal Norwitz5d6415e2008-08-25 01:53:32 +00003047class OtherTest(unittest.TestCase):
3048 # TODO: add more tests for deliver/answer challenge.
3049 def test_deliver_challenge_auth_failure(self):
3050 class _FakeConnection(object):
3051 def recv_bytes(self, size):
Neal Norwitzec105ad2008-08-25 03:05:54 +00003052 return b'something bogus'
Neal Norwitz5d6415e2008-08-25 01:53:32 +00003053 def send_bytes(self, data):
3054 pass
3055 self.assertRaises(multiprocessing.AuthenticationError,
3056 multiprocessing.connection.deliver_challenge,
3057 _FakeConnection(), b'abc')
3058
3059 def test_answer_challenge_auth_failure(self):
3060 class _FakeConnection(object):
3061 def __init__(self):
3062 self.count = 0
3063 def recv_bytes(self, size):
3064 self.count += 1
3065 if self.count == 1:
3066 return multiprocessing.connection.CHALLENGE
3067 elif self.count == 2:
Neal Norwitzec105ad2008-08-25 03:05:54 +00003068 return b'something bogus'
3069 return b''
Neal Norwitz5d6415e2008-08-25 01:53:32 +00003070 def send_bytes(self, data):
3071 pass
3072 self.assertRaises(multiprocessing.AuthenticationError,
3073 multiprocessing.connection.answer_challenge,
3074 _FakeConnection(), b'abc')
3075
Benjamin Petersonf47ed4a2009-04-11 20:45:40 +00003076#
3077# Test Manager.start()/Pool.__init__() initializer feature - see issue 5585
3078#
3079
3080def initializer(ns):
3081 ns.test += 1
3082
3083class TestInitializers(unittest.TestCase):
3084 def setUp(self):
3085 self.mgr = multiprocessing.Manager()
3086 self.ns = self.mgr.Namespace()
3087 self.ns.test = 0
3088
3089 def tearDown(self):
3090 self.mgr.shutdown()
Richard Oudkerka6becaa2012-05-03 18:29:02 +01003091 self.mgr.join()
Benjamin Petersonf47ed4a2009-04-11 20:45:40 +00003092
3093 def test_manager_initializer(self):
3094 m = multiprocessing.managers.SyncManager()
3095 self.assertRaises(TypeError, m.start, 1)
3096 m.start(initializer, (self.ns,))
3097 self.assertEqual(self.ns.test, 1)
3098 m.shutdown()
Richard Oudkerka6becaa2012-05-03 18:29:02 +01003099 m.join()
Benjamin Petersonf47ed4a2009-04-11 20:45:40 +00003100
3101 def test_pool_initializer(self):
3102 self.assertRaises(TypeError, multiprocessing.Pool, initializer=1)
3103 p = multiprocessing.Pool(1, initializer, (self.ns,))
3104 p.close()
3105 p.join()
3106 self.assertEqual(self.ns.test, 1)
3107
Alexandre Vassalottic57a84f2009-07-17 12:07:01 +00003108#
3109# Issue 5155, 5313, 5331: Test process in processes
3110# Verifies os.close(sys.stdin.fileno) vs. sys.stdin.close() behavior
3111#
3112
3113def _ThisSubProcess(q):
3114 try:
3115 item = q.get(block=False)
3116 except pyqueue.Empty:
3117 pass
3118
3119def _TestProcess(q):
3120 queue = multiprocessing.Queue()
3121 subProc = multiprocessing.Process(target=_ThisSubProcess, args=(queue,))
Jesus Cea94f964f2011-09-09 20:26:57 +02003122 subProc.daemon = True
Alexandre Vassalottic57a84f2009-07-17 12:07:01 +00003123 subProc.start()
3124 subProc.join()
3125
3126def _afunc(x):
3127 return x*x
3128
3129def pool_in_process():
3130 pool = multiprocessing.Pool(processes=4)
3131 x = pool.map(_afunc, [1, 2, 3, 4, 5, 6, 7])
Richard Oudkerk225cb8d2012-05-02 19:36:11 +01003132 pool.close()
3133 pool.join()
Alexandre Vassalottic57a84f2009-07-17 12:07:01 +00003134
3135class _file_like(object):
3136 def __init__(self, delegate):
3137 self._delegate = delegate
3138 self._pid = None
3139
3140 @property
3141 def cache(self):
3142 pid = os.getpid()
3143 # There are no race conditions since fork keeps only the running thread
3144 if pid != self._pid:
3145 self._pid = pid
3146 self._cache = []
3147 return self._cache
3148
3149 def write(self, data):
3150 self.cache.append(data)
3151
3152 def flush(self):
3153 self._delegate.write(''.join(self.cache))
3154 self._cache = []
3155
3156class TestStdinBadfiledescriptor(unittest.TestCase):
3157
3158 def test_queue_in_process(self):
3159 queue = multiprocessing.Queue()
3160 proc = multiprocessing.Process(target=_TestProcess, args=(queue,))
3161 proc.start()
3162 proc.join()
3163
3164 def test_pool_in_process(self):
3165 p = multiprocessing.Process(target=pool_in_process)
3166 p.start()
3167 p.join()
3168
3169 def test_flushing(self):
3170 sio = io.StringIO()
3171 flike = _file_like(sio)
3172 flike.write('foo')
3173 proc = multiprocessing.Process(target=lambda: flike.flush())
3174 flike.flush()
3175 assert sio.getvalue() == 'foo'
3176
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01003177
3178class TestWait(unittest.TestCase):
3179
3180 @classmethod
3181 def _child_test_wait(cls, w, slow):
3182 for i in range(10):
3183 if slow:
3184 time.sleep(random.random()*0.1)
3185 w.send((i, os.getpid()))
3186 w.close()
3187
3188 def test_wait(self, slow=False):
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01003189 from multiprocessing.connection import wait
3190 readers = []
3191 procs = []
3192 messages = []
3193
3194 for i in range(4):
Antoine Pitrou5bb9a8f2012-03-06 13:43:24 +01003195 r, w = multiprocessing.Pipe(duplex=False)
3196 p = multiprocessing.Process(target=self._child_test_wait, args=(w, slow))
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01003197 p.daemon = True
3198 p.start()
3199 w.close()
3200 readers.append(r)
3201 procs.append(p)
Antoine Pitrou6c64cc12012-03-06 13:42:35 +01003202 self.addCleanup(p.join)
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01003203
3204 while readers:
3205 for r in wait(readers):
3206 try:
3207 msg = r.recv()
3208 except EOFError:
3209 readers.remove(r)
3210 r.close()
3211 else:
3212 messages.append(msg)
3213
3214 messages.sort()
3215 expected = sorted((i, p.pid) for i in range(10) for p in procs)
3216 self.assertEqual(messages, expected)
3217
3218 @classmethod
3219 def _child_test_wait_socket(cls, address, slow):
3220 s = socket.socket()
3221 s.connect(address)
3222 for i in range(10):
3223 if slow:
3224 time.sleep(random.random()*0.1)
3225 s.sendall(('%s\n' % i).encode('ascii'))
3226 s.close()
3227
3228 def test_wait_socket(self, slow=False):
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01003229 from multiprocessing.connection import wait
3230 l = socket.socket()
3231 l.bind(('', 0))
3232 l.listen(4)
3233 addr = ('localhost', l.getsockname()[1])
3234 readers = []
3235 procs = []
3236 dic = {}
3237
3238 for i in range(4):
Antoine Pitrou5bb9a8f2012-03-06 13:43:24 +01003239 p = multiprocessing.Process(target=self._child_test_wait_socket,
3240 args=(addr, slow))
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01003241 p.daemon = True
3242 p.start()
3243 procs.append(p)
Antoine Pitrou6c64cc12012-03-06 13:42:35 +01003244 self.addCleanup(p.join)
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01003245
3246 for i in range(4):
3247 r, _ = l.accept()
3248 readers.append(r)
3249 dic[r] = []
3250 l.close()
3251
3252 while readers:
3253 for r in wait(readers):
3254 msg = r.recv(32)
3255 if not msg:
3256 readers.remove(r)
3257 r.close()
3258 else:
3259 dic[r].append(msg)
3260
3261 expected = ''.join('%s\n' % i for i in range(10)).encode('ascii')
3262 for v in dic.values():
3263 self.assertEqual(b''.join(v), expected)
3264
3265 def test_wait_slow(self):
3266 self.test_wait(True)
3267
3268 def test_wait_socket_slow(self):
Richard Oudkerk104b3f42012-05-08 16:08:07 +01003269 self.test_wait_socket(True)
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01003270
3271 def test_wait_timeout(self):
3272 from multiprocessing.connection import wait
3273
Richard Oudkerk009b15e2012-05-04 09:44:39 +01003274 expected = 5
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01003275 a, b = multiprocessing.Pipe()
3276
3277 start = time.time()
Richard Oudkerk009b15e2012-05-04 09:44:39 +01003278 res = wait([a, b], expected)
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01003279 delta = time.time() - start
3280
3281 self.assertEqual(res, [])
Richard Oudkerk6dbca362012-05-06 16:46:36 +01003282 self.assertLess(delta, expected * 2)
3283 self.assertGreater(delta, expected * 0.5)
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01003284
3285 b.send(None)
3286
3287 start = time.time()
Richard Oudkerk009b15e2012-05-04 09:44:39 +01003288 res = wait([a, b], 20)
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01003289 delta = time.time() - start
3290
3291 self.assertEqual(res, [a])
Antoine Pitrou37749772012-03-09 18:40:15 +01003292 self.assertLess(delta, 0.4)
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01003293
Richard Oudkerk009b15e2012-05-04 09:44:39 +01003294 @classmethod
3295 def signal_and_sleep(cls, sem, period):
3296 sem.release()
3297 time.sleep(period)
3298
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01003299 def test_wait_integer(self):
3300 from multiprocessing.connection import wait
3301
Richard Oudkerk009b15e2012-05-04 09:44:39 +01003302 expected = 3
Giampaolo Rodola'0c8ad612013-01-14 02:24:05 +01003303 sorted_ = lambda l: sorted(l, key=lambda x: id(x))
Richard Oudkerk009b15e2012-05-04 09:44:39 +01003304 sem = multiprocessing.Semaphore(0)
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01003305 a, b = multiprocessing.Pipe()
Richard Oudkerk009b15e2012-05-04 09:44:39 +01003306 p = multiprocessing.Process(target=self.signal_and_sleep,
3307 args=(sem, expected))
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01003308
3309 p.start()
3310 self.assertIsInstance(p.sentinel, int)
Richard Oudkerk009b15e2012-05-04 09:44:39 +01003311 self.assertTrue(sem.acquire(timeout=20))
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01003312
3313 start = time.time()
3314 res = wait([a, p.sentinel, b], expected + 20)
3315 delta = time.time() - start
3316
3317 self.assertEqual(res, [p.sentinel])
Antoine Pitrou37749772012-03-09 18:40:15 +01003318 self.assertLess(delta, expected + 2)
3319 self.assertGreater(delta, expected - 2)
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01003320
3321 a.send(None)
3322
3323 start = time.time()
3324 res = wait([a, p.sentinel, b], 20)
3325 delta = time.time() - start
3326
Giampaolo Rodola'5051ca82012-12-31 17:38:17 +01003327 self.assertEqual(sorted_(res), sorted_([p.sentinel, b]))
Antoine Pitrou37749772012-03-09 18:40:15 +01003328 self.assertLess(delta, 0.4)
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01003329
3330 b.send(None)
3331
3332 start = time.time()
3333 res = wait([a, p.sentinel, b], 20)
3334 delta = time.time() - start
3335
Giampaolo Rodola'5051ca82012-12-31 17:38:17 +01003336 self.assertEqual(sorted_(res), sorted_([a, p.sentinel, b]))
Antoine Pitrou37749772012-03-09 18:40:15 +01003337 self.assertLess(delta, 0.4)
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01003338
Richard Oudkerk009b15e2012-05-04 09:44:39 +01003339 p.terminate()
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01003340 p.join()
3341
Richard Oudkerk59d54042012-05-10 16:11:12 +01003342 def test_neg_timeout(self):
3343 from multiprocessing.connection import wait
3344 a, b = multiprocessing.Pipe()
3345 t = time.time()
3346 res = wait([a], timeout=-1)
3347 t = time.time() - t
3348 self.assertEqual(res, [])
3349 self.assertLess(t, 1)
3350 a.close()
3351 b.close()
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01003352
Antoine Pitrou709176f2012-04-01 17:19:09 +02003353#
3354# Issue 14151: Test invalid family on invalid environment
3355#
3356
3357class TestInvalidFamily(unittest.TestCase):
3358
3359 @unittest.skipIf(WIN32, "skipped on Windows")
3360 def test_invalid_family(self):
3361 with self.assertRaises(ValueError):
3362 multiprocessing.connection.Listener(r'\\.\test')
3363
Antoine Pitrou6d20cba2012-04-03 20:12:23 +02003364 @unittest.skipUnless(WIN32, "skipped on non-Windows platforms")
3365 def test_invalid_family_win32(self):
3366 with self.assertRaises(ValueError):
3367 multiprocessing.connection.Listener('/var/test.pipe')
Antoine Pitrou93bba8f2012-04-01 17:25:49 +02003368
Richard Oudkerk77c84f22012-05-18 14:28:02 +01003369#
3370# Issue 12098: check sys.flags of child matches that for parent
3371#
3372
3373class TestFlags(unittest.TestCase):
3374 @classmethod
3375 def run_in_grandchild(cls, conn):
3376 conn.send(tuple(sys.flags))
3377
3378 @classmethod
3379 def run_in_child(cls):
3380 import json
3381 r, w = multiprocessing.Pipe(duplex=False)
3382 p = multiprocessing.Process(target=cls.run_in_grandchild, args=(w,))
3383 p.start()
3384 grandchild_flags = r.recv()
3385 p.join()
3386 r.close()
3387 w.close()
3388 flags = (tuple(sys.flags), grandchild_flags)
3389 print(json.dumps(flags))
3390
3391 def test_flags(self):
3392 import json, subprocess
3393 # start child process using unusual flags
3394 prog = ('from test.test_multiprocessing import TestFlags; ' +
3395 'TestFlags.run_in_child()')
3396 data = subprocess.check_output(
3397 [sys.executable, '-E', '-S', '-O', '-c', prog])
3398 child_flags, grandchild_flags = json.loads(data.decode('ascii'))
3399 self.assertEqual(child_flags, grandchild_flags)
3400
Richard Oudkerkb15e6222012-07-27 14:19:00 +01003401#
3402# Test interaction with socket timeouts - see Issue #6056
3403#
3404
3405class TestTimeouts(unittest.TestCase):
3406 @classmethod
3407 def _test_timeout(cls, child, address):
3408 time.sleep(1)
3409 child.send(123)
3410 child.close()
3411 conn = multiprocessing.connection.Client(address)
3412 conn.send(456)
3413 conn.close()
3414
3415 def test_timeout(self):
3416 old_timeout = socket.getdefaulttimeout()
3417 try:
3418 socket.setdefaulttimeout(0.1)
3419 parent, child = multiprocessing.Pipe(duplex=True)
3420 l = multiprocessing.connection.Listener(family='AF_INET')
3421 p = multiprocessing.Process(target=self._test_timeout,
3422 args=(child, l.address))
3423 p.start()
3424 child.close()
3425 self.assertEqual(parent.recv(), 123)
3426 parent.close()
3427 conn = l.accept()
3428 self.assertEqual(conn.recv(), 456)
3429 conn.close()
3430 l.close()
3431 p.join(10)
3432 finally:
3433 socket.setdefaulttimeout(old_timeout)
3434
Richard Oudkerke88a2442012-08-14 11:41:32 +01003435#
3436# Test what happens with no "if __name__ == '__main__'"
3437#
3438
3439class TestNoForkBomb(unittest.TestCase):
3440 def test_noforkbomb(self):
3441 name = os.path.join(os.path.dirname(__file__), 'mp_fork_bomb.py')
3442 if WIN32:
3443 rc, out, err = test.script_helper.assert_python_failure(name)
3444 self.assertEqual('', out.decode('ascii'))
3445 self.assertIn('RuntimeError', err.decode('ascii'))
3446 else:
3447 rc, out, err = test.script_helper.assert_python_ok(name)
3448 self.assertEqual('123', out.decode('ascii').rstrip())
3449 self.assertEqual('', err.decode('ascii'))
3450
3451#
3452#
3453#
3454
Alexandre Vassalottic57a84f2009-07-17 12:07:01 +00003455testcases_other = [OtherTest, TestInvalidHandle, TestInitializers,
Richard Oudkerk77c84f22012-05-18 14:28:02 +01003456 TestStdinBadfiledescriptor, TestWait, TestInvalidFamily,
Richard Oudkerk3165a752012-08-14 12:51:14 +01003457 TestFlags, TestTimeouts, TestNoForkBomb]
Neal Norwitz5d6415e2008-08-25 01:53:32 +00003458
Benjamin Petersone711caf2008-06-11 16:44:04 +00003459#
3460#
3461#
3462
3463def test_main(run=None):
Jesse Nollerd00df3c2008-06-18 14:22:48 +00003464 if sys.platform.startswith("linux"):
3465 try:
3466 lock = multiprocessing.RLock()
3467 except OSError:
Benjamin Petersone549ead2009-03-28 21:42:05 +00003468 raise unittest.SkipTest("OSError raises on RLock creation, see issue 3111!")
Benjamin Peterson3c0dd062008-06-17 22:43:48 +00003469
Charles-François Natali221ef672011-11-22 18:55:22 +01003470 check_enough_semaphores()
3471
Benjamin Petersone711caf2008-06-11 16:44:04 +00003472 if run is None:
3473 from test.support import run_unittest as run
3474
3475 util.get_temp_dir() # creates temp directory for use by all processes
3476
3477 multiprocessing.get_logger().setLevel(LOG_LEVEL)
3478
Benjamin Petersone711caf2008-06-11 16:44:04 +00003479 testcases = (
Benjamin Peterson41181742008-07-02 20:22:54 +00003480 sorted(testcases_processes.values(), key=lambda tc:tc.__name__) +
3481 sorted(testcases_threads.values(), key=lambda tc:tc.__name__) +
Neal Norwitz5d6415e2008-08-25 01:53:32 +00003482 sorted(testcases_manager.values(), key=lambda tc:tc.__name__) +
3483 testcases_other
Benjamin Petersone711caf2008-06-11 16:44:04 +00003484 )
3485
3486 loadTestsFromTestCase = unittest.defaultTestLoader.loadTestsFromTestCase
3487 suite = unittest.TestSuite(loadTestsFromTestCase(tc) for tc in testcases)
Richard Oudkerkfc7b0ec2012-10-08 14:56:24 +01003488 run(suite)
Benjamin Petersone711caf2008-06-11 16:44:04 +00003489
3490def main():
3491 test_main(unittest.TextTestRunner(verbosity=2).run)
3492
3493if __name__ == '__main__':
3494 main()