blob: 14ec61c39323bc4459a8caa850f9d4e1eb6e1faf [file] [log] [blame]
Benjamin Peterson90f5ba52010-03-11 22:53:45 +00001#!/usr/bin/env python3
Georg Brandl86b2fb92008-07-16 03:43:04 +00002
Benjamin Petersone711caf2008-06-11 16:44:04 +00003#
4# Unit tests for the multiprocessing package
5#
6
7import unittest
Benjamin Petersone711caf2008-06-11 16:44:04 +00008import queue as pyqueue
9import time
Alexandre Vassalottic57a84f2009-07-17 12:07:01 +000010import io
Antoine Pitroude911b22011-12-21 11:03:24 +010011import itertools
Benjamin Petersone711caf2008-06-11 16:44:04 +000012import sys
13import os
14import gc
Antoine Pitrouc824e9a2011-04-05 18:11:33 +020015import errno
Benjamin Petersone711caf2008-06-11 16:44:04 +000016import signal
17import array
Benjamin Petersone711caf2008-06-11 16:44:04 +000018import socket
19import random
20import logging
Richard Oudkerk3730a172012-06-15 18:26:07 +010021import struct
R. David Murraya21e4ca2009-03-31 23:16:50 +000022import test.support
Richard Oudkerke88a2442012-08-14 11:41:32 +010023import test.script_helper
Benjamin Petersone711caf2008-06-11 16:44:04 +000024
Benjamin Petersone5384b02008-10-04 22:00:42 +000025
R. David Murraya21e4ca2009-03-31 23:16:50 +000026# Skip tests if _multiprocessing wasn't built.
27_multiprocessing = test.support.import_module('_multiprocessing')
28# Skip tests if sem_open implementation is broken.
29test.support.import_module('multiprocessing.synchronize')
# import threading after _multiprocessing to raise a more relevant error
# message: "No module named _multiprocessing". _multiprocessing is not compiled
# without thread support.
33import threading
Benjamin Petersone5384b02008-10-04 22:00:42 +000034
Benjamin Petersone711caf2008-06-11 16:44:04 +000035import multiprocessing.dummy
36import multiprocessing.connection
37import multiprocessing.managers
38import multiprocessing.heap
Benjamin Petersone711caf2008-06-11 16:44:04 +000039import multiprocessing.pool
Benjamin Petersone711caf2008-06-11 16:44:04 +000040
Charles-François Natalibc8f0822011-09-20 20:36:51 +020041from multiprocessing import util
42
43try:
44 from multiprocessing import reduction
45 HAS_REDUCTION = True
46except ImportError:
47 HAS_REDUCTION = False
Benjamin Petersone711caf2008-06-11 16:44:04 +000048
Brian Curtinafa88b52010-10-07 01:12:19 +000049try:
50 from multiprocessing.sharedctypes import Value, copy
51 HAS_SHAREDCTYPES = True
52except ImportError:
53 HAS_SHAREDCTYPES = False
54
Antoine Pitroubcb39d42011-08-23 19:46:22 +020055try:
56 import msvcrt
57except ImportError:
58 msvcrt = None
59
Benjamin Petersone711caf2008-06-11 16:44:04 +000060#
61#
62#
63
def latin(s):
    """Return the string *s* encoded to bytes using the 'latin' codec."""
    return s.encode('latin')
Benjamin Petersone711caf2008-06-11 16:44:04 +000066
Benjamin Petersone711caf2008-06-11 16:44:04 +000067#
68# Constants
69#
70
LOG_LEVEL = util.SUBWARNING
#LOG_LEVEL = logging.DEBUG

# Base unit (in seconds) for the short sleeps used throughout the tests.
DELTA = 0.1
CHECK_TIMINGS = False     # making true makes tests take a lot longer
                          # and can sometimes cause some non-serious
                          # failures because some calls block a bit
                          # longer than expected
if CHECK_TIMINGS:
    TIMEOUT1, TIMEOUT2, TIMEOUT3 = 0.82, 0.35, 1.4
else:
    TIMEOUT1, TIMEOUT2, TIMEOUT3 = 0.1, 0.1, 0.1

# True unless the C extension reports that sem_getvalue() is broken.
HAVE_GETVALUE = not getattr(_multiprocessing,
                            'HAVE_BROKEN_SEM_GETVALUE', False)

WIN32 = (sys.platform == "win32")
Antoine Pitrou176f07d2011-06-06 19:35:31 +020088
Richard Oudkerk59d54042012-05-10 16:11:12 +010089from multiprocessing.connection import wait
Antoine Pitrou176f07d2011-06-06 19:35:31 +020090
def wait_for_handle(handle, timeout):
    """Wait on a single handle using multiprocessing.connection.wait().

    A negative *timeout* is converted to None before being passed on;
    any other value (including None) is forwarded unchanged.  Returns
    whatever wait() returns for the one-element list ``[handle]``.
    """
    if timeout is not None and timeout < 0.0:
        timeout = None
    return wait([handle], timeout)
Jesse Noller6214edd2009-01-19 16:23:53 +000095
# Upper bound on file descriptor numbers, with a fallback of 256 when the
# limit cannot be queried.
try:
    MAXFD = os.sysconf("SC_OPEN_MAX")
except (AttributeError, ValueError, OSError):
    # AttributeError: os.sysconf does not exist on this platform.
    # ValueError/OSError: the setting name is unknown or unsupported.
    # (A bare "except:" would also have swallowed KeyboardInterrupt.)
    MAXFD = 256
100
Benjamin Petersone711caf2008-06-11 16:44:04 +0000101#
Florent Xiclunafd1b0932010-03-28 00:25:02 +0000102# Some tests require ctypes
103#
104
105try:
Florent Xiclunaaa171062010-08-14 15:56:42 +0000106 from ctypes import Structure, c_int, c_double
Florent Xiclunafd1b0932010-03-28 00:25:02 +0000107except ImportError:
108 Structure = object
109 c_int = c_double = None
110
Charles-François Natali221ef672011-11-22 18:55:22 +0100111
def check_enough_semaphores():
    """Check that the system supports enough semaphores to run the test.

    Returns None when the limit cannot be determined, is reported as -1,
    or is at least the POSIX minimum; otherwise raises unittest.SkipTest.
    """
    # minimum number of semaphores available according to POSIX
    nsems_min = 256
    try:
        nsems = os.sysconf("SC_SEM_NSEMS_MAX")
    except (AttributeError, ValueError, OSError):
        # sysconf not available, or setting unknown/unsupported on this host
        return
    if nsems == -1 or nsems >= nsems_min:
        return
    raise unittest.SkipTest("The OS doesn't support enough semaphores "
                            "to run the test (required: %d)." % nsems_min)
125
126
Florent Xiclunafd1b0932010-03-28 00:25:02 +0000127#
Benjamin Petersone711caf2008-06-11 16:44:04 +0000128# Creates a wrapper for a function which records the time it takes to finish
129#
130
class TimingWrapper(object):
    """Callable wrapper that records how long each call to *func* takes.

    After a call, ``elapsed`` holds the duration in seconds of the most
    recent call (it is None until the first call).  The duration is
    recorded even when *func* raises.
    """

    def __init__(self, func):
        self.func = func
        self.elapsed = None

    def __call__(self, *args, **kwds):
        # Use the monotonic clock so that system clock adjustments during
        # the call cannot produce a negative or skewed duration.
        start = time.monotonic()
        try:
            return self.func(*args, **kwds)
        finally:
            self.elapsed = time.monotonic() - start
143
144#
145# Base class for test cases
146#
147
class BaseTestCase(object):
    """Mixin with helpers shared by the test case classes in this file.

    It relies on the assertion methods (assertEqual, assertAlmostEqual)
    being supplied by another base class of the concrete test case.
    """

    ALLOWED_TYPES = ('processes', 'manager', 'threads')

    def assertTimingAlmostEqual(self, a, b):
        """Compare two durations to one decimal place, if CHECK_TIMINGS is set."""
        if CHECK_TIMINGS:
            self.assertAlmostEqual(a, b, 1)

    def assertReturnsIfImplemented(self, value, func, *args):
        """Assert ``func(*args) == value`` unless it raises NotImplementedError."""
        try:
            res = func(*args)
        except NotImplementedError:
            pass
        else:
            return self.assertEqual(value, res)

    # For the sanity of Windows users, rather than crashing or freezing in
    # multiple ways.
    def __reduce__(self, *args):
        raise NotImplementedError("shouldn't try to pickle a test case")

    __reduce_ex__ = __reduce__
170
Benjamin Petersone711caf2008-06-11 16:44:04 +0000171#
172# Return the value of a semaphore
173#
174
def get_value(self):
    """Return the current value of the semaphore-like object *self*.

    Tries, in order: the object's own ``get_value()`` method, then the
    ``_Semaphore__value`` attribute, then the ``_value`` attribute.
    Raises NotImplementedError when none of them is available.
    """
    try:
        return self.get_value()
    except AttributeError:
        pass
    # Fall back to the private attributes, in the same order as before.
    for attr in ('_Semaphore__value', '_value'):
        try:
            return getattr(self, attr)
        except AttributeError:
            continue
    raise NotImplementedError
186
187#
188# Testcases
189#
190
class _TestProcess(BaseTestCase):
    """Tests for Process objects: attributes, lifecycle and termination.

    The concrete test case is expected to provide TYPE, Process, Queue,
    Event, Pipe, current_process and active_children.
    """

    ALLOWED_TYPES = ('processes', 'threads')

    def test_current(self):
        if self.TYPE == 'threads':
            self.skipTest('test not appropriate for {}'.format(self.TYPE))

        current = self.current_process()
        authkey = current.authkey

        self.assertTrue(current.is_alive())
        self.assertFalse(current.daemon)
        self.assertIsInstance(authkey, bytes)
        self.assertGreater(len(authkey), 0)
        self.assertEqual(current.ident, os.getpid())
        self.assertEqual(current.exitcode, None)

    def test_daemon_argument(self):
        if self.TYPE == "threads":
            self.skipTest('test not appropriate for {}'.format(self.TYPE))

        # By default uses the current process's daemon flag.
        proc0 = self.Process(target=self._test)
        self.assertEqual(proc0.daemon, self.current_process().daemon)
        proc1 = self.Process(target=self._test, daemon=True)
        self.assertTrue(proc1.daemon)
        proc2 = self.Process(target=self._test, daemon=False)
        self.assertFalse(proc2.daemon)

    @classmethod
    def _test(cls, q, *args, **kwds):
        """Child target: report args, kwargs and identity back through *q*."""
        current = cls.current_process()
        q.put(args)
        q.put(kwds)
        q.put(current.name)
        if cls.TYPE != 'threads':
            q.put(bytes(current.authkey))
            q.put(current.pid)

    def test_process(self):
        q = self.Queue(1)
        e = self.Event()
        args = (q, 1, 2)
        kwargs = {'hello':23, 'bye':2.54}
        name = 'SomeProcess'
        p = self.Process(
            target=self._test, args=args, kwargs=kwargs, name=name
            )
        p.daemon = True
        current = self.current_process()

        if self.TYPE != 'threads':
            self.assertEqual(p.authkey, current.authkey)
        self.assertEqual(p.is_alive(), False)
        self.assertEqual(p.daemon, True)
        self.assertNotIn(p, self.active_children())
        self.assertIs(type(self.active_children()), list)
        self.assertEqual(p.exitcode, None)

        p.start()

        self.assertEqual(p.exitcode, None)
        self.assertEqual(p.is_alive(), True)
        self.assertIn(p, self.active_children())

        # The child reports back in the order used by _test().
        self.assertEqual(q.get(), args[1:])
        self.assertEqual(q.get(), kwargs)
        self.assertEqual(q.get(), p.name)
        if self.TYPE != 'threads':
            self.assertEqual(q.get(), current.authkey)
            self.assertEqual(q.get(), p.pid)

        p.join()

        self.assertEqual(p.exitcode, 0)
        self.assertEqual(p.is_alive(), False)
        self.assertNotIn(p, self.active_children())

    @classmethod
    def _test_terminate(cls):
        """Child target: sleep long enough that the parent must terminate it."""
        time.sleep(1000)

    def test_terminate(self):
        if self.TYPE == 'threads':
            self.skipTest('test not appropriate for {}'.format(self.TYPE))

        p = self.Process(target=self._test_terminate)
        p.daemon = True
        p.start()

        self.assertEqual(p.is_alive(), True)
        self.assertIn(p, self.active_children())
        self.assertEqual(p.exitcode, None)

        join = TimingWrapper(p.join)

        self.assertEqual(join(0), None)
        self.assertTimingAlmostEqual(join.elapsed, 0.0)
        self.assertEqual(p.is_alive(), True)

        self.assertEqual(join(-1), None)
        self.assertTimingAlmostEqual(join.elapsed, 0.0)
        self.assertEqual(p.is_alive(), True)

        p.terminate()

        self.assertEqual(join(), None)
        self.assertTimingAlmostEqual(join.elapsed, 0.0)

        self.assertEqual(p.is_alive(), False)
        self.assertNotIn(p, self.active_children())

        p.join()

        # XXX sometimes get p.exitcode == 0 on Windows ...
        #self.assertEqual(p.exitcode, -signal.SIGTERM)

    def test_cpu_count(self):
        try:
            cpus = multiprocessing.cpu_count()
        except NotImplementedError:
            cpus = 1
        self.assertIsInstance(cpus, int)
        self.assertGreaterEqual(cpus, 1)

    def test_active_children(self):
        self.assertEqual(type(self.active_children()), list)

        p = self.Process(target=time.sleep, args=(DELTA,))
        self.assertNotIn(p, self.active_children())

        p.daemon = True
        p.start()
        self.assertIn(p, self.active_children())

        p.join()
        self.assertNotIn(p, self.active_children())

    @classmethod
    def _test_recursion(cls, wconn, id):
        """Send *id*, then spawn two children one level deeper (max depth 2)."""
        wconn.send(id)
        if len(id) < 2:
            for i in range(2):
                p = cls.Process(
                    target=cls._test_recursion, args=(wconn, id+[i])
                    )
                # Each child is joined before the next starts, so the ids
                # arrive in depth-first order.
                p.start()
                p.join()

    def test_recursion(self):
        rconn, wconn = self.Pipe(duplex=False)
        self._test_recursion(wconn, [])

        time.sleep(DELTA)
        result = []
        while rconn.poll():
            result.append(rconn.recv())

        expected = [
            [],
              [0],
                [0, 0],
                [0, 1],
              [1],
                [1, 0],
                [1, 1]
            ]
        self.assertEqual(result, expected)

    @classmethod
    def _test_sentinel(cls, event):
        """Child target: block until *event* is set (or 10 seconds pass)."""
        event.wait(10.0)

    def test_sentinel(self):
        if self.TYPE == "threads":
            self.skipTest('test not appropriate for {}'.format(self.TYPE))
        event = self.Event()
        p = self.Process(target=self._test_sentinel, args=(event,))
        # The sentinel is only available once the process has been started.
        with self.assertRaises(ValueError):
            p.sentinel
        p.start()
        self.addCleanup(p.join)
        sentinel = p.sentinel
        self.assertIsInstance(sentinel, int)
        self.assertFalse(wait_for_handle(sentinel, timeout=0.0))
        event.set()
        p.join()
        self.assertTrue(wait_for_handle(sentinel, timeout=DELTA))
381
Benjamin Petersone711caf2008-06-11 16:44:04 +0000382#
383#
384#
385
class _UpperCaser(multiprocessing.Process):
    """Process that upper-cases strings received over a pipe.

    The parent talks through ``parent_conn`` (via submit() and stop());
    the child's run() loop serves requests on ``child_conn``.
    """

    def __init__(self):
        multiprocessing.Process.__init__(self)
        self.child_conn, self.parent_conn = multiprocessing.Pipe()

    def run(self):
        """Reply to each received string with its upper-case form until None."""
        self.parent_conn.close()
        for s in iter(self.child_conn.recv, None):
            self.child_conn.send(s.upper())
        self.child_conn.close()

    def submit(self, s):
        """Send the str *s* to the child and return the reply it sends back."""
        assert type(s) is str
        self.parent_conn.send(s)
        return self.parent_conn.recv()

    def stop(self):
        """Send the None sentinel that ends run(), then close both ends."""
        self.parent_conn.send(None)
        self.parent_conn.close()
        self.child_conn.close()
407
class _TestSubclassingProcess(BaseTestCase):
    """Tests for Process subclasses and for stderr/exit handling in children."""

    ALLOWED_TYPES = ('processes',)

    def test_subclassing(self):
        uppercaser = _UpperCaser()
        uppercaser.daemon = True
        uppercaser.start()
        self.assertEqual(uppercaser.submit('hello'), 'HELLO')
        self.assertEqual(uppercaser.submit('world'), 'WORLD')
        uppercaser.stop()
        uppercaser.join()

    def test_stderr_flush(self):
        # sys.stderr is flushed at process shutdown (issue #13812)
        if self.TYPE == "threads":
            self.skipTest('test not appropriate for {}'.format(self.TYPE))

        testfn = test.support.TESTFN
        self.addCleanup(test.support.unlink, testfn)
        proc = self.Process(target=self._test_stderr_flush, args=(testfn,))
        proc.start()
        proc.join()
        with open(testfn, 'r') as f:
            err = f.read()
            # The whole traceback was printed
            self.assertIn("ZeroDivisionError", err)
            self.assertIn("test_multiprocessing.py", err)
            self.assertIn("1/0 # MARKER", err)

    @classmethod
    def _test_stderr_flush(cls, testfn):
        # The file is deliberately left open: the test checks that it is
        # flushed when the child process shuts down.
        sys.stderr = open(testfn, 'w')
        1/0 # MARKER


    @classmethod
    def _test_sys_exit(cls, reason, testfn):
        # Left open on purpose; see _test_stderr_flush().
        sys.stderr = open(testfn, 'w')
        sys.exit(reason)

    def test_sys_exit(self):
        # See Issue 13854
        if self.TYPE == 'threads':
            self.skipTest('test not appropriate for {}'.format(self.TYPE))

        testfn = test.support.TESTFN
        self.addCleanup(test.support.unlink, testfn)

        # Non-integer exit reasons: check both the exit code and that the
        # reason was written to the redirected stderr.
        for reason, code in (([1, 2, 3], 1), ('ignore this', 0)):
            p = self.Process(target=self._test_sys_exit, args=(reason, testfn))
            p.daemon = True
            p.start()
            p.join(5)
            self.assertEqual(p.exitcode, code)

            with open(testfn, 'r') as f:
                self.assertEqual(f.read().rstrip(), str(reason))

        # Integer-like reasons become the exit code directly.
        for reason in (True, False, 8):
            p = self.Process(target=sys.exit, args=(reason,))
            p.daemon = True
            p.start()
            p.join(5)
            self.assertEqual(p.exitcode, reason)
473
Benjamin Petersone711caf2008-06-11 16:44:04 +0000474#
475#
476#
477
def queue_empty(q):
    """Return whether *q* is empty, via q.empty() or else q.qsize() == 0."""
    if hasattr(q, 'empty'):
        return q.empty()
    return q.qsize() == 0
483
def queue_full(q, maxsize):
    """Return whether *q* is full, via q.full() or else q.qsize() == maxsize."""
    if hasattr(q, 'full'):
        return q.full()
    return q.qsize() == maxsize
489
490
class _TestQueue(BaseTestCase):
    """Tests for Queue and JoinableQueue: put/get, timeouts, fork, task_done.

    The concrete test case is expected to provide Queue, JoinableQueue,
    Event and Process.
    """

    @classmethod
    def _test_put(cls, queue, child_can_start, parent_can_continue):
        """Child target: drain six items once the parent allows it."""
        child_can_start.wait()
        for i in range(6):
            queue.get()
        parent_can_continue.set()

    def test_put(self):
        MAXSIZE = 6
        queue = self.Queue(maxsize=MAXSIZE)
        child_can_start = self.Event()
        parent_can_continue = self.Event()

        proc = self.Process(
            target=self._test_put,
            args=(queue, child_can_start, parent_can_continue)
            )
        proc.daemon = True
        proc.start()

        self.assertEqual(queue_empty(queue), True)
        self.assertEqual(queue_full(queue, MAXSIZE), False)

        queue.put(1)
        queue.put(2, True)
        queue.put(3, True, None)
        queue.put(4, False)
        queue.put(5, False, None)
        queue.put_nowait(6)

        # the values may be in buffer but not yet in pipe so sleep a bit
        time.sleep(DELTA)

        self.assertEqual(queue_empty(queue), False)
        self.assertEqual(queue_full(queue, MAXSIZE), True)

        put = TimingWrapper(queue.put)
        put_nowait = TimingWrapper(queue.put_nowait)

        self.assertRaises(pyqueue.Full, put, 7, False)
        self.assertTimingAlmostEqual(put.elapsed, 0)

        self.assertRaises(pyqueue.Full, put, 7, False, None)
        self.assertTimingAlmostEqual(put.elapsed, 0)

        self.assertRaises(pyqueue.Full, put_nowait, 7)
        self.assertTimingAlmostEqual(put_nowait.elapsed, 0)

        self.assertRaises(pyqueue.Full, put, 7, True, TIMEOUT1)
        self.assertTimingAlmostEqual(put.elapsed, TIMEOUT1)

        self.assertRaises(pyqueue.Full, put, 7, False, TIMEOUT2)
        self.assertTimingAlmostEqual(put.elapsed, 0)

        self.assertRaises(pyqueue.Full, put, 7, True, timeout=TIMEOUT3)
        self.assertTimingAlmostEqual(put.elapsed, TIMEOUT3)

        child_can_start.set()
        parent_can_continue.wait()

        self.assertEqual(queue_empty(queue), True)
        self.assertEqual(queue_full(queue, MAXSIZE), False)

        proc.join()

    @classmethod
    def _test_get(cls, queue, child_can_start, parent_can_continue):
        """Child target: put the items 2..5 once the parent allows it."""
        child_can_start.wait()
        #queue.put(1)
        queue.put(2)
        queue.put(3)
        queue.put(4)
        queue.put(5)
        parent_can_continue.set()

    def test_get(self):
        queue = self.Queue()
        child_can_start = self.Event()
        parent_can_continue = self.Event()

        proc = self.Process(
            target=self._test_get,
            args=(queue, child_can_start, parent_can_continue)
            )
        proc.daemon = True
        proc.start()

        self.assertEqual(queue_empty(queue), True)

        child_can_start.set()
        parent_can_continue.wait()

        time.sleep(DELTA)
        self.assertEqual(queue_empty(queue), False)

        # Hangs unexpectedly, remove for now
        #self.assertEqual(queue.get(), 1)
        self.assertEqual(queue.get(True, None), 2)
        self.assertEqual(queue.get(True), 3)
        self.assertEqual(queue.get(timeout=1), 4)
        self.assertEqual(queue.get_nowait(), 5)

        self.assertEqual(queue_empty(queue), True)

        get = TimingWrapper(queue.get)
        get_nowait = TimingWrapper(queue.get_nowait)

        self.assertRaises(pyqueue.Empty, get, False)
        self.assertTimingAlmostEqual(get.elapsed, 0)

        self.assertRaises(pyqueue.Empty, get, False, None)
        self.assertTimingAlmostEqual(get.elapsed, 0)

        self.assertRaises(pyqueue.Empty, get_nowait)
        self.assertTimingAlmostEqual(get_nowait.elapsed, 0)

        self.assertRaises(pyqueue.Empty, get, True, TIMEOUT1)
        self.assertTimingAlmostEqual(get.elapsed, TIMEOUT1)

        self.assertRaises(pyqueue.Empty, get, False, TIMEOUT2)
        self.assertTimingAlmostEqual(get.elapsed, 0)

        self.assertRaises(pyqueue.Empty, get, timeout=TIMEOUT3)
        self.assertTimingAlmostEqual(get.elapsed, TIMEOUT3)

        proc.join()

    @classmethod
    def _test_fork(cls, queue):
        """Child target: put the integers 10..19 on *queue*."""
        for i in range(10, 20):
            queue.put(i)
        # note that at this point the items may only be buffered, so the
        # process cannot shutdown until the feeder thread has finished
        # pushing items onto the pipe.

    def test_fork(self):
        # Old versions of Queue would fail to create a new feeder
        # thread for a forked process if the original process had its
        # own feeder thread.  This test checks that this no longer
        # happens.

        queue = self.Queue()

        # put items on queue so that main process starts a feeder thread
        for i in range(10):
            queue.put(i)

        # wait to make sure thread starts before we fork a new process
        time.sleep(DELTA)

        # fork process
        p = self.Process(target=self._test_fork, args=(queue,))
        p.daemon = True
        p.start()

        # check that all expected items are in the queue
        for i in range(20):
            self.assertEqual(queue.get(), i)
        self.assertRaises(pyqueue.Empty, queue.get, False)

        p.join()

    def test_qsize(self):
        q = self.Queue()
        try:
            self.assertEqual(q.qsize(), 0)
        except NotImplementedError:
            # Report the test as skipped instead of silently passing.
            self.skipTest('qsize method not implemented')
        q.put(1)
        self.assertEqual(q.qsize(), 1)
        q.put(5)
        self.assertEqual(q.qsize(), 2)
        q.get()
        self.assertEqual(q.qsize(), 1)
        q.get()
        self.assertEqual(q.qsize(), 0)

    @classmethod
    def _test_task_done(cls, q):
        """Worker target: acknowledge each item until a None sentinel arrives."""
        for obj in iter(q.get, None):
            time.sleep(DELTA)
            q.task_done()

    def test_task_done(self):
        queue = self.JoinableQueue()

        workers = [self.Process(target=self._test_task_done, args=(queue,))
                   for i in range(4)]

        for p in workers:
            p.daemon = True
            p.start()

        for i in range(10):
            queue.put(i)

        queue.join()

        # One None sentinel per worker so that each worker loop terminates.
        for p in workers:
            queue.put(None)

        for p in workers:
            p.join()

    def test_timeout(self):
        q = multiprocessing.Queue()
        # Release the queue's resources when the test finishes.
        self.addCleanup(q.close)
        start = time.time()
        self.assertRaises(pyqueue.Empty, q.get, True, 0.2)
        delta = time.time() - start
        self.assertGreaterEqual(delta, 0.19)
707
Benjamin Petersone711caf2008-06-11 16:44:04 +0000708#
709#
710#
711
class _TestLock(BaseTestCase):
    """Tests for Lock and RLock objects supplied by the concrete test case."""

    def test_lock(self):
        lock = self.Lock()
        self.assertEqual(lock.acquire(), True)
        # A second, non-blocking acquire of a held lock must fail.
        self.assertEqual(lock.acquire(False), False)
        self.assertEqual(lock.release(), None)
        self.assertRaises((ValueError, threading.ThreadError), lock.release)

    def test_rlock(self):
        lock = self.RLock()
        # Acquire three times, then release the same number of times.
        self.assertEqual(lock.acquire(), True)
        self.assertEqual(lock.acquire(), True)
        self.assertEqual(lock.acquire(), True)
        self.assertEqual(lock.release(), None)
        self.assertEqual(lock.release(), None)
        self.assertEqual(lock.release(), None)
        self.assertRaises((AssertionError, RuntimeError), lock.release)

    def test_lock_context(self):
        with self.Lock():
            pass
734
Benjamin Petersone711caf2008-06-11 16:44:04 +0000735
class _TestSemaphore(BaseTestCase):
    """Tests for Semaphore and BoundedSemaphore objects."""

    def _test_semaphore(self, sem):
        """Acquire and release *sem* (initial value 2), checking its value."""
        self.assertReturnsIfImplemented(2, get_value, sem)
        self.assertEqual(sem.acquire(), True)
        self.assertReturnsIfImplemented(1, get_value, sem)
        self.assertEqual(sem.acquire(), True)
        self.assertReturnsIfImplemented(0, get_value, sem)
        self.assertEqual(sem.acquire(False), False)
        self.assertReturnsIfImplemented(0, get_value, sem)
        self.assertEqual(sem.release(), None)
        self.assertReturnsIfImplemented(1, get_value, sem)
        self.assertEqual(sem.release(), None)
        self.assertReturnsIfImplemented(2, get_value, sem)

    def test_semaphore(self):
        sem = self.Semaphore(2)
        self._test_semaphore(sem)
        # An unbounded semaphore may be released past its initial value.
        self.assertEqual(sem.release(), None)
        self.assertReturnsIfImplemented(3, get_value, sem)
        self.assertEqual(sem.release(), None)
        self.assertReturnsIfImplemented(4, get_value, sem)

    def test_bounded_semaphore(self):
        sem = self.BoundedSemaphore(2)
        self._test_semaphore(sem)
        # Currently fails on OS/X
        #if HAVE_GETVALUE:
        #    self.assertRaises(ValueError, sem.release)
        #    self.assertReturnsIfImplemented(2, get_value, sem)

    def test_timeout(self):
        if self.TYPE != 'processes':
            self.skipTest('test not appropriate for {}'.format(self.TYPE))

        sem = self.Semaphore(0)
        acquire = TimingWrapper(sem.acquire)

        self.assertEqual(acquire(False), False)
        self.assertTimingAlmostEqual(acquire.elapsed, 0.0)

        self.assertEqual(acquire(False, None), False)
        self.assertTimingAlmostEqual(acquire.elapsed, 0.0)

        self.assertEqual(acquire(False, TIMEOUT1), False)
        self.assertTimingAlmostEqual(acquire.elapsed, 0)

        self.assertEqual(acquire(True, TIMEOUT2), False)
        self.assertTimingAlmostEqual(acquire.elapsed, TIMEOUT2)

        self.assertEqual(acquire(timeout=TIMEOUT3), False)
        self.assertTimingAlmostEqual(acquire.elapsed, TIMEOUT3)
788
789
790class _TestCondition(BaseTestCase):
791
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +0000792 @classmethod
793 def f(cls, cond, sleeping, woken, timeout=None):
Benjamin Petersone711caf2008-06-11 16:44:04 +0000794 cond.acquire()
795 sleeping.release()
796 cond.wait(timeout)
797 woken.release()
798 cond.release()
799
800 def check_invariant(self, cond):
801 # this is only supposed to succeed when there are no sleepers
802 if self.TYPE == 'processes':
803 try:
804 sleepers = (cond._sleeping_count.get_value() -
805 cond._woken_count.get_value())
806 self.assertEqual(sleepers, 0)
807 self.assertEqual(cond._wait_semaphore.get_value(), 0)
808 except NotImplementedError:
809 pass
810
811 def test_notify(self):
812 cond = self.Condition()
813 sleeping = self.Semaphore(0)
814 woken = self.Semaphore(0)
815
816 p = self.Process(target=self.f, args=(cond, sleeping, woken))
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000817 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000818 p.start()
819
820 p = threading.Thread(target=self.f, args=(cond, sleeping, woken))
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000821 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000822 p.start()
823
824 # wait for both children to start sleeping
825 sleeping.acquire()
826 sleeping.acquire()
827
828 # check no process/thread has woken up
829 time.sleep(DELTA)
830 self.assertReturnsIfImplemented(0, get_value, woken)
831
832 # wake up one process/thread
833 cond.acquire()
834 cond.notify()
835 cond.release()
836
837 # check one process/thread has woken up
838 time.sleep(DELTA)
839 self.assertReturnsIfImplemented(1, get_value, woken)
840
841 # wake up another
842 cond.acquire()
843 cond.notify()
844 cond.release()
845
846 # check other has woken up
847 time.sleep(DELTA)
848 self.assertReturnsIfImplemented(2, get_value, woken)
849
850 # check state is not mucked up
851 self.check_invariant(cond)
852 p.join()
853
854 def test_notify_all(self):
855 cond = self.Condition()
856 sleeping = self.Semaphore(0)
857 woken = self.Semaphore(0)
858
859 # start some threads/processes which will timeout
860 for i in range(3):
861 p = self.Process(target=self.f,
862 args=(cond, sleeping, woken, TIMEOUT1))
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000863 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000864 p.start()
865
866 t = threading.Thread(target=self.f,
867 args=(cond, sleeping, woken, TIMEOUT1))
Benjamin Peterson72753702008-08-18 18:09:21 +0000868 t.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000869 t.start()
870
871 # wait for them all to sleep
872 for i in range(6):
873 sleeping.acquire()
874
875 # check they have all timed out
876 for i in range(6):
877 woken.acquire()
878 self.assertReturnsIfImplemented(0, get_value, woken)
879
880 # check state is not mucked up
881 self.check_invariant(cond)
882
883 # start some more threads/processes
884 for i in range(3):
885 p = self.Process(target=self.f, args=(cond, sleeping, woken))
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000886 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000887 p.start()
888
889 t = threading.Thread(target=self.f, args=(cond, sleeping, woken))
Benjamin Peterson72753702008-08-18 18:09:21 +0000890 t.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000891 t.start()
892
893 # wait for them to all sleep
894 for i in range(6):
895 sleeping.acquire()
896
897 # check no process/thread has woken up
898 time.sleep(DELTA)
899 self.assertReturnsIfImplemented(0, get_value, woken)
900
901 # wake them all up
902 cond.acquire()
903 cond.notify_all()
904 cond.release()
905
906 # check they have all woken
Antoine Pitrouf25a8de2011-04-16 21:02:01 +0200907 for i in range(10):
908 try:
909 if get_value(woken) == 6:
910 break
911 except NotImplementedError:
912 break
913 time.sleep(DELTA)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000914 self.assertReturnsIfImplemented(6, get_value, woken)
915
916 # check state is not mucked up
917 self.check_invariant(cond)
918
919 def test_timeout(self):
920 cond = self.Condition()
921 wait = TimingWrapper(cond.wait)
922 cond.acquire()
923 res = wait(TIMEOUT1)
924 cond.release()
Georg Brandl65ffae02010-10-28 09:24:56 +0000925 self.assertEqual(res, False)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000926 self.assertTimingAlmostEqual(wait.elapsed, TIMEOUT1)
927
@classmethod
def _test_waitfor_f(cls, cond, state):
    """Child-side half of test_waitfor.

    Sets ``state.value`` to 0, notifies the condition, then waits until
    ``state.value`` becomes 4.  Exits with status 1 if wait_for() reports
    failure or the value is not 4 afterwards.
    """
    def reached_four():
        return state.value == 4

    with cond:
        state.value = 0
        cond.notify()
        satisfied = cond.wait_for(reached_four)
        if not (satisfied and state.value == 4):
            sys.exit(1)
936
@unittest.skipUnless(HAS_SHAREDCTYPES, 'needs sharedctypes')
def test_waitfor(self):
    # based on test in test/lock_tests.py
    condition = self.Condition()
    counter = self.Value('i', -1)

    child = self.Process(target=self._test_waitfor_f,
                         args=(condition, counter))
    child.daemon = True
    child.start()

    # The child announces itself by setting the counter to 0.
    with condition:
        child_ready = condition.wait_for(lambda: counter.value == 0)
        self.assertTrue(child_ready)
        self.assertEqual(counter.value, 0)

    # Step the counter up to 4, notifying after every step; the child
    # is waiting for exactly that value.
    for _ in range(4):
        time.sleep(0.01)
        with condition:
            counter.value = counter.value + 1
            condition.notify()

    child.join(5)
    self.assertFalse(child.is_alive())
    self.assertEqual(child.exitcode, 0)
960 self.assertEqual(p.exitcode, 0)
961
@classmethod
def _test_waitfor_timeout_f(cls, cond, state, success, sem):
    """Child-side half of test_waitfor_timeout.

    Releases ``sem`` to signal that it is running, then waits on ``cond``
    for ``state.value == 4`` with a 0.1 second timeout.  ``success.value``
    is set to True only if wait_for() reported failure and the elapsed
    time lies between 0.6x and 10x the requested timeout.
    """
    sem.release()
    with cond:
        expected = 0.1
        # Use the monotonic clock: a wall-clock adjustment during the
        # wait must not distort the measured interval.
        started = time.monotonic()
        result = cond.wait_for(lambda: state.value == 4, timeout=expected)
        elapsed = time.monotonic() - started
        # borrow logic in assertTimeout() from test/lock_tests.py
        if not result and expected * 0.6 < elapsed < expected * 10.0:
            success.value = True
973
@unittest.skipUnless(HAS_SHAREDCTYPES, 'needs sharedctypes')
def test_waitfor_timeout(self):
    # based on test in test/lock_tests.py
    condition = self.Condition()
    counter = self.Value('i', 0)
    outcome = self.Value('i', False)
    child_started = self.Semaphore(0)

    child = self.Process(
        target=self._test_waitfor_timeout_f,
        args=(condition, counter, outcome, child_started),
    )
    child.daemon = True
    child.start()
    # The child releases the semaphore as its first action.
    self.assertTrue(child_started.acquire(timeout=10))

    # Only increment 3 times, so state == 4 is never reached.
    for _ in range(3):
        time.sleep(0.01)
        with condition:
            counter.value = counter.value + 1
            condition.notify()

    child.join(5)
    self.assertTrue(outcome.value)
997
@classmethod
def _test_wait_result(cls, c, pid):
    """Child-side half of test_wait_result.

    Notifies condition ``c`` once, sleeps for one second and then, when
    ``pid`` is not None, sends SIGINT to that process.
    """
    with c:
        c.notify()
    time.sleep(1)
    if pid is None:
        return
    os.kill(pid, signal.SIGINT)
1005
def test_wait_result(self):
    # The SIGINT half of the test only runs for the process-based
    # flavour on non-Windows platforms; otherwise no pid is handed to
    # the child and no signal is sent.
    use_signal = isinstance(self, ProcessesMixin) and sys.platform != 'win32'
    pid = os.getpid() if use_signal else None

    c = self.Condition()
    with c:
        # Nobody notifies yet, so both waits must time out.
        self.assertFalse(c.wait(0))
        self.assertFalse(c.wait(0.1))

        child = self.Process(target=self._test_wait_result, args=(c, pid))
        child.start()

        # The child notifies first ...
        self.assertTrue(c.wait(10))
        # ... and later sends SIGINT to this process, if given a pid.
        if pid is not None:
            self.assertRaises(KeyboardInterrupt, c.wait, 10)

        child.join()
1025
Benjamin Petersone711caf2008-06-11 16:44:04 +00001026
class _TestEvent(BaseTestCase):
    """Tests for Event objects: is_set(), wait(), set() and clear()."""

    @classmethod
    def _test_event(cls, event):
        # Runs in the child: set the event after sleeping for TIMEOUT2.
        time.sleep(TIMEOUT2)
        event.set()

    def test_event(self):
        event = self.Event()
        wait = TimingWrapper(event.wait)

        # A freshly created event is not set.
        self.assertEqual(event.is_set(), False)

        # wait() returns the value of the flag rather than None, so
        # waiting on an unset event returns False.
        self.assertEqual(wait(0.0), False)
        self.assertTimingAlmostEqual(wait.elapsed, 0.0)
        self.assertEqual(wait(TIMEOUT1), False)
        self.assertTimingAlmostEqual(wait.elapsed, TIMEOUT1)

        event.set()

        # Once set, wait() returns True without blocking, with or
        # without a timeout, and the flag stays set.
        self.assertEqual(event.is_set(), True)
        self.assertEqual(wait(), True)
        self.assertTimingAlmostEqual(wait.elapsed, 0.0)
        self.assertEqual(wait(TIMEOUT1), True)
        self.assertTimingAlmostEqual(wait.elapsed, 0.0)
        self.assertEqual(event.is_set(), True)

        event.clear()

        self.assertEqual(event.is_set(), False)

        # A blocking wait() must be released by a set() performed in
        # another process/thread.
        p = self.Process(target=self._test_event, args=(event,))
        p.daemon = True
        p.start()
        self.assertEqual(wait(), True)
        # Do not leave the child running past the end of the test.
        p.join()
Benjamin Petersone711caf2008-06-11 16:44:04 +00001067
1068#
Richard Oudkerk3730a172012-06-15 18:26:07 +01001069# Tests for Barrier - adapted from tests in test/lock_tests.py
1070#
1071
# Many of the tests for threading.Barrier use a list as an atomic
# counter: a value is appended to increment the counter, and the
# length of the list gives the value.  We use the class _DummyList
# for the same purpose: it keeps only the count of appended items.
1076
class _DummyList(object):
    """List stand-in that records only how many items were appended.

    The count is a single C int held in a multiprocessing.heap.BufferWrapper,
    and every access is guarded by a multiprocessing.Lock.  The pickled state
    is the (wrapper, lock) pair; the memoryview over the buffer is rebuilt
    whenever that state is restored.
    """

    def __init__(self):
        int_size = struct.calcsize('i')
        shared_state = (multiprocessing.heap.BufferWrapper(int_size),
                        multiprocessing.Lock())
        # Reuse the unpickling path to bind the attributes.
        self.__setstate__(shared_state)
        self._lengthbuf[0] = 0

    def __setstate__(self, state):
        self._wrapper, self._lock = state
        raw_view = self._wrapper.create_memoryview()
        self._lengthbuf = raw_view.cast('i')

    def __getstate__(self):
        return (self._wrapper, self._lock)

    def append(self, _):
        # The appended value itself is discarded; only the count grows.
        with self._lock:
            self._lengthbuf[0] = self._lengthbuf[0] + 1

    def __len__(self):
        with self._lock:
            count = self._lengthbuf[0]
        return count
1099
def _wait():
    """Crude wait/yield: sleep 10 ms without using any synchronization primitive."""
    pause_seconds = 0.01
    time.sleep(pause_seconds)
1103
1104
class Bunch(object):
    """
    A bunch of workers, each created with ``namespace.Process``.
    """
    def __init__(self, namespace, f, args, n, wait_before_exit=False):
        """
        Start `n` daemon workers that all run ``f(*args)``.

        `namespace` supplies the DummyList(), Event() and Process()
        factories.  If `wait_before_exit` is True, the workers won't
        terminate until do_finish() is called.
        """
        self.f = f
        self.args = args
        self.n = n
        self.started = namespace.DummyList()
        self.finished = namespace.DummyList()
        self._can_exit = namespace.Event()
        if not wait_before_exit:
            self._can_exit.set()
        for _ in range(n):
            worker = namespace.Process(target=self.task)
            worker.daemon = True
            worker.start()

    def task(self):
        """Body of every worker: record start, run f, record finish."""
        ident = os.getpid()
        self.started.append(ident)
        try:
            self.f(*self.args)
        finally:
            self.finished.append(ident)
            # Hold the worker until exit is permitted (30 s at most).
            self._can_exit.wait(30)
            assert self._can_exit.is_set()

    def _spin_until_full(self, counter):
        # Poll until `counter` holds one entry per worker.
        while len(counter) < self.n:
            _wait()

    def wait_for_started(self):
        """Block until every worker has recorded its start."""
        self._spin_until_full(self.started)

    def wait_for_finished(self):
        """Block until every worker has recorded its finish."""
        self._spin_until_full(self.finished)

    def do_finish(self):
        """Allow the workers to exit."""
        self._can_exit.set()
Richard Oudkerk3730a172012-06-15 18:26:07 +01001148
1149
class AppendTrue(object):
    """Callable that appends True to the wrapped object each time it is called."""

    def __init__(self, obj):
        self.obj = obj

    def __call__(self):
        target = self.obj
        target.append(True)
1155
1156
class _TestBarrier(BaseTestCase):
    """
    Tests for Barrier objects.
    """
    N = 5
    defaultTimeout = 30.0  # XXX Slow Windows buildbots need generous timeout

    def setUp(self):
        self.barrier = self.Barrier(self.N, timeout=self.defaultTimeout)

    def tearDown(self):
        # Abort so that any worker still waiting on the barrier is released.
        self.barrier.abort()
        self.barrier = None

    def DummyList(self):
        """Return a counter-like list suitable for the current test flavour."""
        if self.TYPE == 'threads':
            return []
        elif self.TYPE == 'manager':
            return self.manager.list()
        else:
            return _DummyList()

    def run_threads(self, f, args):
        """Run ``f(*args)`` in N-1 workers and in the caller, then wait for
        all workers to finish."""
        b = Bunch(self, f, args, self.N-1)
        f(*args)
        b.wait_for_finished()

    @classmethod
    def multipass(cls, barrier, results, n):
        # Each party appends to results[0], waits, appends to results[1]
        # and waits again; the length checks verify that nobody got ahead.
        m = barrier.parties
        assert m == cls.N
        for i in range(n):
            results[0].append(True)
            assert len(results[1]) == i * m
            barrier.wait()
            results[1].append(True)
            assert len(results[0]) == (i + 1) * m
            barrier.wait()
        try:
            assert barrier.n_waiting == 0
        except NotImplementedError:
            # n_waiting may be unavailable; then the check is skipped.
            pass
        assert not barrier.broken

    def test_barrier(self, passes=1):
        """
        Test that a barrier is passed in lockstep
        """
        results = [self.DummyList(), self.DummyList()]
        self.run_threads(self.multipass, (self.barrier, results, passes))

    def test_barrier_10(self):
        """
        Test that a barrier works for 10 consecutive runs
        """
        return self.test_barrier(10)

    @classmethod
    def _test_wait_return_f(cls, barrier, queue):
        res = barrier.wait()
        queue.put(res)

    def test_wait_return(self):
        """
        test the return value from barrier.wait
        """
        queue = self.Queue()
        self.run_threads(self._test_wait_return_f, (self.barrier, queue))
        results = [queue.get() for i in range(self.N)]
        # Exactly one party must have received index 0.
        self.assertEqual(results.count(0), 1)

    @classmethod
    def _test_action_f(cls, barrier, results):
        barrier.wait()
        # The action must have run exactly once by the time wait() returns.
        if len(results) != 1:
            raise RuntimeError

    def test_action(self):
        """
        Test the 'action' callback
        """
        results = self.DummyList()
        barrier = self.Barrier(self.N, action=AppendTrue(results))
        self.run_threads(self._test_action_f, (barrier, results))
        self.assertEqual(len(results), 1)

    @classmethod
    def _test_abort_f(cls, barrier, results1, results2):
        try:
            i = barrier.wait()
            if i == cls.N//2:
                # One party fails and aborts the barrier (see below).
                raise RuntimeError
            barrier.wait()
            results1.append(True)
        except threading.BrokenBarrierError:
            results2.append(True)
        except RuntimeError:
            barrier.abort()

    def test_abort(self):
        """
        Test that an abort will put the barrier in a broken state
        """
        results1 = self.DummyList()
        results2 = self.DummyList()
        self.run_threads(self._test_abort_f,
                         (self.barrier, results1, results2))
        self.assertEqual(len(results1), 0)
        self.assertEqual(len(results2), self.N-1)
        self.assertTrue(self.barrier.broken)

    @classmethod
    def _test_reset_f(cls, barrier, results1, results2, results3):
        i = barrier.wait()
        if i == cls.N//2:
            # Wait until the other threads are all in the barrier.
            while barrier.n_waiting < cls.N-1:
                time.sleep(0.001)
            barrier.reset()
        else:
            try:
                barrier.wait()
                results1.append(True)
            except threading.BrokenBarrierError:
                results2.append(True)
        # Now, pass the barrier again
        barrier.wait()
        results3.append(True)

    def test_reset(self):
        """
        Test that a 'reset' on a barrier frees the waiting threads
        """
        results1 = self.DummyList()
        results2 = self.DummyList()
        results3 = self.DummyList()
        self.run_threads(self._test_reset_f,
                         (self.barrier, results1, results2, results3))
        self.assertEqual(len(results1), 0)
        self.assertEqual(len(results2), self.N-1)
        self.assertEqual(len(results3), self.N)

    @classmethod
    def _test_abort_and_reset_f(cls, barrier, barrier2,
                                results1, results2, results3):
        try:
            i = barrier.wait()
            if i == cls.N//2:
                raise RuntimeError
            barrier.wait()
            results1.append(True)
        except threading.BrokenBarrierError:
            results2.append(True)
        except RuntimeError:
            barrier.abort()
        # Synchronize and reset the barrier.  Must synchronize first so
        # that everyone has left it when we reset, and after so that no
        # one enters it before the reset.
        if barrier2.wait() == cls.N//2:
            barrier.reset()
        barrier2.wait()
        barrier.wait()
        results3.append(True)

    def test_abort_and_reset(self):
        """
        Test that a barrier can be reset after being broken.
        """
        results1 = self.DummyList()
        results2 = self.DummyList()
        results3 = self.DummyList()
        barrier2 = self.Barrier(self.N)

        self.run_threads(self._test_abort_and_reset_f,
                         (self.barrier, barrier2, results1, results2, results3))
        self.assertEqual(len(results1), 0)
        self.assertEqual(len(results2), self.N-1)
        self.assertEqual(len(results3), self.N)

    @classmethod
    def _test_timeout_f(cls, barrier, results):
        i = barrier.wait()
        if i == cls.N//2:
            # One thread is late!
            time.sleep(1.0)
        try:
            barrier.wait(0.5)
        except threading.BrokenBarrierError:
            results.append(True)

    def test_timeout(self):
        """
        Test wait(timeout)
        """
        results = self.DummyList()
        self.run_threads(self._test_timeout_f, (self.barrier, results))
        self.assertEqual(len(results), self.barrier.parties)

    @classmethod
    def _test_default_timeout_f(cls, barrier, results):
        i = barrier.wait(cls.defaultTimeout)
        if i == cls.N//2:
            # One thread is later than the default timeout
            time.sleep(1.0)
        try:
            barrier.wait()
        except threading.BrokenBarrierError:
            results.append(True)

    def test_default_timeout(self):
        """
        Test the barrier's default timeout
        """
        barrier = self.Barrier(self.N, timeout=0.5)
        results = self.DummyList()
        self.run_threads(self._test_default_timeout_f, (barrier, results))
        self.assertEqual(len(results), barrier.parties)

    def test_single_thread(self):
        # A barrier with a single party never blocks.
        b = self.Barrier(1)
        b.wait()
        b.wait()

    @classmethod
    def _test_thousand_f(cls, barrier, passes, conn, lock):
        for i in range(passes):
            barrier.wait()
            # The connection is shared by all workers; serialize the sends.
            with lock:
                conn.send(i)

    def test_thousand(self):
        if self.TYPE == 'manager':
            # Report the test as skipped instead of silently passing.
            self.skipTest('test not appropriate for {}'.format(self.TYPE))
        passes = 1000
        lock = self.Lock()
        conn, child_conn = self.Pipe(False)
        for j in range(self.N):
            p = self.Process(target=self._test_thousand_f,
                             args=(self.barrier, passes, child_conn, lock))
            p.start()
            # Cleanups run after tearDown(), which aborts the barrier, so
            # a worker stuck in wait() is released before it is joined.
            self.addCleanup(p.join)

        # All N workers must report pass i before any reports pass i+1.
        for i in range(passes):
            for j in range(self.N):
                self.assertEqual(conn.recv(), i)
1401
1402#
Benjamin Petersone711caf2008-06-11 16:44:04 +00001403#
1404#
1405
class _TestValue(BaseTestCase):
    """Tests for Value and RawValue objects."""

    ALLOWED_TYPES = ('processes',)

    # (typecode, initial value, value written by the child process)
    codes_values = [
        ('i', 4343, 24234),
        ('d', 3.625, -4.25),
        ('h', -232, 234),
        ('c', latin('x'), latin('y'))
        ]

    def setUp(self):
        if HAS_SHAREDCTYPES:
            return
        self.skipTest("requires multiprocessing.sharedctypes")

    @classmethod
    def _test(cls, values):
        # Runs in the child: overwrite each value with its replacement.
        for shared, (_, _, replacement) in zip(values, cls.codes_values):
            shared.value = replacement

    def test_value(self, raw=False):
        factory = self.RawValue if raw else self.Value
        values = [factory(typecode, initial)
                  for typecode, initial, _ in self.codes_values]

        for shared, (_, initial, _) in zip(values, self.codes_values):
            self.assertEqual(shared.value, initial)

        child = self.Process(target=self._test, args=(values,))
        child.daemon = True
        child.start()
        child.join()

        # The child's writes must be visible in the parent.
        for shared, (_, _, replacement) in zip(values, self.codes_values):
            self.assertEqual(shared.value, replacement)

    def test_rawvalue(self):
        self.test_value(raw=True)

    def test_getobj_getlock(self):
        # Default lock: the accessors only need to be callable.
        default_locked = self.Value('i', 5)
        default_locked.get_lock()
        default_locked.get_obj()

        # lock=None is accepted and still provides the accessors.
        none_locked = self.Value('i', 5, lock=None)
        none_locked.get_lock()
        none_locked.get_obj()

        # An explicitly supplied lock is the one handed back.
        lock = self.Lock()
        custom_locked = self.Value('i', 5, lock=lock)
        returned_lock = custom_locked.get_lock()
        custom_locked.get_obj()
        self.assertEqual(lock, returned_lock)

        # lock=False yields an object without the accessors.
        unlocked = self.Value('i', 5, lock=False)
        self.assertFalse(hasattr(unlocked, 'get_lock'))
        self.assertFalse(hasattr(unlocked, 'get_obj'))

        # Anything that is not a usable lock is rejected.
        self.assertRaises(AttributeError, self.Value, 'i', 5, lock='navalue')

        raw = self.RawValue('i', 5)
        self.assertFalse(hasattr(raw, 'get_lock'))
        self.assertFalse(hasattr(raw, 'get_obj'))
1473
Benjamin Petersone711caf2008-06-11 16:44:04 +00001474
class _TestArray(BaseTestCase):
    """Tests for Array and RawArray objects."""

    ALLOWED_TYPES = ('processes',)

    @classmethod
    def f(cls, seq):
        # Turn seq into its running sum, in place.
        for index in range(1, len(seq)):
            seq[index] = seq[index] + seq[index - 1]

    @unittest.skipIf(c_int is None, "requires _ctypes")
    def test_array(self, raw=False):
        seq = [680, 626, 934, 821, 150, 233, 548, 982, 714, 831]
        make_array = self.RawArray if raw else self.Array
        arr = make_array('i', seq)

        self.assertEqual(len(arr), len(seq))
        self.assertEqual(arr[3], seq[3])
        self.assertEqual(list(arr[2:7]), list(seq[2:7]))

        # Slice assignment from an array.array, applied to both sequences.
        patch = array.array('i', [1, 2, 3, 4])
        arr[4:8] = patch
        seq[4:8] = patch

        self.assertEqual(list(arr[:]), seq)

        # Apply f locally to the list and in a child to the shared array;
        # both must end up with the same contents.
        self.f(seq)

        child = self.Process(target=self.f, args=(arr,))
        child.daemon = True
        child.start()
        child.join()

        self.assertEqual(list(arr[:]), seq)

    @unittest.skipIf(c_int is None, "requires _ctypes")
    def test_array_from_size(self):
        size = 10
        # Test for zeroing (see issue #11675).
        # The repetition below strengthens the test by increasing the chances
        # of previously allocated non-zero memory being used for the new array
        # on the 2nd and 3rd loops.
        for _ in range(3):
            arr = self.Array('i', size)
            self.assertEqual(len(arr), size)
            self.assertEqual(list(arr), [0] * size)
            arr[:] = range(10)
            self.assertEqual(list(arr), list(range(10)))
            del arr

    @unittest.skipIf(c_int is None, "requires _ctypes")
    def test_rawarray(self):
        self.test_array(raw=True)

    @unittest.skipIf(c_int is None, "requires _ctypes")
    def test_getobj_getlock_obj(self):
        # Default lock: the accessors only need to be callable.
        default_locked = self.Array('i', list(range(10)))
        default_locked.get_lock()
        default_locked.get_obj()

        # lock=None is accepted and still provides the accessors.
        none_locked = self.Array('i', list(range(10)), lock=None)
        none_locked.get_lock()
        none_locked.get_obj()

        # An explicitly supplied lock is the one handed back.
        lock = self.Lock()
        custom_locked = self.Array('i', list(range(10)), lock=lock)
        returned_lock = custom_locked.get_lock()
        custom_locked.get_obj()
        self.assertEqual(lock, returned_lock)

        # lock=False yields an object without the accessors.
        unlocked = self.Array('i', range(10), lock=False)
        self.assertFalse(hasattr(unlocked, 'get_lock'))
        self.assertFalse(hasattr(unlocked, 'get_obj'))
        self.assertRaises(AttributeError,
                          self.Array, 'i', range(10), lock='notalock')

        raw = self.RawArray('i', range(10))
        self.assertFalse(hasattr(raw, 'get_lock'))
        self.assertFalse(hasattr(raw, 'get_obj'))
Benjamin Petersone711caf2008-06-11 16:44:04 +00001553
1554#
1555#
1556#
1557
class _TestContainers(BaseTestCase):
    """Tests for the manager-provided list, dict and Namespace objects."""

    ALLOWED_TYPES = ('manager',)

    def test_list(self):
        digits = list(range(10))
        a = self.list(list(range(10)))
        self.assertEqual(a[:], digits)

        b = self.list()
        self.assertEqual(b[:], [])

        b.extend(list(range(5)))
        self.assertEqual(b[:], [0, 1, 2, 3, 4])

        self.assertEqual(b[2], 2)
        # Slicing past the end is clipped.
        self.assertEqual(b[2:10], [2, 3, 4])

        b *= 2
        doubled = [0, 1, 2, 3, 4, 0, 1, 2, 3, 4]
        self.assertEqual(b[:], doubled)

        self.assertEqual(b + [5, 6], doubled + [5, 6])

        # `a` is unaffected by the operations on `b`.
        self.assertEqual(a[:], digits)

        # A shared list built from other shared lists.
        d = [a, b]
        e = self.list(d)
        self.assertEqual(e[:], [digits, doubled])

        f = self.list([a])
        a.append('hello')
        self.assertEqual(f[:], [digits + ['hello']])

    def test_dict(self):
        d = self.dict()
        indices = list(range(65, 70))
        for code in indices:
            d[code] = chr(code)
        letters = [chr(code) for code in indices]
        pairs = list(zip(indices, letters))
        self.assertEqual(d.copy(), dict(pairs))
        self.assertEqual(sorted(d.keys()), indices)
        self.assertEqual(sorted(d.values()), letters)
        self.assertEqual(sorted(d.items()), pairs)

    def test_namespace(self):
        n = self.Namespace()
        n.name = 'Bob'
        n.job = 'Builder'
        n._hidden = 'hidden'
        self.assertEqual((n.name, n.job), ('Bob', 'Builder'))
        del n.job
        # The expected str() shows neither the deleted attribute nor the
        # underscore-prefixed one.
        self.assertEqual(str(n), "Namespace(name='Bob')")
        self.assertTrue(hasattr(n, 'name'))
        self.assertFalse(hasattr(n, 'job'))
1612 self.assertTrue(not hasattr(n, 'job'))
1613
1614#
1615#
1616#
1617
def sqr(x, wait=0.0):
    """Return x squared after sleeping for `wait` seconds."""
    time.sleep(wait)
    squared = x * x
    return squared
Ask Solem2afcbf22010-11-09 20:55:52 +00001621
def mul(x, y):
    """Return the product of x and y."""
    product = x * y
    return product
1624
class _TestPool(BaseTestCase):
    """Tests for the Pool API: apply, map, starmap, imap and their async forms."""

    def test_apply(self):
        papply = self.pool.apply
        self.assertEqual(papply(sqr, (5,)), sqr(5))
        self.assertEqual(papply(sqr, (), {'x':3}), sqr(x=3))

    def test_map(self):
        pmap = self.pool.map
        self.assertEqual(pmap(sqr, list(range(10))), list(map(sqr, list(range(10)))))
        self.assertEqual(pmap(sqr, list(range(100)), chunksize=20),
                         list(map(sqr, list(range(100)))))

    def test_starmap(self):
        psmap = self.pool.starmap
        tuples = list(zip(range(10), range(9,-1, -1)))
        self.assertEqual(psmap(mul, tuples),
                         list(itertools.starmap(mul, tuples)))
        tuples = list(zip(range(100), range(99,-1, -1)))
        self.assertEqual(psmap(mul, tuples, chunksize=20),
                         list(itertools.starmap(mul, tuples)))

    def test_starmap_async(self):
        tuples = list(zip(range(100), range(99,-1, -1)))
        self.assertEqual(self.pool.starmap_async(mul, tuples).get(),
                         list(itertools.starmap(mul, tuples)))

    def test_map_async(self):
        self.assertEqual(self.pool.map_async(sqr, list(range(10))).get(),
                         list(map(sqr, list(range(10)))))

    def test_map_async_callbacks(self):
        call_args = self.manager.list() if self.TYPE == 'manager' else []
        # Success: only `callback` fires, with the list of results.
        self.pool.map_async(int, ['1'],
                            callback=call_args.append,
                            error_callback=call_args.append).wait()
        self.assertEqual(1, len(call_args))
        self.assertEqual([1], call_args[0])
        # Failure: only `error_callback` fires, with the exception.
        self.pool.map_async(int, ['a'],
                            callback=call_args.append,
                            error_callback=call_args.append).wait()
        self.assertEqual(2, len(call_args))
        self.assertIsInstance(call_args[1], ValueError)

    def test_map_chunksize(self):
        try:
            self.pool.map_async(sqr, [], chunksize=1).get(timeout=TIMEOUT1)
        except multiprocessing.TimeoutError:
            self.fail("pool.map_async with chunksize stalled on null list")

    def test_async(self):
        res = self.pool.apply_async(sqr, (7, TIMEOUT1,))
        get = TimingWrapper(res.get)
        self.assertEqual(get(), 49)
        self.assertTimingAlmostEqual(get.elapsed, TIMEOUT1)

    def test_async_timeout(self):
        # The task sleeps longer than the timeout given to get().
        res = self.pool.apply_async(sqr, (6, TIMEOUT2 + 0.2))
        get = TimingWrapper(res.get)
        self.assertRaises(multiprocessing.TimeoutError, get, timeout=TIMEOUT2)
        self.assertTimingAlmostEqual(get.elapsed, TIMEOUT2)

    def test_imap(self):
        it = self.pool.imap(sqr, list(range(10)))
        self.assertEqual(list(it), list(map(sqr, list(range(10)))))

        it = self.pool.imap(sqr, list(range(10)))
        for i in range(10):
            self.assertEqual(next(it), i*i)
        self.assertRaises(StopIteration, it.__next__)

        it = self.pool.imap(sqr, list(range(1000)), chunksize=100)
        for i in range(1000):
            self.assertEqual(next(it), i*i)
        self.assertRaises(StopIteration, it.__next__)

    def test_imap_unordered(self):
        # Results may arrive in any order, hence the sorted() comparison.
        it = self.pool.imap_unordered(sqr, list(range(1000)))
        self.assertEqual(sorted(it), list(map(sqr, list(range(1000)))))

        it = self.pool.imap_unordered(sqr, list(range(1000)), chunksize=53)
        self.assertEqual(sorted(it), list(map(sqr, list(range(1000)))))

    def test_make_pool(self):
        self.assertRaises(ValueError, multiprocessing.Pool, -1)
        self.assertRaises(ValueError, multiprocessing.Pool, 0)

        p = multiprocessing.Pool(3)
        try:
            self.assertEqual(3, len(p._pool))
        finally:
            # Shut the pool down even if the assertion fails.
            p.close()
            p.join()

    def test_terminate(self):
        if self.TYPE == 'manager':
            # On Unix a forked process increfs each shared object to
            # which its parent process held a reference.  If the
            # forked process gets terminated then there is likely to
            # be a reference leak.  So to prevent
            # _TestZZZNumberOfObjects from failing we skip this test
            # when using a manager.
            self.skipTest('terminating pool workers may leak manager '
                          'references')

        # Queue far more work than can finish, then terminate.
        self.pool.map_async(
            time.sleep, [0.1 for i in range(10000)], chunksize=1
            )
        self.pool.terminate()
        join = TimingWrapper(self.pool.join)
        join()
        # join() must not have waited for the queued tasks.
        self.assertLess(join.elapsed, 0.5)

    def test_empty_iterable(self):
        # See Issue 12157
        p = self.Pool(1)
        try:
            self.assertEqual(p.map(sqr, []), [])
            self.assertEqual(list(p.imap(sqr, [])), [])
            self.assertEqual(list(p.imap_unordered(sqr, [])), [])
            self.assertEqual(p.map_async(sqr, []).get(), [])
        finally:
            # Shut the pool down even if an assertion fails.
            p.close()
            p.join()

    def test_context(self):
        if self.TYPE == 'processes':
            L = list(range(10))
            expected = [sqr(i) for i in L]
            with multiprocessing.Pool(2) as p:
                r = p.map_async(sqr, L)
                self.assertEqual(r.get(), expected)
            # After the with block the pool must refuse new work.
            self.assertRaises(ValueError, p.map_async, sqr, L)
Richard Oudkerkd69cfe82012-06-18 17:47:52 +01001756
def raising():
    """Always raise KeyError("key")."""
    error = KeyError("key")
    raise error
Jesse Noller1f0b6582010-01-27 03:36:01 +00001759
def unpickleable_result():
    """Return a lambda that yields 42 when called."""
    answer = lambda: 42
    return answer
1762
class _TestPoolWorkerErrors(BaseTestCase):
    """Tests for how a Pool reports errors raised while running a task."""

    ALLOWED_TYPES = ('processes', )

    def test_async_error_callback(self):
        p = multiprocessing.Pool(2)
        try:
            scratchpad = [None]
            def errback(exc):
                scratchpad[0] = exc

            res = p.apply_async(raising, error_callback=errback)
            # The exception is re-raised by get() and is also handed to
            # the error callback.
            self.assertRaises(KeyError, res.get)
            self.assertIsInstance(scratchpad[0], KeyError)
        finally:
            # Shut the pool down even if an assertion fails.
            p.close()
            p.join()

    def test_unpickleable_result(self):
        from multiprocessing.pool import MaybeEncodingError
        p = multiprocessing.Pool(2)
        try:
            # Make sure we don't lose pool processes because of encoding
            # errors.
            for iteration in range(20):

                scratchpad = [None]
                def errback(exc):
                    scratchpad[0] = exc

                res = p.apply_async(unpickleable_result,
                                    error_callback=errback)
                self.assertRaises(MaybeEncodingError, res.get)
                wrapped = scratchpad[0]
                self.assertIsInstance(wrapped, MaybeEncodingError)
                self.assertIsNotNone(wrapped.exc)
                self.assertIsNotNone(wrapped.value)
        finally:
            # Shut the pool down even if an assertion fails.
            p.close()
            p.join()
1802
class _TestPoolWorkerLifetime(BaseTestCase):
    """Tests for pools created with the maxtasksperchild option."""

    ALLOWED_TYPES = ('processes', )

    def test_pool_worker_lifetime(self):
        pool = multiprocessing.Pool(3, maxtasksperchild=10)
        self.assertEqual(3, len(pool._pool))
        pids_before = [worker.pid for worker in pool._pool]
        # Run many tasks so each worker gets replaced (hopefully)
        pending = [pool.apply_async(sqr, (i, )) for i in range(100)]
        # Fetch the results and verify we got the right answers,
        # also ensuring all the tasks have completed.
        for index, async_result in enumerate(pending):
            self.assertEqual(async_result.get(), sqr(index))
        # Refill the pool
        pool._repopulate_pool()
        # Wait until all workers are alive
        # (at most 50 polls, DELTA seconds apart)
        for _ in range(50):
            if all(worker.is_alive() for worker in pool._pool):
                break
            time.sleep(DELTA)
        pids_after = [worker.pid for worker in pool._pool]
        # All pids should be assigned.  See issue #7805.
        self.assertNotIn(None, pids_before)
        self.assertNotIn(None, pids_after)
        # Finally, check that the worker pids have changed
        self.assertNotEqual(sorted(pids_before), sorted(pids_after))
        pool.close()
        pool.join()

    def test_pool_worker_lifetime_early_close(self):
        # Issue #10332: closing a pool whose workers have limited lifetimes
        # before all the tasks completed would make join() hang.
        pool = multiprocessing.Pool(3, maxtasksperchild=1)
        pending = [pool.apply_async(sqr, (i, 0.3)) for i in range(6)]
        pool.close()
        pool.join()
        # check the results
        for index, async_result in enumerate(pending):
            self.assertEqual(async_result.get(), sqr(index))
1847
1848
Benjamin Petersone711caf2008-06-11 16:44:04 +00001849#
1850# Test that manager has expected number of shared objects left
1851#
1852
class _TestZZZNumberOfObjects(BaseTestCase):
    # Because test cases are sorted alphabetically, this one will get
    # run after all the other tests for the manager.  It tests that
    # there have been no "reference leaks" for the manager's shared
    # objects.  Note the comment in _TestPool.test_terminate().

    # If some other test using ManagerMixin.manager fails, then the
    # raised exception may keep alive a frame which holds a reference
    # to a managed object.  This will cause test_number_of_objects to
    # also fail.
    ALLOWED_TYPES = ('manager',)

    def test_number_of_objects(self):
        EXPECTED_NUMBER = 1                # the pool object is still alive
        multiprocessing.active_children()  # discard dead process objs
        gc.collect()                       # do garbage collection
        refs = self.manager._number_of_objects()
        # Capture the debug info right after the count so that both
        # describe (as closely as possible) the same manager state.
        debug_info = self.manager._debug_info()
        if refs != EXPECTED_NUMBER:
            # Print the captured info once; a second _debug_info() call
            # here would only duplicate the output.
            print(debug_info)

        self.assertEqual(refs, EXPECTED_NUMBER)
1876
1877#
1878# Test of creating a customized manager class
1879#
1880
1881from multiprocessing.managers import BaseManager, BaseProxy, RemoteError
1882
class FooBar(object):
    """Simple class with two public methods and one underscore method."""

    def f(self):
        """Return the string 'f()'."""
        return 'f()'

    def g(self):
        """Always raise ValueError."""
        raise ValueError()

    def _h(self):
        # Underscore-prefixed counterpart of f().
        return '_h()'
1890
def baz():
    """Generate the squares of the integers 0 through 9, in order."""
    n = 0
    while n < 10:
        yield n * n
        n += 1
1894
class IteratorProxy(BaseProxy):
    """Proxy that exposes only ``__next__`` and acts as its own iterator."""

    _exposed_ = ('__next__',)

    def __iter__(self):
        # The proxy itself is the iterator object.
        return self

    def __next__(self):
        # Each step is forwarded to the referent through _callmethod().
        method_name = '__next__'
        return self._callmethod(method_name)
1901
class MyManager(BaseManager):
    """Customized manager class; its typeids are registered right below."""


# 'Foo' and 'Bar' are both backed by FooBar; 'Bar' passes an explicit
# tuple of exposed method names.  'baz' is served through IteratorProxy.
MyManager.register('Foo', callable=FooBar)
MyManager.register('Bar', callable=FooBar, exposed=('f', '_h'))
MyManager.register('baz', callable=baz, proxytype=IteratorProxy)
1908
1909
class _TestMyManager(BaseTestCase):
    # Runs the same proxy checks (see common()) against a MyManager that
    # is started explicitly, used as a context manager, or both.

    ALLOWED_TYPES = ('manager',)

    def test_mymanager(self):
        mgr = MyManager()
        mgr.start()
        self.common(mgr)
        mgr.shutdown()

        # If the manager process exited cleanly then the exitcode
        # will be zero.  Otherwise (after a short timeout)
        # terminate() is used, resulting in an exitcode of -SIGTERM.
        self.assertEqual(mgr._process.exitcode, 0)

    def test_mymanager_context(self):
        # The with statement handles both start-up and shutdown here.
        with MyManager() as mgr:
            self.common(mgr)
        self.assertEqual(mgr._process.exitcode, 0)

    def test_mymanager_context_prestarted(self):
        # Entering the context of an already started manager must work too.
        mgr = MyManager()
        mgr.start()
        with mgr:
            self.common(mgr)
        self.assertEqual(mgr._process.exitcode, 0)

    def common(self, manager):
        # Checks shared by all tests above; `manager` is a started MyManager.
        foo_proxy = manager.Foo()
        bar_proxy = manager.Bar()
        squares = manager.baz()

        # Which of FooBar's methods are visible on each proxy?
        candidates = ('f', 'g', '_h')
        visible_on_foo = [m for m in candidates if hasattr(foo_proxy, m)]
        visible_on_bar = [m for m in candidates if hasattr(bar_proxy, m)]

        self.assertEqual(visible_on_foo, ['f', 'g'])
        self.assertEqual(visible_on_bar, ['f', '_h'])

        # 'Foo' proxies: f works, g raises, _h is refused remotely.
        self.assertEqual(foo_proxy.f(), 'f()')
        self.assertRaises(ValueError, foo_proxy.g)
        self.assertEqual(foo_proxy._callmethod('f'), 'f()')
        self.assertRaises(RemoteError, foo_proxy._callmethod, '_h')

        # 'Bar' proxies: both f and _h are callable, directly and by name.
        self.assertEqual(bar_proxy.f(), 'f()')
        self.assertEqual(bar_proxy._h(), '_h()')
        self.assertEqual(bar_proxy._callmethod('f'), 'f()')
        self.assertEqual(bar_proxy._callmethod('_h'), '_h()')

        # Iterating the 'baz' proxy yields the ten squares.
        self.assertEqual(list(squares), [n*n for n in range(10)])
1959
Richard Oudkerk73d9a292012-06-14 15:30:10 +01001960
Benjamin Petersone711caf2008-06-11 16:44:04 +00001961#
1962# Test of connecting to a remote server and using xmlrpclib for serialization
1963#
1964
# Single module-level queue handed out by get_queue().
_queue = pyqueue.Queue()


def get_queue():
    """Return the module-level queue (the same object on every call)."""
    return _queue
1968
class QueueManager(BaseManager):
    '''Manager class used by the server process.'''


# Server side: 'get_queue' is backed by the module-level get_queue().
QueueManager.register('get_queue', callable=get_queue)
1972
class QueueManager2(BaseManager):
    '''Manager class which specifies the same interface as QueueManager.'''


# Registered without a callable, unlike QueueManager's 'get_queue'.
QueueManager2.register('get_queue')


# Serializer name passed to the managers in the remote-manager tests.
SERIALIZER = 'xmlrpclib'
1979
class _TestRemoteManager(BaseTestCase):
    # Talks to a QueueManager server through QueueManager2 clients, with
    # SERIALIZER used for serialization instead of pickle.

    ALLOWED_TYPES = ('manager',)

    @classmethod
    def _putter(cls, address, authkey):
        # Child side: connect to the server and enqueue a single tuple.
        client = QueueManager2(
            address=address, authkey=authkey, serializer=SERIALIZER)
        client.connect()
        remote_queue = client.get_queue()
        remote_queue.put(('hello world', None, True, 2.25))

    def test_remote(self):
        authkey = os.urandom(32)

        # Port 0: the address actually used is read back from the manager.
        server = QueueManager(
            address=('localhost', 0), authkey=authkey, serializer=SERIALIZER)
        server.start()

        putter = self.Process(target=self._putter,
                              args=(server.address, authkey))
        putter.daemon = True
        putter.start()

        client = QueueManager2(
            address=server.address, authkey=authkey, serializer=SERIALIZER)
        client.connect()
        remote_queue = client.get_queue()

        # Note that xmlrpclib will deserialize object as a list not a tuple
        self.assertEqual(remote_queue.get(),
                         ['hello world', None, True, 2.25])

        # Because we are using xmlrpclib for serialization instead of
        # pickle this will cause a serialization error.
        self.assertRaises(Exception, remote_queue.put, time.sleep)

        # Make queue finalizer run before the server is stopped
        del remote_queue
        server.shutdown()
2021
class _TestManagerRestart(BaseTestCase):
    # Checks that a manager can be restarted on an address immediately
    # after a previous manager has been shut down.

    @classmethod
    def _putter(cls, address, authkey):
        # Child side: connect to the running manager and enqueue one string.
        manager = QueueManager(
            address=address, authkey=authkey, serializer=SERIALIZER)
        manager.connect()
        queue = manager.get_queue()
        queue.put('hello world')

    def test_rapid_restart(self):
        authkey = os.urandom(32)
        manager = QueueManager(
            address=('localhost', 0), authkey=authkey, serializer=SERIALIZER)
        srvr = manager.get_server()
        addr = srvr.address
        # Close the connection.Listener socket which gets opened as a part
        # of manager.get_server(). It's not needed for the test.
        srvr.listener.close()
        manager.start()

        p = self.Process(target=self._putter, args=(manager.address, authkey))
        p.daemon = True
        p.start()
        queue = manager.get_queue()
        self.assertEqual(queue.get(), 'hello world')
        # Drop the proxy before the manager it refers to is shut down.
        del queue
        manager.shutdown()

        # Restart straight away on the address recorded above.
        manager = QueueManager(
            address=addr, authkey=authkey, serializer=SERIALIZER)
        try:
            manager.start()
        except IOError as e:
            if e.errno != errno.EADDRINUSE:
                raise
            # Retry after some time, in case the old socket was lingering
            # (sporadic failure on buildbots)
            time.sleep(1.0)
            manager = QueueManager(
                address=addr, authkey=authkey, serializer=SERIALIZER)
            # The replacement manager must be started as well: shutdown()
            # only exists on a manager once start() has run, so the call
            # below would otherwise fail on this retry path.
            manager.start()
        manager.shutdown()
Jesse Nollerc5d28a02009-03-30 16:37:36 +00002063
Benjamin Petersone711caf2008-06-11 16:44:04 +00002064#
2065#
2066#
2067
# Empty message; _TestConnection._echo() stops echoing when it receives it.
SENTINEL = latin('')
2069
class _TestConnection(BaseTestCase):
    # Tests for the connection objects returned by self.Pipe(): object and
    # byte transfer, polling, closing, and (processes only) handle passing.

    ALLOWED_TYPES = ('processes', 'threads')

    @classmethod
    def _echo(cls, conn):
        # Echo every received byte message back until SENTINEL arrives.
        for msg in iter(conn.recv_bytes, SENTINEL):
            conn.send_bytes(msg)
        conn.close()

    def test_connection(self):
        conn, child_conn = self.Pipe()

        p = self.Process(target=self._echo, args=(child_conn,))
        p.daemon = True
        p.start()

        seq = [1, 2.25, None]
        msg = latin('hello world')
        longmsg = msg * 10
        arr = array.array('i', list(range(4)))

        if self.TYPE == 'processes':
            self.assertEqual(type(conn.fileno()), int)

        self.assertEqual(conn.send(seq), None)
        self.assertEqual(conn.recv(), seq)

        self.assertEqual(conn.send_bytes(msg), None)
        self.assertEqual(conn.recv_bytes(), msg)

        if self.TYPE == 'processes':
            # Receive into the start of a zero-filled buffer.
            buffer = array.array('i', [0]*10)
            expected = list(arr) + [0] * (10 - len(arr))
            self.assertEqual(conn.send_bytes(arr), None)
            self.assertEqual(conn.recv_bytes_into(buffer),
                             len(arr) * buffer.itemsize)
            self.assertEqual(list(buffer), expected)

            # Receive at an offset of three items into the buffer.
            buffer = array.array('i', [0]*10)
            expected = [0] * 3 + list(arr) + [0] * (10 - 3 - len(arr))
            self.assertEqual(conn.send_bytes(arr), None)
            self.assertEqual(conn.recv_bytes_into(buffer, 3 * buffer.itemsize),
                             len(arr) * buffer.itemsize)
            self.assertEqual(list(buffer), expected)

            # A buffer shorter than the message must raise BufferTooShort
            # carrying the complete message.
            buffer = bytearray(latin(' ' * 40))
            self.assertEqual(conn.send_bytes(longmsg), None)
            try:
                res = conn.recv_bytes_into(buffer)
            except multiprocessing.BufferTooShort as e:
                self.assertEqual(e.args, (longmsg,))
            else:
                self.fail('expected BufferTooShort, got %s' % res)

        poll = TimingWrapper(conn.poll)

        self.assertEqual(poll(), False)
        self.assertTimingAlmostEqual(poll.elapsed, 0)

        self.assertEqual(poll(-1), False)
        self.assertTimingAlmostEqual(poll.elapsed, 0)

        self.assertEqual(poll(TIMEOUT1), False)
        self.assertTimingAlmostEqual(poll.elapsed, TIMEOUT1)

        conn.send(None)
        # Give the echoed message time to arrive before polling again.
        time.sleep(.1)

        self.assertEqual(poll(TIMEOUT1), True)
        self.assertTimingAlmostEqual(poll.elapsed, 0)

        self.assertEqual(conn.recv(), None)

        really_big_msg = latin('X') * (1024 * 1024 * 16)   # 16Mb
        conn.send_bytes(really_big_msg)
        self.assertEqual(conn.recv_bytes(), really_big_msg)

        conn.send_bytes(SENTINEL)                          # tell child to quit
        child_conn.close()

        if self.TYPE == 'processes':
            self.assertEqual(conn.readable, True)
            self.assertEqual(conn.writable, True)
            self.assertRaises(EOFError, conn.recv)
            self.assertRaises(EOFError, conn.recv_bytes)

        p.join()

    def test_duplex_false(self):
        reader, writer = self.Pipe(duplex=False)
        self.assertEqual(writer.send(1), None)
        self.assertEqual(reader.recv(), 1)
        if self.TYPE == 'processes':
            self.assertEqual(reader.readable, True)
            self.assertEqual(reader.writable, False)
            self.assertEqual(writer.readable, False)
            self.assertEqual(writer.writable, True)
            self.assertRaises(IOError, reader.send, 2)
            self.assertRaises(IOError, writer.recv)
            self.assertRaises(IOError, writer.poll)

    def test_spawn_close(self):
        # We test that a pipe connection can be closed by parent
        # process immediately after child is spawned.  On Windows this
        # would have sometimes failed on old versions because
        # child_conn would be closed before the child got a chance to
        # duplicate it.
        conn, child_conn = self.Pipe()

        p = self.Process(target=self._echo, args=(child_conn,))
        p.daemon = True
        p.start()
        child_conn.close()    # this might complete before child initializes

        msg = latin('hello')
        conn.send_bytes(msg)
        self.assertEqual(conn.recv_bytes(), msg)

        conn.send_bytes(SENTINEL)
        conn.close()
        p.join()

    def test_sendbytes(self):
        # Report a skip (as the fd-transfer tests do) instead of silently
        # passing when the test does not apply.
        if self.TYPE != 'processes':
            self.skipTest("only makes sense with processes")

        msg = latin('abcdefghijklmnopqrstuvwxyz')
        a, b = self.Pipe()

        a.send_bytes(msg)
        self.assertEqual(b.recv_bytes(), msg)

        # send_bytes(buf, offset) and send_bytes(buf, offset, size)
        a.send_bytes(msg, 5)
        self.assertEqual(b.recv_bytes(), msg[5:])

        a.send_bytes(msg, 7, 8)
        self.assertEqual(b.recv_bytes(), msg[7:7+8])

        a.send_bytes(msg, 26)
        self.assertEqual(b.recv_bytes(), latin(''))

        a.send_bytes(msg, 26, 0)
        self.assertEqual(b.recv_bytes(), latin(''))

        # Out-of-range or negative offset/size combinations are rejected.
        self.assertRaises(ValueError, a.send_bytes, msg, 27)

        self.assertRaises(ValueError, a.send_bytes, msg, 22, 5)

        self.assertRaises(ValueError, a.send_bytes, msg, 26, 1)

        self.assertRaises(ValueError, a.send_bytes, msg, -1)

        self.assertRaises(ValueError, a.send_bytes, msg, 4, -1)

    @classmethod
    def _is_fd_assigned(cls, fd):
        # Return True if os.fstat(fd) succeeds and False if it fails with
        # EBADF; any other OSError is propagated.
        try:
            os.fstat(fd)
        except OSError as e:
            if e.errno == errno.EBADF:
                return False
            raise
        else:
            return True

    @classmethod
    def _writefd(cls, conn, data, create_dummy_fds=False):
        # Child side: receive a handle over `conn`, write `data` to it and
        # close it.
        if create_dummy_fds:
            # Occupy every free descriptor number below 256 first.
            for i in range(0, 256):
                if not cls._is_fd_assigned(i):
                    os.dup2(conn.fileno(), i)
        fd = reduction.recv_handle(conn)
        if msvcrt:
            fd = msvcrt.open_osfhandle(fd, os.O_WRONLY)
        os.write(fd, data)
        os.close(fd)

    @unittest.skipUnless(HAS_REDUCTION, "test needs multiprocessing.reduction")
    def test_fd_transfer(self):
        if self.TYPE != 'processes':
            self.skipTest("only makes sense with processes")
        conn, child_conn = self.Pipe(duplex=True)

        p = self.Process(target=self._writefd, args=(child_conn, b"foo"))
        p.daemon = True
        p.start()
        self.addCleanup(test.support.unlink, test.support.TESTFN)
        with open(test.support.TESTFN, "wb") as f:
            fd = f.fileno()
            if msvcrt:
                fd = msvcrt.get_osfhandle(fd)
            reduction.send_handle(conn, fd, p.pid)
        p.join()
        # The child wrote through the transferred handle.
        with open(test.support.TESTFN, "rb") as f:
            self.assertEqual(f.read(), b"foo")

    @unittest.skipUnless(HAS_REDUCTION, "test needs multiprocessing.reduction")
    @unittest.skipIf(sys.platform == "win32",
                     "test semantics don't make sense on Windows")
    @unittest.skipIf(MAXFD <= 256,
                     "largest assignable fd number is too small")
    @unittest.skipUnless(hasattr(os, "dup2"),
                         "test needs os.dup2()")
    def test_large_fd_transfer(self):
        # With fd > 256 (issue #11657)
        if self.TYPE != 'processes':
            self.skipTest("only makes sense with processes")
        conn, child_conn = self.Pipe(duplex=True)

        p = self.Process(target=self._writefd, args=(child_conn, b"bar", True))
        p.daemon = True
        p.start()
        self.addCleanup(test.support.unlink, test.support.TESTFN)
        with open(test.support.TESTFN, "wb") as f:
            fd = f.fileno()
            # Find the first unassigned descriptor number >= 256.
            for newfd in range(256, MAXFD):
                if not self._is_fd_assigned(newfd):
                    break
            else:
                self.fail("could not find an unassigned large file descriptor")
            os.dup2(fd, newfd)
            try:
                reduction.send_handle(conn, newfd, p.pid)
            finally:
                os.close(newfd)
        p.join()
        with open(test.support.TESTFN, "rb") as f:
            self.assertEqual(f.read(), b"bar")

    @classmethod
    def _send_data_without_fd(cls, conn):
        # Child side: write one raw byte to the connection's descriptor,
        # without sending any handle along with it.
        os.write(conn.fileno(), b"\0")

    @unittest.skipUnless(HAS_REDUCTION, "test needs multiprocessing.reduction")
    @unittest.skipIf(sys.platform == "win32", "doesn't make sense on Windows")
    def test_missing_fd_transfer(self):
        # Check that exception is raised when received data is not
        # accompanied by a file descriptor in ancillary data.
        if self.TYPE != 'processes':
            self.skipTest("only makes sense with processes")
        conn, child_conn = self.Pipe(duplex=True)

        p = self.Process(target=self._send_data_without_fd, args=(child_conn,))
        p.daemon = True
        p.start()
        self.assertRaises(RuntimeError, reduction.recv_handle, conn)
        p.join()

    def test_context(self):
        a, b = self.Pipe()

        with a, b:
            a.send(1729)
            self.assertEqual(b.recv(), 1729)
            if self.TYPE == 'processes':
                self.assertFalse(a.closed)
                self.assertFalse(b.closed)

        # Leaving the with block must have closed both ends.
        if self.TYPE == 'processes':
            self.assertTrue(a.closed)
            self.assertTrue(b.closed)
            self.assertRaises(IOError, a.recv)
            self.assertRaises(IOError, b.recv)
2334
class _TestListener(BaseTestCase):
    # Listener-only checks: double binding and context-manager support.

    ALLOWED_TYPES = ('processes',)

    def test_multiple_bind(self):
        # Binding a second Listener to an address already in use must fail.
        for family in self.connection.families:
            first = self.connection.Listener(family=family)
            self.addCleanup(first.close)
            self.assertRaises(OSError, self.connection.Listener,
                              first.address, family)

    def test_context(self):
        # Listener, client and accepted connection are all context managers.
        with self.connection.Listener() as listener, \
                self.connection.Client(listener.address) as client, \
                listener.accept() as accepted:
            client.send(1729)
            self.assertEqual(accepted.recv(), 1729)

        # Accepting on the listener after the with block must fail.
        if self.TYPE == 'processes':
            self.assertRaises(IOError, listener.accept)
2355
class _TestListenerClient(BaseTestCase):
    # Round trips between a Listener in the test and a Client that is
    # either in a child (see _test) or in the test itself.

    ALLOWED_TYPES = ('processes', 'threads')

    @classmethod
    def _test(cls, address):
        # Child side: connect, send a single greeting and disconnect.
        conn = cls.connection.Client(address)
        conn.send('hello')
        conn.close()

    def test_listener_client(self):
        for family in self.connection.families:
            l = self.connection.Listener(family=family)
            p = self.Process(target=self._test, args=(l.address,))
            p.daemon = True
            p.start()
            conn = l.accept()
            self.assertEqual(conn.recv(), 'hello')
            # Close the accepted connection, as test_issue14725 does,
            # rather than leaving one open per family.
            conn.close()
            p.join()
            l.close()

    def test_issue14725(self):
        l = self.connection.Listener()
        p = self.Process(target=self._test, args=(l.address,))
        p.daemon = True
        p.start()
        time.sleep(1)
        # On Windows the client process should by now have connected,
        # written data and closed the pipe handle.  This causes
        # ConnectNamedPipe() to fail with ERROR_NO_DATA.  See Issue
        # 14725.
        conn = l.accept()
        self.assertEqual(conn.recv(), 'hello')
        conn.close()
        p.join()
        l.close()

    def test_issue16955(self):
        # poll() on the client must report data sent from the accepted side.
        for fam in self.connection.families:
            l = self.connection.Listener(family=fam)
            c = self.connection.Client(l.address)
            a = l.accept()
            a.send_bytes(b"hello")
            self.assertTrue(c.poll(1))
            a.close()
            c.close()
            l.close()
2403
class _TestPoll(BaseTestCase):
    # Tests for Connection.poll().
    #
    # Derives from BaseTestCase like the other _Test* classes in this file:
    # the methods rely on self.Pipe and self.Process, which this class does
    # not define itself.

    ALLOWED_TYPES = ('processes', 'threads')

    def test_empty_string(self):
        # An empty message still counts as pending data, and polling must
        # not consume it.
        a, b = self.Pipe()
        self.assertEqual(a.poll(), False)
        b.send_bytes(b'')
        self.assertEqual(a.poll(), True)
        self.assertEqual(a.poll(), True)

    @classmethod
    def _child_strings(cls, conn, strings):
        # Child side: send each string after a short pause, then close.
        for s in strings:
            time.sleep(0.1)
            conn.send_bytes(s)
        conn.close()

    def test_strings(self):
        strings = (b'hello', b'', b'a', b'b', b'', b'bye', b'', b'lop')
        a, b = self.Pipe()
        p = self.Process(target=self._child_strings, args=(b, strings))
        p.start()

        for s in strings:
            # Poll in short steps (at most 200 of them) until data arrives.
            for i in range(200):
                if a.poll(0.01):
                    break
            x = a.recv_bytes()
            self.assertEqual(s, x)

        p.join()

    @classmethod
    def _child_boundaries(cls, r):
        # Polling may "pull" a message in to the child process, but we
        # don't want it to pull only part of a message, as that would
        # corrupt the pipe for any other processes which might later
        # read from it.
        r.poll(5)

    def test_boundaries(self):
        r, w = self.Pipe(False)
        p = self.Process(target=self._child_boundaries, args=(r,))
        p.start()
        time.sleep(2)
        L = [b"first", b"second"]
        for obj in L:
            w.send_bytes(obj)
        w.close()
        p.join()
        # Whatever the child's poll() did, a whole message must come out.
        self.assertIn(r.recv_bytes(), L)

    @classmethod
    def _child_dont_merge(cls, b):
        # Child side: send three separate messages back to back.
        b.send_bytes(b'a')
        b.send_bytes(b'b')
        b.send_bytes(b'cd')

    def test_dont_merge(self):
        a, b = self.Pipe()
        self.assertEqual(a.poll(0.0), False)
        self.assertEqual(a.poll(0.1), False)

        p = self.Process(target=self._child_dont_merge, args=(b,))
        p.start()

        # Repeated polls between receives must neither merge nor drop the
        # three messages.
        self.assertEqual(a.recv_bytes(), b'a')
        self.assertEqual(a.poll(1.0), True)
        self.assertEqual(a.poll(1.0), True)
        self.assertEqual(a.recv_bytes(), b'b')
        self.assertEqual(a.poll(1.0), True)
        self.assertEqual(a.poll(1.0), True)
        self.assertEqual(a.poll(0.0), True)
        self.assertEqual(a.recv_bytes(), b'cd')

        p.join()
2481
Benjamin Petersone711caf2008-06-11 16:44:04 +00002482#
2483# Test of sending connection and socket objects between processes
2484#
Antoine Pitrou5438ed12012-04-24 22:56:57 +02002485
@unittest.skipUnless(HAS_REDUCTION, "test needs multiprocessing.reduction")
class _TestPicklingConnections(BaseTestCase):
    # Sends connection and socket objects between processes.

    ALLOWED_TYPES = ('processes',)

    @classmethod
    def tearDownClass(cls):
        # Stop the resource sharer once this class's tests have run.
        from multiprocessing.reduction import resource_sharer
        resource_sharer.stop(timeout=5)

    @classmethod
    def _listener(cls, conn, families):
        # Child side: for each family, publish a listener address, accept
        # one connection and send the accepted connection object itself
        # back over `conn`.
        for fam in families:
            l = cls.connection.Listener(family=fam)
            conn.send(l.address)
            new_conn = l.accept()
            conn.send(new_conn)
            new_conn.close()
            l.close()

        # Same again with a plain socket.
        l = socket.socket()
        l.bind(('localhost', 0))
        l.listen(1)
        conn.send(l.getsockname())
        new_conn, addr = l.accept()
        conn.send(new_conn)
        new_conn.close()
        l.close()

        # Block until the parent sends its final message.
        conn.recv()

    @classmethod
    def _remote(cls, conn):
        # Child side: for each (address, msg) received, connect to address
        # and send msg upper-cased; a None message ends this first phase.
        for (address, msg) in iter(conn.recv, None):
            client = cls.connection.Client(address)
            client.send(msg.upper())
            client.close()

        # Second phase: one more (address, msg), using a plain socket.
        address, msg = conn.recv()
        client = socket.socket()
        client.connect(address)
        client.sendall(msg.upper())
        client.close()

        conn.close()

    def test_pickling(self):
        families = self.connection.families

        lconn, lconn0 = self.Pipe()
        lp = self.Process(target=self._listener, args=(lconn0, families))
        lp.daemon = True
        lp.start()
        lconn0.close()

        rconn, rconn0 = self.Pipe()
        rp = self.Process(target=self._remote, args=(rconn0,))
        rp.daemon = True
        rp.start()
        rconn0.close()

        for fam in families:
            msg = ('This connection uses family %s' % fam).encode('ascii')
            address = lconn.recv()
            rconn.send((address, msg))
            new_conn = lconn.recv()
            self.assertEqual(new_conn.recv(), msg.upper())

        # End the first phase of _remote.
        rconn.send(None)

        msg = latin('This connection uses a normal socket')
        address = lconn.recv()
        rconn.send((address, msg))
        new_conn = lconn.recv()
        # Read from the socket until the peer closes it.
        buf = []
        while True:
            s = new_conn.recv(100)
            if not s:
                break
            buf.append(s)
        buf = b''.join(buf)
        self.assertEqual(buf, msg.upper())
        new_conn.close()

        # Release _listener from its final conn.recv().
        lconn.send(None)

        rconn.close()
        lconn.close()

        lp.join()
        rp.join()

    @classmethod
    def child_access(cls, conn):
        # Child side: receive a connection and write to it, then receive
        # another connection, read from it and report back the message
        # doubled.
        w = conn.recv()
        w.send('all is well')
        w.close()

        r = conn.recv()
        msg = r.recv()
        conn.send(msg*2)

        conn.close()

    def test_access(self):
        # On Windows, if we do not specify a destination pid when
        # using DupHandle then we need to be careful to use the
        # correct access flags for DuplicateHandle(), or else
        # DupHandle.detach() will raise PermissionError.  For example,
        # for a read only pipe handle we should use
        # access=FILE_GENERIC_READ.  (Unfortunately
        # DUPLICATE_SAME_ACCESS does not work.)
        conn, child_conn = self.Pipe()
        p = self.Process(target=self.child_access, args=(child_conn,))
        p.daemon = True
        p.start()
        child_conn.close()

        # Hand the write end of a one-way pipe to the child.
        r, w = self.Pipe(duplex=False)
        conn.send(w)
        w.close()
        self.assertEqual(r.recv(), 'all is well')
        r.close()

        # Hand the read end of a one-way pipe to the child.
        r, w = self.Pipe(duplex=False)
        conn.send(r)
        r.close()
        w.send('foobar')
        w.close()
        self.assertEqual(conn.recv(), 'foobar'*2)

        # The child closes its connection right after that reply; wait for
        # it so no child process is left behind by this test.
        p.join()
2616
Benjamin Petersone711caf2008-06-11 16:44:04 +00002617#
2618#
2619#
2620
class _TestHeap(BaseTestCase):
    # Tests for the allocator behind multiprocessing.heap.BufferWrapper.

    ALLOWED_TYPES = ('processes',)

    def test_heap(self):
        iterations = 5000
        maxblocks = 50
        blocks = []

        # create and destroy lots of blocks of different sizes
        for _ in range(iterations):
            size = int(random.lognormvariate(0, 1) * 1000)
            block = multiprocessing.heap.BufferWrapper(size)
            blocks.append(block)
            if len(blocks) > maxblocks:
                victim = random.randrange(maxblocks)
                del blocks[victim]

        # get the heap object
        heap = multiprocessing.heap.BufferWrapper._heap

        # verify the state of the heap
        layout = []
        heap._lock.acquire()
        self.addCleanup(heap._lock.release)
        # Gather every free and every occupied block as
        # (arena index, start, stop, size, state).
        for free_blocks in list(heap._len_to_seq.values()):
            for arena, start, stop in free_blocks:
                layout.append((heap._arenas.index(arena), start, stop,
                               stop-start, 'free'))
        for arena, start, stop in heap._allocated_blocks:
            layout.append((heap._arenas.index(arena), start, stop,
                           stop-start, 'occupied'))

        layout.sort()

        # Consecutive blocks must either touch, or the next block must
        # begin a different arena at offset 0.
        for i in range(len(layout)-1):
            (arena, start, stop) = layout[i][:3]
            (narena, nstart, nstop) = layout[i+1][:3]
            self.assertTrue((arena != narena and nstart == 0) or
                            (stop == nstart))

    def test_free_from_gc(self):
        # Check that freeing of blocks by the garbage collector doesn't deadlock
        # (issue #12352).
        # Make sure the GC is enabled, and set lower collection thresholds to
        # make collections more frequent (and increase the probability of
        # deadlock).
        if not gc.isenabled():
            gc.enable()
            self.addCleanup(gc.disable)
        thresholds = gc.get_threshold()
        self.addCleanup(gc.set_threshold, *thresholds)
        gc.set_threshold(10)

        # perform numerous block allocations, with cyclic references to make
        # sure objects are collected asynchronously by the gc
        for i in range(5000):
            a = multiprocessing.heap.BufferWrapper(1)
            b = multiprocessing.heap.BufferWrapper(1)
            # circular references
            a.buddy = b
            b.buddy = a
2685
Benjamin Petersone711caf2008-06-11 16:44:04 +00002686#
2687#
2688#
2689
class _Foo(Structure):
    """Two-field structure: integer ``x`` and double ``y``."""

    _fields_ = [('x', c_int), ('y', c_double)]
2695
class _TestSharedCTypes(BaseTestCase):
    # Shared ctypes objects that are modified in a child process.

    ALLOWED_TYPES = ('processes',)

    def setUp(self):
        if not HAS_SHAREDCTYPES:
            self.skipTest("requires multiprocessing.sharedctypes")

    @classmethod
    def _double(cls, x, y, foo, arr, string):
        # Child side: double every shared value in place.
        x.value *= 2
        y.value *= 2
        foo.x *= 2
        foo.y *= 2
        string.value *= 2
        for index in range(len(arr)):
            arr[index] *= 2

    def test_sharedctypes(self, lock=False):
        shared_int = Value('i', 7, lock=lock)
        shared_double = Value(c_double, 1.0/3.0, lock=lock)
        shared_struct = Value(_Foo, 3, 2, lock=lock)
        shared_array = self.Array('d', list(range(10)), lock=lock)
        shared_string = self.Array('c', 20, lock=lock)
        shared_string.value = latin('hello')

        worker = self.Process(
            target=self._double,
            args=(shared_int, shared_double, shared_struct,
                  shared_array, shared_string))
        worker.daemon = True
        worker.start()
        worker.join()

        # The child's changes must be visible in this process.
        self.assertEqual(shared_int.value, 14)
        self.assertAlmostEqual(shared_double.value, 2.0/3.0)
        self.assertEqual(shared_struct.x, 6)
        self.assertAlmostEqual(shared_struct.y, 4.0)
        for index in range(10):
            self.assertAlmostEqual(shared_array[index], index*2)
        self.assertEqual(shared_string.value, latin('hellohello'))

    def test_synchronize(self):
        # Same scenario, with lock=True.
        self.test_sharedctypes(lock=True)

    def test_copy(self):
        # Changing the original afterwards must not affect the copy.
        original = _Foo(2, 5.0)
        duplicate = copy(original)
        original.x = 0
        original.y = 0
        self.assertEqual(duplicate.x, 2)
        self.assertAlmostEqual(duplicate.y, 5.0)
2745
2746#
2747#
2748#
2749
class _TestFinalize(BaseTestCase):
    # Checks which util.Finalize callbacks fire, and in which order.

    ALLOWED_TYPES = ('processes',)

    @classmethod
    def _test_finalize(cls, conn):
        # Child-process body: every callback reports its tag through conn.
        class Foo(object):
            pass

        a = Foo()
        util.Finalize(a, conn.send, args=('a',))
        del a               # triggers callback for a

        b = Foo()
        finalize_b = util.Finalize(b, conn.send, args=('b',))
        finalize_b()        # triggers callback for b
        finalize_b()        # does nothing because callback has already been called
        del b               # does nothing because callback has already been called

        # No exitpriority here; 'c' is absent from the expected result below.
        c = Foo()
        util.Finalize(c, conn.send, args=('c',))

        d10 = Foo()
        util.Finalize(d10, conn.send, args=('d10',), exitpriority=1)

        # Three finalizers sharing exitpriority 0.
        d01 = Foo()
        util.Finalize(d01, conn.send, args=('d01',), exitpriority=0)
        d02 = Foo()
        util.Finalize(d02, conn.send, args=('d02',), exitpriority=0)
        d03 = Foo()
        util.Finalize(d03, conn.send, args=('d03',), exitpriority=0)

        # Finalizers that are not tied to any object.
        util.Finalize(None, conn.send, args=('e',), exitpriority=-10)
        util.Finalize(None, conn.send, args=('STOP',), exitpriority=-100)

        # call multiprocessing's cleanup function then exit process without
        # garbage collecting locals
        util._exit_function()
        conn.close()
        os._exit(0)

    def test_finalize(self):
        conn, child_conn = self.Pipe()

        child = self.Process(target=self._test_finalize, args=(child_conn,))
        child.daemon = True
        child.start()
        child.join()

        # Drain everything the child reported up to the 'STOP' sentinel.
        received = list(iter(conn.recv, 'STOP'))
        self.assertEqual(received,
                         ['a', 'b', 'd10', 'd03', 'd02', 'd01', 'e'])
2802
2803#
2804# Test that from ... import * works for each module
2805#
2806
class _TestImportStar(BaseTestCase):
    # Every name listed in a module's __all__ must exist on that module.

    ALLOWED_TYPES = ('processes',)

    def test_import(self):
        module_names = [
            'multiprocessing',
            'multiprocessing.connection',
            'multiprocessing.heap',
            'multiprocessing.managers',
            'multiprocessing.pool',
            'multiprocessing.process',
            'multiprocessing.synchronize',
            'multiprocessing.util',
        ]

        # Optional modules are only checked when they are usable.
        if HAS_REDUCTION:
            module_names.append('multiprocessing.reduction')
        if c_int is not None:
            # This module requires _ctypes
            module_names.append('multiprocessing.sharedctypes')

        for module_name in module_names:
            __import__(module_name)
            module = sys.modules[module_name]
            exported = getattr(module, '__all__', ())

            for attr in exported:
                message = '%r does not have attribute %r' % (module, attr)
                self.assertTrue(hasattr(module, attr), message)
2835
2836#
2837# Quick test that logging works -- does not test logging output
2838#
2839
class _TestLogging(BaseTestCase):
    # Quick checks of the multiprocessing logger; log output is not inspected.

    ALLOWED_TYPES = ('processes',)

    def test_enable_logging(self):
        logger = multiprocessing.get_logger()
        self.assertIsNotNone(logger)
        logger.setLevel(util.SUBWARNING)
        try:
            logger.debug('this will not be printed')
            logger.info('nor will this')
        finally:
            # Restore the suite-wide level even if logging raised.
            logger.setLevel(LOG_LEVEL)

    @classmethod
    def _test_level(cls, conn):
        # Child-process body: report the effective level of the logger.
        logger = multiprocessing.get_logger()
        conn.send(logger.getEffectiveLevel())

    def _child_level(self, reader, writer):
        # Start a child that sends its effective log level through *writer*,
        # return the value read from *reader*, and always join the child so
        # no process outlives the test.
        p = self.Process(target=self._test_level, args=(writer,))
        p.daemon = True
        p.start()
        try:
            return reader.recv()
        finally:
            p.join()

    def test_level(self):
        LEVEL1 = 32
        LEVEL2 = 37

        logger = multiprocessing.get_logger()
        root_logger = logging.getLogger()
        root_level = root_logger.level

        reader, writer = multiprocessing.Pipe(duplex=False)
        try:
            # A level set on the multiprocessing logger is seen by the child.
            logger.setLevel(LEVEL1)
            self.assertEqual(LEVEL1, self._child_level(reader, writer))

            # With NOTSET the child reports the root logger's level instead.
            logger.setLevel(logging.NOTSET)
            root_logger.setLevel(LEVEL2)
            self.assertEqual(LEVEL2, self._child_level(reader, writer))
        finally:
            # Undo the global logging changes and release the pipe even when
            # an assertion above failed.
            root_logger.setLevel(root_level)
            logger.setLevel(level=LOG_LEVEL)
            reader.close()
            writer.close()
2882
Jesse Nollerb9a49b72009-11-21 18:09:38 +00002883
Jesse Nollerf4ae35f2009-11-24 14:22:24 +00002884# class _TestLoggingProcessName(BaseTestCase):
2885#
2886# def handle(self, record):
2887# assert record.processName == multiprocessing.current_process().name
2888# self.__handled = True
2889#
2890# def test_logging(self):
2891# handler = logging.Handler()
2892# handler.handle = self.handle
2893# self.__handled = False
2894# # Bypass getLogger() and side-effects
2895# logger = logging.getLoggerClass()(
2896# 'multiprocessing.test.TestLoggingProcessName')
2897# logger.addHandler(handler)
2898# logger.propagate = False
2899#
2900# logger.warn('foo')
2901# assert self.__handled
Jesse Nollerb9a49b72009-11-21 18:09:38 +00002902
Benjamin Petersone711caf2008-06-11 16:44:04 +00002903#
Richard Oudkerk7aaa1ef2013-02-26 12:39:57 +00002904# Check that Process.join() retries if os.waitpid() fails with EINTR
2905#
2906
class _TestPollEintr(BaseTestCase):
    # Process.join() must survive a signal arriving while it is waiting.

    ALLOWED_TYPES = ('processes',)

    @classmethod
    def _killer(cls, pid):
        # Child-process body: give the parent time to block in join(), then
        # send it SIGUSR1.
        time.sleep(0.5)
        os.kill(pid, signal.SIGUSR1)

    @unittest.skipUnless(hasattr(signal, 'SIGUSR1'), 'requires SIGUSR1')
    def test_poll_eintr(self):
        # One-element list so the nested handler can record the signal.
        got_signal = [False]
        def record(*args):
            got_signal[0] = True
        pid = os.getpid()
        oldhandler = signal.signal(signal.SIGUSR1, record)
        try:
            killer = self.Process(target=self._killer, args=(pid,))
            killer.start()
            try:
                p = self.Process(target=time.sleep, args=(1,))
                p.start()
                p.join()
            finally:
                # Always reap the killer, even if starting or joining the
                # sleeper raised.
                killer.join()
            self.assertTrue(got_signal[0])
            self.assertEqual(p.exitcode, 0)
        finally:
            signal.signal(signal.SIGUSR1, oldhandler)
2934
2935#
Jesse Noller6214edd2009-01-19 16:23:53 +00002936# Test to verify handle verification, see issue 3321
2937#
2938
class TestInvalidHandle(unittest.TestCase):
    # Connection objects must reject handles that are not valid.

    @unittest.skipIf(WIN32, "skipped on Windows")
    def test_invalid_handles(self):
        expected_errors = (ValueError, IOError)

        conn = multiprocessing.connection.Connection(44977608)
        try:
            with self.assertRaises(expected_errors):
                conn.poll()
        finally:
            # Hack private attribute _handle to avoid printing an error
            # in conn.__del__
            conn._handle = None

        with self.assertRaises(expected_errors):
            multiprocessing.connection.Connection(-1)
Florent Xiclunafd1b0932010-03-28 00:25:02 +00002952
Jesse Noller6214edd2009-01-19 16:23:53 +00002953#
Benjamin Petersone711caf2008-06-11 16:44:04 +00002954# Functions used to create test cases from the base ones in this module
2955#
2956
def get_attributes(Source, names):
    """Return a dict mapping each name in *names* to ``getattr(Source, name)``.

    Values whose type is the plain Python function type are wrapped in
    ``staticmethod``; every other value is stored unchanged.
    """
    function_type = type(get_attributes)
    attributes = {}
    for name in names:
        value = getattr(Source, name)
        wrap = type(value) == function_type
        attributes[name] = staticmethod(value) if wrap else value
    return attributes
2965
def create_test_cases(Mixin, type):
    """Build concrete TestCase classes for one backend *type*.

    Every module-level name starting with ``_Test`` is treated as a base
    class.  For each base whose ``ALLOWED_TYPES`` contains *type*, a class
    deriving from the base, ``unittest.TestCase`` and *Mixin* is created and
    named ``'With' + type.capitalize() + <base name without the leading
    underscore>``.  Returns a dict mapping those names to the new classes.
    """
    ALL_TYPES = {'processes', 'threads', 'manager'}
    Type = type.capitalize()
    result = {}

    # Snapshot the module namespace before walking it.
    for name, base in list(globals().items()):
        if not name.startswith('_Test'):
            continue
        allowed = set(base.ALLOWED_TYPES)
        assert allowed <= ALL_TYPES, allowed
        if type not in base.ALLOWED_TYPES:
            continue

        newname = 'With' + Type + name[1:]

        class Temp(base, unittest.TestCase, Mixin):
            pass

        # Give the generated class the identity of a class defined next to
        # the mixin under the generated name.
        Temp.__name__ = newname
        Temp.__module__ = Mixin.__module__
        result[newname] = Temp
    return result
2984
2985#
2986# Create test cases
2987#
2988
# Mixin that binds the generic test names (Queue, Lock, Pool, ...) to the
# real multiprocessing implementations.
class ProcessesMixin(object):
    TYPE = 'processes'
    Process = multiprocessing.Process
    # Copy the listed attributes of the multiprocessing module into the class
    # namespace; get_attributes() wraps plain functions in staticmethod.
    locals().update(get_attributes(multiprocessing, (
        'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
        'Condition', 'Event', 'Barrier', 'Value', 'Array', 'RawValue',
        'RawArray', 'current_process', 'active_children', 'Pipe',
        'connection', 'JoinableQueue', 'Pool'
        )))

# Generate the 'processes' test cases and publish them as module globals.
testcases_processes = create_test_cases(ProcessesMixin, type='processes')
globals().update(testcases_processes)
3001
3002
# Mixin that binds the generic test names to attributes of a SyncManager.
class ManagerMixin(object):
    TYPE = 'manager'
    Process = multiprocessing.Process
    # Allocate the manager without running __init__; test_main() calls
    # __init__() and start() on it before the tests run.
    manager = object.__new__(multiprocessing.managers.SyncManager)
    # Copy the listed attributes, looked up on that manager instance, into
    # the class namespace.
    locals().update(get_attributes(manager, (
        'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
        'Condition', 'Event', 'Barrier', 'Value', 'Array', 'list', 'dict',
        'Namespace', 'JoinableQueue', 'Pool'
        )))

# Generate the 'manager' test cases and publish them as module globals.
testcases_manager = create_test_cases(ManagerMixin, type='manager')
globals().update(testcases_manager)
3015
3016
# Mixin that binds the generic test names to the multiprocessing.dummy
# implementations.
class ThreadsMixin(object):
    TYPE = 'threads'
    Process = multiprocessing.dummy.Process
    # Copy the listed attributes of multiprocessing.dummy into the class
    # namespace; get_attributes() wraps plain functions in staticmethod.
    locals().update(get_attributes(multiprocessing.dummy, (
        'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
        'Condition', 'Event', 'Barrier', 'Value', 'Array', 'current_process',
        'active_children', 'Pipe', 'connection', 'dict', 'list',
        'Namespace', 'JoinableQueue', 'Pool'
        )))

# Generate the 'threads' test cases and publish them as module globals.
testcases_threads = create_test_cases(ThreadsMixin, type='threads')
globals().update(testcases_threads)
3029
class OtherTest(unittest.TestCase):
    # Authentication failures of the connection challenge helpers.
    # TODO: add more tests for deliver/answer challenge.

    def test_deliver_challenge_auth_failure(self):
        # The peer answers the challenge with bytes that cannot be valid.
        class _FakeConnection(object):
            def recv_bytes(self, size):
                return b'something bogus'

            def send_bytes(self, data):
                pass

        with self.assertRaises(multiprocessing.AuthenticationError):
            multiprocessing.connection.deliver_challenge(
                _FakeConnection(), b'abc')

    def test_answer_challenge_auth_failure(self):
        # The peer sends the challenge prefix first, then a bogus reply.
        class _FakeConnection(object):
            def __init__(self):
                self.count = 0

            def recv_bytes(self, size):
                self.count += 1
                if self.count == 1:
                    return multiprocessing.connection.CHALLENGE
                if self.count == 2:
                    return b'something bogus'
                return b''

            def send_bytes(self, data):
                pass

        with self.assertRaises(multiprocessing.AuthenticationError):
            multiprocessing.connection.answer_challenge(
                _FakeConnection(), b'abc')
3058
Benjamin Petersonf47ed4a2009-04-11 20:45:40 +00003059#
3060# Test Manager.start()/Pool.__init__() initializer feature - see issue 5585
3061#
3062
def initializer(ns):
    """Add one to the ``test`` attribute of *ns*."""
    ns.test += 1
3065
class TestInitializers(unittest.TestCase):
    # The initializer passed to a manager or a pool must be validated and run.

    def setUp(self):
        # A shared namespace whose counter the initializer increments.
        self.mgr = multiprocessing.Manager()
        self.ns = self.mgr.Namespace()
        self.ns.test = 0

    def tearDown(self):
        self.mgr.shutdown()
        self.mgr.join()

    def test_manager_initializer(self):
        manager = multiprocessing.managers.SyncManager()
        # A non-callable initializer is rejected.
        with self.assertRaises(TypeError):
            manager.start(1)
        manager.start(initializer, (self.ns,))
        self.assertEqual(self.ns.test, 1)
        manager.shutdown()
        manager.join()

    def test_pool_initializer(self):
        # A non-callable initializer is rejected.
        with self.assertRaises(TypeError):
            multiprocessing.Pool(initializer=1)
        pool = multiprocessing.Pool(1, initializer, (self.ns,))
        pool.close()
        pool.join()
        self.assertEqual(self.ns.test, 1)
3090
Alexandre Vassalottic57a84f2009-07-17 12:07:01 +00003091#
3092# Issue 5155, 5313, 5331: Test process in processes
3093# Verifies os.close(sys.stdin.fileno) vs. sys.stdin.close() behavior
3094#
3095
def _ThisSubProcess(q):
    """Make one non-blocking ``get`` on *q*; an empty queue is ignored."""
    try:
        q.get(block=False)
    except pyqueue.Empty:
        # Nothing was queued; there is nothing to do.
        pass
3101
def _TestProcess(q):
    """Start a daemonic grandchild running ``_ThisSubProcess`` and join it.

    The grandchild receives a freshly created queue; *q* itself is not used.
    """
    inner_queue = multiprocessing.Queue()
    grandchild = multiprocessing.Process(target=_ThisSubProcess,
                                         args=(inner_queue,))
    grandchild.daemon = True
    grandchild.start()
    grandchild.join()
3108
def _afunc(x):
    """Return *x* multiplied by itself."""
    squared = x * x
    return squared
3111
def pool_in_process():
    """Map ``_afunc`` over seven integers on a four-worker pool, then
    close and join the pool.  The mapped results are discarded."""
    pool = multiprocessing.Pool(processes=4)
    pool.map(_afunc, [1, 2, 3, 4, 5, 6, 7])
    pool.close()
    pool.join()
Alexandre Vassalottic57a84f2009-07-17 12:07:01 +00003117
class _file_like(object):
    """Write buffer in front of *delegate* that keeps one cache per process.

    ``write`` appends to the cache of the current process; ``flush`` joins
    the cached pieces, writes them to the delegate and empties the cache.
    """

    def __init__(self, delegate):
        self._delegate = delegate
        # pid that owns the current cache; None until the first access.
        self._pid = None

    @property
    def cache(self):
        current_pid = os.getpid()
        # There are no race conditions since fork keeps only the running thread
        if self._pid != current_pid:
            # First access in this process: start with an empty cache.
            self._pid = current_pid
            self._cache = []
        return self._cache

    def write(self, data):
        self.cache.append(data)

    def flush(self):
        pending = ''.join(self.cache)
        self._delegate.write(pending)
        self._cache = []
3138
class TestStdinBadfiledescriptor(unittest.TestCase):
    # Starting processes, queues and pools from inside a child process.

    def test_queue_in_process(self):
        queue = multiprocessing.Queue()
        proc = multiprocessing.Process(target=_TestProcess, args=(queue,))
        proc.start()
        proc.join()

    def test_pool_in_process(self):
        p = multiprocessing.Process(target=pool_in_process)
        p.start()
        p.join()

    def test_flushing(self):
        sio = io.StringIO()
        flike = _file_like(sio)
        flike.write('foo')
        # NOTE: this Process object is constructed but never started; only
        # the flush performed in this process is checked.
        proc = multiprocessing.Process(target=lambda: flike.flush())
        flike.flush()
        # Use a unittest assertion: a bare assert is stripped under -O and
        # gives no diagnostic on failure.
        self.assertEqual(sio.getvalue(), 'foo')
3159
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01003160
# Tests for multiprocessing.connection.wait() with pipe connections, sockets
# and process sentinels.
class TestWait(unittest.TestCase):

    @classmethod
    def _child_test_wait(cls, w, slow):
        # Child-process body: send ten (index, pid) tuples through w, with a
        # short random pause before each one when slow is true, then close w.
        for i in range(10):
            if slow:
                time.sleep(random.random()*0.1)
            w.send((i, os.getpid()))
        w.close()

    def test_wait(self, slow=False):
        from multiprocessing.connection import wait
        readers = []
        procs = []
        messages = []

        # Start four children, each writing to its own one-way pipe.
        for i in range(4):
            r, w = multiprocessing.Pipe(duplex=False)
            p = multiprocessing.Process(target=self._child_test_wait, args=(w, slow))
            p.daemon = True
            p.start()
            # Close the parent's copy of the write end.
            w.close()
            readers.append(r)
            procs.append(p)
            self.addCleanup(p.join)

        # Collect messages until every reader has reported EOF.
        while readers:
            for r in wait(readers):
                try:
                    msg = r.recv()
                except EOFError:
                    readers.remove(r)
                    r.close()
                else:
                    messages.append(msg)

        # Every child must have delivered all ten of its messages.
        messages.sort()
        expected = sorted((i, p.pid) for i in range(10) for p in procs)
        self.assertEqual(messages, expected)

    @classmethod
    def _child_test_wait_socket(cls, address, slow):
        # Child-process body: connect to address and send the numbers 0..9,
        # one per line, ASCII encoded; then close the socket.
        s = socket.socket()
        s.connect(address)
        for i in range(10):
            if slow:
                time.sleep(random.random()*0.1)
            s.sendall(('%s\n' % i).encode('ascii'))
        s.close()

    def test_wait_socket(self, slow=False):
        from multiprocessing.connection import wait
        # Listening socket on a system-chosen port.
        l = socket.socket()
        l.bind(('', 0))
        l.listen(4)
        addr = ('localhost', l.getsockname()[1])
        readers = []
        procs = []
        # Maps each accepted socket to the list of chunks received on it.
        dic = {}

        for i in range(4):
            p = multiprocessing.Process(target=self._child_test_wait_socket,
                                        args=(addr, slow))
            p.daemon = True
            p.start()
            procs.append(p)
            self.addCleanup(p.join)

        # Accept one connection per child, then stop listening.
        for i in range(4):
            r, _ = l.accept()
            readers.append(r)
            dic[r] = []
        l.close()

        # Read until every socket has returned an empty chunk.
        while readers:
            for r in wait(readers):
                msg = r.recv(32)
                if not msg:
                    readers.remove(r)
                    r.close()
                else:
                    dic[r].append(msg)

        # Each connection must have carried the complete sequence.
        expected = ''.join('%s\n' % i for i in range(10)).encode('ascii')
        for v in dic.values():
            self.assertEqual(b''.join(v), expected)

    def test_wait_slow(self):
        # Same as test_wait, with the children pausing between sends.
        self.test_wait(True)

    def test_wait_socket_slow(self):
        # Same as test_wait_socket, with the children pausing between sends.
        self.test_wait_socket(True)

    def test_wait_timeout(self):
        from multiprocessing.connection import wait

        # Timeout, in seconds, passed to the first wait() call.
        expected = 5
        a, b = multiprocessing.Pipe()

        # Nothing has been sent: wait() should return an empty list after
        # roughly the requested timeout.
        start = time.time()
        res = wait([a, b], expected)
        delta = time.time() - start

        self.assertEqual(res, [])
        self.assertLess(delta, expected * 2)
        self.assertGreater(delta, expected * 0.5)

        b.send(None)

        # Data sent on b is readable on a, so wait() should return promptly.
        start = time.time()
        res = wait([a, b], 20)
        delta = time.time() - start

        self.assertEqual(res, [a])
        self.assertLess(delta, 0.4)

    @classmethod
    def signal_and_sleep(cls, sem, period):
        # Child-process body: release sem, then sleep for period seconds.
        sem.release()
        time.sleep(period)

    def test_wait_integer(self):
        from multiprocessing.connection import wait

        # Seconds the child sleeps after releasing the semaphore.
        expected = 3
        # Order by id() so lists mixing connections and integers can be
        # compared regardless of the order wait() returns them in.
        sorted_ = lambda l: sorted(l, key=lambda x: id(x))
        sem = multiprocessing.Semaphore(0)
        a, b = multiprocessing.Pipe()
        p = multiprocessing.Process(target=self.signal_and_sleep,
                                    args=(sem, expected))

        p.start()
        self.assertIsInstance(p.sentinel, int)
        # Block until the child has released the semaphore.
        self.assertTrue(sem.acquire(timeout=20))

        # Only the sentinel should become ready, after about `expected`
        # seconds.
        start = time.time()
        res = wait([a, p.sentinel, b], expected + 20)
        delta = time.time() - start

        self.assertEqual(res, [p.sentinel])
        self.assertLess(delta, expected + 2)
        self.assertGreater(delta, expected - 2)

        a.send(None)

        # Data sent on a is readable on b; the sentinel is still ready.
        start = time.time()
        res = wait([a, p.sentinel, b], 20)
        delta = time.time() - start

        self.assertEqual(sorted_(res), sorted_([p.sentinel, b]))
        self.assertLess(delta, 0.4)

        b.send(None)

        # Now all three objects should be reported as ready.
        start = time.time()
        res = wait([a, p.sentinel, b], 20)
        delta = time.time() - start

        self.assertEqual(sorted_(res), sorted_([a, p.sentinel, b]))
        self.assertLess(delta, 0.4)

        p.terminate()
        p.join()

    def test_neg_timeout(self):
        # A negative timeout must return an empty result in under a second.
        from multiprocessing.connection import wait
        a, b = multiprocessing.Pipe()
        t = time.time()
        res = wait([a], timeout=-1)
        t = time.time() - t
        self.assertEqual(res, [])
        self.assertLess(t, 1)
        a.close()
        b.close()
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01003335
Antoine Pitrou709176f2012-04-01 17:19:09 +02003336#
3337# Issue 14151: Test invalid family on invalid environment
3338#
3339
class TestInvalidFamily(unittest.TestCase):
    # Listener must reject an address that belongs to the other platform.

    @unittest.skipIf(WIN32, "skipped on Windows")
    def test_invalid_family(self):
        # Windows-style pipe address on a non-Windows platform.
        self.assertRaises(ValueError,
                          multiprocessing.connection.Listener, r'\\.\test')

    @unittest.skipUnless(WIN32, "skipped on non-Windows platforms")
    def test_invalid_family_win32(self):
        # Filesystem-path address on Windows.
        self.assertRaises(ValueError,
                          multiprocessing.connection.Listener,
                          '/var/test.pipe')
Antoine Pitrou93bba8f2012-04-01 17:25:49 +02003351
Richard Oudkerk77c84f22012-05-18 14:28:02 +01003352#
3353# Issue 12098: check sys.flags of child matches that for parent
3354#
3355
class TestFlags(unittest.TestCase):
    # sys.flags seen by a multiprocessing child must equal its parent's.

    @classmethod
    def run_in_grandchild(cls, conn):
        # Report this process's interpreter flags through conn.
        flags = tuple(sys.flags)
        conn.send(flags)

    @classmethod
    def run_in_child(cls):
        # Start a grandchild, collect its flags and print both flag tuples
        # as JSON on stdout.
        import json
        reader, writer = multiprocessing.Pipe(duplex=False)
        grandchild = multiprocessing.Process(target=cls.run_in_grandchild,
                                             args=(writer,))
        grandchild.start()
        grandchild_flags = reader.recv()
        grandchild.join()
        reader.close()
        writer.close()
        own_flags = tuple(sys.flags)
        print(json.dumps((own_flags, grandchild_flags)))

    def test_flags(self):
        import json
        import subprocess
        # start child process using unusual flags
        prog = ('from test.test_multiprocessing import TestFlags; '
                'TestFlags.run_in_child()')
        command = [sys.executable, '-E', '-S', '-O', '-c', prog]
        output = subprocess.check_output(command)
        child_flags, grandchild_flags = json.loads(output.decode('ascii'))
        self.assertEqual(child_flags, grandchild_flags)
3383
Richard Oudkerkb15e6222012-07-27 14:19:00 +01003384#
3385# Test interaction with socket timeouts - see Issue #6056
3386#
3387
# Pipes and Listener/Client connections must keep working while a short
# default socket timeout is in effect.
class TestTimeouts(unittest.TestCase):
    @classmethod
    def _test_timeout(cls, child, address):
        # Child-process body: after a one second delay, send 123 through the
        # pipe end, then connect to address and send 456.
        time.sleep(1)
        child.send(123)
        child.close()
        conn = multiprocessing.connection.Client(address)
        conn.send(456)
        conn.close()

    def test_timeout(self):
        old_timeout = socket.getdefaulttimeout()
        try:
            # The default timeout (0.1s) is much shorter than the child's
            # one second delay.
            socket.setdefaulttimeout(0.1)
            parent, child = multiprocessing.Pipe(duplex=True)
            l = multiprocessing.connection.Listener(family='AF_INET')
            p = multiprocessing.Process(target=self._test_timeout,
                                        args=(child, l.address))
            p.start()
            # Close the parent's copy of the child's end of the pipe.
            child.close()
            self.assertEqual(parent.recv(), 123)
            parent.close()
            conn = l.accept()
            self.assertEqual(conn.recv(), 456)
            conn.close()
            l.close()
            # Wait at most ten seconds for the child to exit.
            p.join(10)
        finally:
            # Always restore the previous process-wide default timeout.
            socket.setdefaulttimeout(old_timeout)
3417
Richard Oudkerke88a2442012-08-14 11:41:32 +01003418#
3419# Test what happens with no "if __name__ == '__main__'"
3420#
3421
class TestNoForkBomb(unittest.TestCase):
    # Runs mp_fork_bomb.py, located next to this file, as a script.

    def test_noforkbomb(self):
        here = os.path.dirname(__file__)
        name = os.path.join(here, 'mp_fork_bomb.py')
        if not WIN32:
            # The script must succeed, print 123 and leave stderr empty.
            rc, out, err = test.script_helper.assert_python_ok(name)
            self.assertEqual('123', out.decode('ascii').rstrip())
            self.assertEqual('', err.decode('ascii'))
        else:
            # The script must fail with a RuntimeError and print nothing.
            rc, out, err = test.script_helper.assert_python_failure(name)
            self.assertEqual('', out.decode('ascii'))
            self.assertIn('RuntimeError', err.decode('ascii'))
3433
3434#
3435#
3436#
3437
# Stand-alone test cases that are not generated by create_test_cases();
# test_main() runs them after the generated ones.
testcases_other = [OtherTest, TestInvalidHandle, TestInitializers,
                   TestStdinBadfiledescriptor, TestWait, TestInvalidFamily,
                   TestFlags, TestTimeouts, TestNoForkBomb]
Neal Norwitz5d6415e2008-08-25 01:53:32 +00003441
Benjamin Petersone711caf2008-06-11 16:44:04 +00003442#
3443#
3444#
3445
def test_main(run=None):
    """Set up the shared pools and manager, run every test case, tear down.

    *run* is called with the assembled ``unittest.TestSuite``; when it is
    ``None``, ``test.support.run_unittest`` is used.  Raises
    ``unittest.SkipTest`` on Linux when an ``RLock`` cannot be created.
    """
    on_linux = sys.platform.startswith("linux")
    if on_linux:
        try:
            probe_lock = multiprocessing.RLock()
        except OSError:
            raise unittest.SkipTest("OSError raises on RLock creation, see issue 3111!")

    check_enough_semaphores()

    if run is None:
        from test.support import run_unittest as run

    util.get_temp_dir()     # creates temp directory for use by all processes

    multiprocessing.get_logger().setLevel(LOG_LEVEL)

    # Shared fixtures used by the generated test classes.
    ProcessesMixin.pool = multiprocessing.Pool(4)
    ThreadsMixin.pool = multiprocessing.dummy.Pool(4)
    ManagerMixin.manager.__init__()
    ManagerMixin.manager.start()
    ManagerMixin.pool = ManagerMixin.manager.Pool(4)

    def by_name(tc):
        return tc.__name__

    # Generated cases first (each group sorted by class name), then the
    # stand-alone ones.
    testcases = []
    for group in (testcases_processes, testcases_threads, testcases_manager):
        testcases.extend(sorted(group.values(), key=by_name))
    testcases.extend(testcases_other)

    load = unittest.defaultTestLoader.loadTestsFromTestCase
    suite = unittest.TestSuite(load(tc) for tc in testcases)
    try:
        run(suite)
    finally:
        # Tear the fixtures down whether or not the run succeeded.
        ThreadsMixin.pool.terminate()
        ProcessesMixin.pool.terminate()
        ManagerMixin.pool.terminate()
        ManagerMixin.pool.join()
        ManagerMixin.manager.shutdown()
        ManagerMixin.manager.join()
        ThreadsMixin.pool.join()
        ProcessesMixin.pool.join()
        del ProcessesMixin.pool, ThreadsMixin.pool, ManagerMixin.pool
Benjamin Petersone711caf2008-06-11 16:44:04 +00003489
def main():
    """Run the whole suite through a verbose ``TextTestRunner``."""
    runner = unittest.TextTestRunner(verbosity=2)
    test_main(runner.run)

if __name__ == '__main__':
    main()