blob: 07bfe2f98321065b40d511b4612920564ecd0696 [file] [log] [blame]
Benjamin Peterson90f5ba52010-03-11 22:53:45 +00001#!/usr/bin/env python3
Georg Brandl86b2fb92008-07-16 03:43:04 +00002
Benjamin Petersone711caf2008-06-11 16:44:04 +00003#
4# Unit tests for the multiprocessing package
5#
6
7import unittest
Benjamin Petersone711caf2008-06-11 16:44:04 +00008import queue as pyqueue
9import time
Alexandre Vassalottic57a84f2009-07-17 12:07:01 +000010import io
Antoine Pitroude911b22011-12-21 11:03:24 +010011import itertools
Benjamin Petersone711caf2008-06-11 16:44:04 +000012import sys
13import os
14import gc
Antoine Pitrouc824e9a2011-04-05 18:11:33 +020015import errno
Benjamin Petersone711caf2008-06-11 16:44:04 +000016import signal
17import array
Benjamin Petersone711caf2008-06-11 16:44:04 +000018import socket
19import random
20import logging
Richard Oudkerk3730a172012-06-15 18:26:07 +010021import struct
Richard Oudkerkfc7b0ec2012-10-08 14:56:24 +010022import operator
R. David Murraya21e4ca2009-03-31 23:16:50 +000023import test.support
Richard Oudkerke88a2442012-08-14 11:41:32 +010024import test.script_helper
Benjamin Petersone711caf2008-06-11 16:44:04 +000025
Benjamin Petersone5384b02008-10-04 22:00:42 +000026
R. David Murraya21e4ca2009-03-31 23:16:50 +000027# Skip tests if _multiprocessing wasn't built.
28_multiprocessing = test.support.import_module('_multiprocessing')
29# Skip tests if sem_open implementation is broken.
30test.support.import_module('multiprocessing.synchronize')
Victor Stinner45df8202010-04-28 22:31:17 +000031# import threading after _multiprocessing to raise a more revelant error
32# message: "No module named _multiprocessing". _multiprocessing is not compiled
33# without thread support.
34import threading
Benjamin Petersone5384b02008-10-04 22:00:42 +000035
Benjamin Petersone711caf2008-06-11 16:44:04 +000036import multiprocessing.dummy
37import multiprocessing.connection
38import multiprocessing.managers
39import multiprocessing.heap
Benjamin Petersone711caf2008-06-11 16:44:04 +000040import multiprocessing.pool
Benjamin Petersone711caf2008-06-11 16:44:04 +000041
Charles-François Natalibc8f0822011-09-20 20:36:51 +020042from multiprocessing import util
43
44try:
45 from multiprocessing import reduction
46 HAS_REDUCTION = True
47except ImportError:
48 HAS_REDUCTION = False
Benjamin Petersone711caf2008-06-11 16:44:04 +000049
Brian Curtinafa88b52010-10-07 01:12:19 +000050try:
51 from multiprocessing.sharedctypes import Value, copy
52 HAS_SHAREDCTYPES = True
53except ImportError:
54 HAS_SHAREDCTYPES = False
55
Antoine Pitroubcb39d42011-08-23 19:46:22 +020056try:
57 import msvcrt
58except ImportError:
59 msvcrt = None
60
Benjamin Petersone711caf2008-06-11 16:44:04 +000061#
62#
63#
64
Benjamin Peterson2bc91df2008-07-13 18:45:30 +000065def latin(s):
66 return s.encode('latin')
Benjamin Petersone711caf2008-06-11 16:44:04 +000067
Benjamin Petersone711caf2008-06-11 16:44:04 +000068#
69# Constants
70#
71
72LOG_LEVEL = util.SUBWARNING
Jesse Noller1f0b6582010-01-27 03:36:01 +000073#LOG_LEVEL = logging.DEBUG
Benjamin Petersone711caf2008-06-11 16:44:04 +000074
75DELTA = 0.1
76CHECK_TIMINGS = False # making true makes tests take a lot longer
77 # and can sometimes cause some non-serious
78 # failures because some calls block a bit
79 # longer than expected
80if CHECK_TIMINGS:
81 TIMEOUT1, TIMEOUT2, TIMEOUT3 = 0.82, 0.35, 1.4
82else:
83 TIMEOUT1, TIMEOUT2, TIMEOUT3 = 0.1, 0.1, 0.1
84
85HAVE_GETVALUE = not getattr(_multiprocessing,
86 'HAVE_BROKEN_SEM_GETVALUE', False)
87
Jesse Noller6214edd2009-01-19 16:23:53 +000088WIN32 = (sys.platform == "win32")
Antoine Pitrou176f07d2011-06-06 19:35:31 +020089
Richard Oudkerk59d54042012-05-10 16:11:12 +010090from multiprocessing.connection import wait
Antoine Pitrou176f07d2011-06-06 19:35:31 +020091
Richard Oudkerk59d54042012-05-10 16:11:12 +010092def wait_for_handle(handle, timeout):
93 if timeout is not None and timeout < 0.0:
94 timeout = None
95 return wait([handle], timeout)
Jesse Noller6214edd2009-01-19 16:23:53 +000096
Antoine Pitroubcb39d42011-08-23 19:46:22 +020097try:
98 MAXFD = os.sysconf("SC_OPEN_MAX")
99except:
100 MAXFD = 256
101
Benjamin Petersone711caf2008-06-11 16:44:04 +0000102#
Florent Xiclunafd1b0932010-03-28 00:25:02 +0000103# Some tests require ctypes
104#
105
106try:
Florent Xiclunaaa171062010-08-14 15:56:42 +0000107 from ctypes import Structure, c_int, c_double
Florent Xiclunafd1b0932010-03-28 00:25:02 +0000108except ImportError:
109 Structure = object
110 c_int = c_double = None
111
Charles-François Natali221ef672011-11-22 18:55:22 +0100112
113def check_enough_semaphores():
114 """Check that the system supports enough semaphores to run the test."""
115 # minimum number of semaphores available according to POSIX
116 nsems_min = 256
117 try:
118 nsems = os.sysconf("SC_SEM_NSEMS_MAX")
119 except (AttributeError, ValueError):
120 # sysconf not available or setting not available
121 return
122 if nsems == -1 or nsems >= nsems_min:
123 return
124 raise unittest.SkipTest("The OS doesn't support enough semaphores "
125 "to run the test (required: %d)." % nsems_min)
126
127
Florent Xiclunafd1b0932010-03-28 00:25:02 +0000128#
Benjamin Petersone711caf2008-06-11 16:44:04 +0000129# Creates a wrapper for a function which records the time it takes to finish
130#
131
132class TimingWrapper(object):
133
134 def __init__(self, func):
135 self.func = func
136 self.elapsed = None
137
138 def __call__(self, *args, **kwds):
139 t = time.time()
140 try:
141 return self.func(*args, **kwds)
142 finally:
143 self.elapsed = time.time() - t
144
145#
146# Base class for test cases
147#
148
149class BaseTestCase(object):
150
151 ALLOWED_TYPES = ('processes', 'manager', 'threads')
152
153 def assertTimingAlmostEqual(self, a, b):
154 if CHECK_TIMINGS:
155 self.assertAlmostEqual(a, b, 1)
156
157 def assertReturnsIfImplemented(self, value, func, *args):
158 try:
159 res = func(*args)
160 except NotImplementedError:
161 pass
162 else:
163 return self.assertEqual(value, res)
164
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +0000165 # For the sanity of Windows users, rather than crashing or freezing in
166 # multiple ways.
167 def __reduce__(self, *args):
168 raise NotImplementedError("shouldn't try to pickle a test case")
169
170 __reduce_ex__ = __reduce__
171
Benjamin Petersone711caf2008-06-11 16:44:04 +0000172#
173# Return the value of a semaphore
174#
175
176def get_value(self):
177 try:
178 return self.get_value()
179 except AttributeError:
180 try:
181 return self._Semaphore__value
182 except AttributeError:
183 try:
184 return self._value
185 except AttributeError:
186 raise NotImplementedError
187
188#
189# Testcases
190#
191
192class _TestProcess(BaseTestCase):
193
194 ALLOWED_TYPES = ('processes', 'threads')
195
196 def test_current(self):
197 if self.TYPE == 'threads':
198 return
199
200 current = self.current_process()
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000201 authkey = current.authkey
Benjamin Petersone711caf2008-06-11 16:44:04 +0000202
203 self.assertTrue(current.is_alive())
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000204 self.assertTrue(not current.daemon)
Ezio Melottie9615932010-01-24 19:26:24 +0000205 self.assertIsInstance(authkey, bytes)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000206 self.assertTrue(len(authkey) > 0)
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000207 self.assertEqual(current.ident, os.getpid())
208 self.assertEqual(current.exitcode, None)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000209
Antoine Pitrou0bd4deb2011-02-25 22:07:43 +0000210 def test_daemon_argument(self):
211 if self.TYPE == "threads":
212 return
213
214 # By default uses the current process's daemon flag.
215 proc0 = self.Process(target=self._test)
Antoine Pitrouec785222011-03-02 00:15:44 +0000216 self.assertEqual(proc0.daemon, self.current_process().daemon)
Antoine Pitrou0bd4deb2011-02-25 22:07:43 +0000217 proc1 = self.Process(target=self._test, daemon=True)
218 self.assertTrue(proc1.daemon)
219 proc2 = self.Process(target=self._test, daemon=False)
220 self.assertFalse(proc2.daemon)
221
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +0000222 @classmethod
223 def _test(cls, q, *args, **kwds):
224 current = cls.current_process()
Benjamin Petersone711caf2008-06-11 16:44:04 +0000225 q.put(args)
226 q.put(kwds)
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000227 q.put(current.name)
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +0000228 if cls.TYPE != 'threads':
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000229 q.put(bytes(current.authkey))
Benjamin Petersone711caf2008-06-11 16:44:04 +0000230 q.put(current.pid)
231
232 def test_process(self):
233 q = self.Queue(1)
234 e = self.Event()
235 args = (q, 1, 2)
236 kwargs = {'hello':23, 'bye':2.54}
237 name = 'SomeProcess'
238 p = self.Process(
239 target=self._test, args=args, kwargs=kwargs, name=name
240 )
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000241 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000242 current = self.current_process()
243
244 if self.TYPE != 'threads':
Ezio Melottib3aedd42010-11-20 19:04:17 +0000245 self.assertEqual(p.authkey, current.authkey)
246 self.assertEqual(p.is_alive(), False)
247 self.assertEqual(p.daemon, True)
Benjamin Peterson577473f2010-01-19 00:09:57 +0000248 self.assertNotIn(p, self.active_children())
Benjamin Petersone711caf2008-06-11 16:44:04 +0000249 self.assertTrue(type(self.active_children()) is list)
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000250 self.assertEqual(p.exitcode, None)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000251
252 p.start()
253
Ezio Melottib3aedd42010-11-20 19:04:17 +0000254 self.assertEqual(p.exitcode, None)
255 self.assertEqual(p.is_alive(), True)
Benjamin Peterson577473f2010-01-19 00:09:57 +0000256 self.assertIn(p, self.active_children())
Benjamin Petersone711caf2008-06-11 16:44:04 +0000257
Ezio Melottib3aedd42010-11-20 19:04:17 +0000258 self.assertEqual(q.get(), args[1:])
259 self.assertEqual(q.get(), kwargs)
260 self.assertEqual(q.get(), p.name)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000261 if self.TYPE != 'threads':
Ezio Melottib3aedd42010-11-20 19:04:17 +0000262 self.assertEqual(q.get(), current.authkey)
263 self.assertEqual(q.get(), p.pid)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000264
265 p.join()
266
Ezio Melottib3aedd42010-11-20 19:04:17 +0000267 self.assertEqual(p.exitcode, 0)
268 self.assertEqual(p.is_alive(), False)
Benjamin Peterson577473f2010-01-19 00:09:57 +0000269 self.assertNotIn(p, self.active_children())
Benjamin Petersone711caf2008-06-11 16:44:04 +0000270
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +0000271 @classmethod
272 def _test_terminate(cls):
Benjamin Petersone711caf2008-06-11 16:44:04 +0000273 time.sleep(1000)
274
275 def test_terminate(self):
276 if self.TYPE == 'threads':
277 return
278
279 p = self.Process(target=self._test_terminate)
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000280 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000281 p.start()
282
283 self.assertEqual(p.is_alive(), True)
Benjamin Peterson577473f2010-01-19 00:09:57 +0000284 self.assertIn(p, self.active_children())
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000285 self.assertEqual(p.exitcode, None)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000286
Richard Oudkerk59d54042012-05-10 16:11:12 +0100287 join = TimingWrapper(p.join)
288
289 self.assertEqual(join(0), None)
290 self.assertTimingAlmostEqual(join.elapsed, 0.0)
291 self.assertEqual(p.is_alive(), True)
292
293 self.assertEqual(join(-1), None)
294 self.assertTimingAlmostEqual(join.elapsed, 0.0)
295 self.assertEqual(p.is_alive(), True)
296
Benjamin Petersone711caf2008-06-11 16:44:04 +0000297 p.terminate()
298
Benjamin Petersone711caf2008-06-11 16:44:04 +0000299 self.assertEqual(join(), None)
300 self.assertTimingAlmostEqual(join.elapsed, 0.0)
301
302 self.assertEqual(p.is_alive(), False)
Benjamin Peterson577473f2010-01-19 00:09:57 +0000303 self.assertNotIn(p, self.active_children())
Benjamin Petersone711caf2008-06-11 16:44:04 +0000304
305 p.join()
306
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000307 # XXX sometimes get p.exitcode == 0 on Windows ...
308 #self.assertEqual(p.exitcode, -signal.SIGTERM)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000309
310 def test_cpu_count(self):
311 try:
312 cpus = multiprocessing.cpu_count()
313 except NotImplementedError:
314 cpus = 1
315 self.assertTrue(type(cpus) is int)
316 self.assertTrue(cpus >= 1)
317
318 def test_active_children(self):
319 self.assertEqual(type(self.active_children()), list)
320
321 p = self.Process(target=time.sleep, args=(DELTA,))
Benjamin Peterson577473f2010-01-19 00:09:57 +0000322 self.assertNotIn(p, self.active_children())
Benjamin Petersone711caf2008-06-11 16:44:04 +0000323
Jesus Cea94f964f2011-09-09 20:26:57 +0200324 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000325 p.start()
Benjamin Peterson577473f2010-01-19 00:09:57 +0000326 self.assertIn(p, self.active_children())
Benjamin Petersone711caf2008-06-11 16:44:04 +0000327
328 p.join()
Benjamin Peterson577473f2010-01-19 00:09:57 +0000329 self.assertNotIn(p, self.active_children())
Benjamin Petersone711caf2008-06-11 16:44:04 +0000330
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +0000331 @classmethod
332 def _test_recursion(cls, wconn, id):
Benjamin Petersone711caf2008-06-11 16:44:04 +0000333 from multiprocessing import forking
334 wconn.send(id)
335 if len(id) < 2:
336 for i in range(2):
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +0000337 p = cls.Process(
338 target=cls._test_recursion, args=(wconn, id+[i])
Benjamin Petersone711caf2008-06-11 16:44:04 +0000339 )
340 p.start()
341 p.join()
342
343 def test_recursion(self):
344 rconn, wconn = self.Pipe(duplex=False)
345 self._test_recursion(wconn, [])
346
347 time.sleep(DELTA)
348 result = []
349 while rconn.poll():
350 result.append(rconn.recv())
351
352 expected = [
353 [],
354 [0],
355 [0, 0],
356 [0, 1],
357 [1],
358 [1, 0],
359 [1, 1]
360 ]
361 self.assertEqual(result, expected)
362
Antoine Pitrou176f07d2011-06-06 19:35:31 +0200363 @classmethod
364 def _test_sentinel(cls, event):
365 event.wait(10.0)
366
367 def test_sentinel(self):
368 if self.TYPE == "threads":
369 return
370 event = self.Event()
371 p = self.Process(target=self._test_sentinel, args=(event,))
372 with self.assertRaises(ValueError):
373 p.sentinel
374 p.start()
375 self.addCleanup(p.join)
376 sentinel = p.sentinel
377 self.assertIsInstance(sentinel, int)
378 self.assertFalse(wait_for_handle(sentinel, timeout=0.0))
379 event.set()
380 p.join()
381 self.assertTrue(wait_for_handle(sentinel, timeout=DELTA))
382
Benjamin Petersone711caf2008-06-11 16:44:04 +0000383#
384#
385#
386
387class _UpperCaser(multiprocessing.Process):
388
389 def __init__(self):
390 multiprocessing.Process.__init__(self)
391 self.child_conn, self.parent_conn = multiprocessing.Pipe()
392
393 def run(self):
394 self.parent_conn.close()
395 for s in iter(self.child_conn.recv, None):
396 self.child_conn.send(s.upper())
397 self.child_conn.close()
398
399 def submit(self, s):
400 assert type(s) is str
401 self.parent_conn.send(s)
402 return self.parent_conn.recv()
403
404 def stop(self):
405 self.parent_conn.send(None)
406 self.parent_conn.close()
407 self.child_conn.close()
408
409class _TestSubclassingProcess(BaseTestCase):
410
411 ALLOWED_TYPES = ('processes',)
412
413 def test_subclassing(self):
414 uppercaser = _UpperCaser()
Jesus Cea94f964f2011-09-09 20:26:57 +0200415 uppercaser.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000416 uppercaser.start()
417 self.assertEqual(uppercaser.submit('hello'), 'HELLO')
418 self.assertEqual(uppercaser.submit('world'), 'WORLD')
419 uppercaser.stop()
420 uppercaser.join()
421
Antoine Pitrou84a0fbf2012-01-27 10:52:37 +0100422 def test_stderr_flush(self):
423 # sys.stderr is flushed at process shutdown (issue #13812)
424 if self.TYPE == "threads":
425 return
426
427 testfn = test.support.TESTFN
428 self.addCleanup(test.support.unlink, testfn)
429 proc = self.Process(target=self._test_stderr_flush, args=(testfn,))
430 proc.start()
431 proc.join()
432 with open(testfn, 'r') as f:
433 err = f.read()
434 # The whole traceback was printed
435 self.assertIn("ZeroDivisionError", err)
436 self.assertIn("test_multiprocessing.py", err)
437 self.assertIn("1/0 # MARKER", err)
438
439 @classmethod
440 def _test_stderr_flush(cls, testfn):
441 sys.stderr = open(testfn, 'w')
442 1/0 # MARKER
443
444
Richard Oudkerk29471de2012-06-06 19:04:57 +0100445 @classmethod
446 def _test_sys_exit(cls, reason, testfn):
447 sys.stderr = open(testfn, 'w')
448 sys.exit(reason)
449
450 def test_sys_exit(self):
451 # See Issue 13854
452 if self.TYPE == 'threads':
453 return
454
455 testfn = test.support.TESTFN
456 self.addCleanup(test.support.unlink, testfn)
457
458 for reason, code in (([1, 2, 3], 1), ('ignore this', 0)):
459 p = self.Process(target=self._test_sys_exit, args=(reason, testfn))
460 p.daemon = True
461 p.start()
462 p.join(5)
463 self.assertEqual(p.exitcode, code)
464
465 with open(testfn, 'r') as f:
466 self.assertEqual(f.read().rstrip(), str(reason))
467
468 for reason in (True, False, 8):
469 p = self.Process(target=sys.exit, args=(reason,))
470 p.daemon = True
471 p.start()
472 p.join(5)
473 self.assertEqual(p.exitcode, reason)
474
Benjamin Petersone711caf2008-06-11 16:44:04 +0000475#
476#
477#
478
479def queue_empty(q):
480 if hasattr(q, 'empty'):
481 return q.empty()
482 else:
483 return q.qsize() == 0
484
485def queue_full(q, maxsize):
486 if hasattr(q, 'full'):
487 return q.full()
488 else:
489 return q.qsize() == maxsize
490
491
492class _TestQueue(BaseTestCase):
493
494
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +0000495 @classmethod
496 def _test_put(cls, queue, child_can_start, parent_can_continue):
Benjamin Petersone711caf2008-06-11 16:44:04 +0000497 child_can_start.wait()
498 for i in range(6):
499 queue.get()
500 parent_can_continue.set()
501
502 def test_put(self):
503 MAXSIZE = 6
504 queue = self.Queue(maxsize=MAXSIZE)
505 child_can_start = self.Event()
506 parent_can_continue = self.Event()
507
508 proc = self.Process(
509 target=self._test_put,
510 args=(queue, child_can_start, parent_can_continue)
511 )
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000512 proc.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000513 proc.start()
514
515 self.assertEqual(queue_empty(queue), True)
516 self.assertEqual(queue_full(queue, MAXSIZE), False)
517
518 queue.put(1)
519 queue.put(2, True)
520 queue.put(3, True, None)
521 queue.put(4, False)
522 queue.put(5, False, None)
523 queue.put_nowait(6)
524
525 # the values may be in buffer but not yet in pipe so sleep a bit
526 time.sleep(DELTA)
527
528 self.assertEqual(queue_empty(queue), False)
529 self.assertEqual(queue_full(queue, MAXSIZE), True)
530
531 put = TimingWrapper(queue.put)
532 put_nowait = TimingWrapper(queue.put_nowait)
533
534 self.assertRaises(pyqueue.Full, put, 7, False)
535 self.assertTimingAlmostEqual(put.elapsed, 0)
536
537 self.assertRaises(pyqueue.Full, put, 7, False, None)
538 self.assertTimingAlmostEqual(put.elapsed, 0)
539
540 self.assertRaises(pyqueue.Full, put_nowait, 7)
541 self.assertTimingAlmostEqual(put_nowait.elapsed, 0)
542
543 self.assertRaises(pyqueue.Full, put, 7, True, TIMEOUT1)
544 self.assertTimingAlmostEqual(put.elapsed, TIMEOUT1)
545
546 self.assertRaises(pyqueue.Full, put, 7, False, TIMEOUT2)
547 self.assertTimingAlmostEqual(put.elapsed, 0)
548
549 self.assertRaises(pyqueue.Full, put, 7, True, timeout=TIMEOUT3)
550 self.assertTimingAlmostEqual(put.elapsed, TIMEOUT3)
551
552 child_can_start.set()
553 parent_can_continue.wait()
554
555 self.assertEqual(queue_empty(queue), True)
556 self.assertEqual(queue_full(queue, MAXSIZE), False)
557
558 proc.join()
559
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +0000560 @classmethod
561 def _test_get(cls, queue, child_can_start, parent_can_continue):
Benjamin Petersone711caf2008-06-11 16:44:04 +0000562 child_can_start.wait()
Benjamin Petersonb29cbbc2008-06-16 20:57:14 +0000563 #queue.put(1)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000564 queue.put(2)
565 queue.put(3)
566 queue.put(4)
567 queue.put(5)
568 parent_can_continue.set()
569
570 def test_get(self):
571 queue = self.Queue()
572 child_can_start = self.Event()
573 parent_can_continue = self.Event()
574
575 proc = self.Process(
576 target=self._test_get,
577 args=(queue, child_can_start, parent_can_continue)
578 )
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000579 proc.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000580 proc.start()
581
582 self.assertEqual(queue_empty(queue), True)
583
584 child_can_start.set()
585 parent_can_continue.wait()
586
587 time.sleep(DELTA)
588 self.assertEqual(queue_empty(queue), False)
589
Benjamin Petersonb29cbbc2008-06-16 20:57:14 +0000590 # Hangs unexpectedly, remove for now
591 #self.assertEqual(queue.get(), 1)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000592 self.assertEqual(queue.get(True, None), 2)
593 self.assertEqual(queue.get(True), 3)
594 self.assertEqual(queue.get(timeout=1), 4)
595 self.assertEqual(queue.get_nowait(), 5)
596
597 self.assertEqual(queue_empty(queue), True)
598
599 get = TimingWrapper(queue.get)
600 get_nowait = TimingWrapper(queue.get_nowait)
601
602 self.assertRaises(pyqueue.Empty, get, False)
603 self.assertTimingAlmostEqual(get.elapsed, 0)
604
605 self.assertRaises(pyqueue.Empty, get, False, None)
606 self.assertTimingAlmostEqual(get.elapsed, 0)
607
608 self.assertRaises(pyqueue.Empty, get_nowait)
609 self.assertTimingAlmostEqual(get_nowait.elapsed, 0)
610
611 self.assertRaises(pyqueue.Empty, get, True, TIMEOUT1)
612 self.assertTimingAlmostEqual(get.elapsed, TIMEOUT1)
613
614 self.assertRaises(pyqueue.Empty, get, False, TIMEOUT2)
615 self.assertTimingAlmostEqual(get.elapsed, 0)
616
617 self.assertRaises(pyqueue.Empty, get, timeout=TIMEOUT3)
618 self.assertTimingAlmostEqual(get.elapsed, TIMEOUT3)
619
620 proc.join()
621
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +0000622 @classmethod
623 def _test_fork(cls, queue):
Benjamin Petersone711caf2008-06-11 16:44:04 +0000624 for i in range(10, 20):
625 queue.put(i)
626 # note that at this point the items may only be buffered, so the
627 # process cannot shutdown until the feeder thread has finished
628 # pushing items onto the pipe.
629
630 def test_fork(self):
631 # Old versions of Queue would fail to create a new feeder
632 # thread for a forked process if the original process had its
633 # own feeder thread. This test checks that this no longer
634 # happens.
635
636 queue = self.Queue()
637
638 # put items on queue so that main process starts a feeder thread
639 for i in range(10):
640 queue.put(i)
641
642 # wait to make sure thread starts before we fork a new process
643 time.sleep(DELTA)
644
645 # fork process
646 p = self.Process(target=self._test_fork, args=(queue,))
Jesus Cea94f964f2011-09-09 20:26:57 +0200647 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000648 p.start()
649
650 # check that all expected items are in the queue
651 for i in range(20):
652 self.assertEqual(queue.get(), i)
653 self.assertRaises(pyqueue.Empty, queue.get, False)
654
655 p.join()
656
657 def test_qsize(self):
658 q = self.Queue()
659 try:
660 self.assertEqual(q.qsize(), 0)
661 except NotImplementedError:
662 return
663 q.put(1)
664 self.assertEqual(q.qsize(), 1)
665 q.put(5)
666 self.assertEqual(q.qsize(), 2)
667 q.get()
668 self.assertEqual(q.qsize(), 1)
669 q.get()
670 self.assertEqual(q.qsize(), 0)
671
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +0000672 @classmethod
673 def _test_task_done(cls, q):
Benjamin Petersone711caf2008-06-11 16:44:04 +0000674 for obj in iter(q.get, None):
675 time.sleep(DELTA)
676 q.task_done()
677
678 def test_task_done(self):
679 queue = self.JoinableQueue()
680
681 if sys.version_info < (2, 5) and not hasattr(queue, 'task_done'):
Florent Xiclunafd1b0932010-03-28 00:25:02 +0000682 self.skipTest("requires 'queue.task_done()' method")
Benjamin Petersone711caf2008-06-11 16:44:04 +0000683
684 workers = [self.Process(target=self._test_task_done, args=(queue,))
685 for i in range(4)]
686
687 for p in workers:
Jesus Cea94f964f2011-09-09 20:26:57 +0200688 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000689 p.start()
690
691 for i in range(10):
692 queue.put(i)
693
694 queue.join()
695
696 for p in workers:
697 queue.put(None)
698
699 for p in workers:
700 p.join()
701
702#
703#
704#
705
706class _TestLock(BaseTestCase):
707
708 def test_lock(self):
709 lock = self.Lock()
710 self.assertEqual(lock.acquire(), True)
711 self.assertEqual(lock.acquire(False), False)
712 self.assertEqual(lock.release(), None)
713 self.assertRaises((ValueError, threading.ThreadError), lock.release)
714
715 def test_rlock(self):
716 lock = self.RLock()
717 self.assertEqual(lock.acquire(), True)
718 self.assertEqual(lock.acquire(), True)
719 self.assertEqual(lock.acquire(), True)
720 self.assertEqual(lock.release(), None)
721 self.assertEqual(lock.release(), None)
722 self.assertEqual(lock.release(), None)
723 self.assertRaises((AssertionError, RuntimeError), lock.release)
724
Jesse Nollerf8d00852009-03-31 03:25:07 +0000725 def test_lock_context(self):
726 with self.Lock():
727 pass
728
Benjamin Petersone711caf2008-06-11 16:44:04 +0000729
730class _TestSemaphore(BaseTestCase):
731
732 def _test_semaphore(self, sem):
733 self.assertReturnsIfImplemented(2, get_value, sem)
734 self.assertEqual(sem.acquire(), True)
735 self.assertReturnsIfImplemented(1, get_value, sem)
736 self.assertEqual(sem.acquire(), True)
737 self.assertReturnsIfImplemented(0, get_value, sem)
738 self.assertEqual(sem.acquire(False), False)
739 self.assertReturnsIfImplemented(0, get_value, sem)
740 self.assertEqual(sem.release(), None)
741 self.assertReturnsIfImplemented(1, get_value, sem)
742 self.assertEqual(sem.release(), None)
743 self.assertReturnsIfImplemented(2, get_value, sem)
744
745 def test_semaphore(self):
746 sem = self.Semaphore(2)
747 self._test_semaphore(sem)
748 self.assertEqual(sem.release(), None)
749 self.assertReturnsIfImplemented(3, get_value, sem)
750 self.assertEqual(sem.release(), None)
751 self.assertReturnsIfImplemented(4, get_value, sem)
752
753 def test_bounded_semaphore(self):
754 sem = self.BoundedSemaphore(2)
755 self._test_semaphore(sem)
756 # Currently fails on OS/X
757 #if HAVE_GETVALUE:
758 # self.assertRaises(ValueError, sem.release)
759 # self.assertReturnsIfImplemented(2, get_value, sem)
760
761 def test_timeout(self):
762 if self.TYPE != 'processes':
763 return
764
765 sem = self.Semaphore(0)
766 acquire = TimingWrapper(sem.acquire)
767
768 self.assertEqual(acquire(False), False)
769 self.assertTimingAlmostEqual(acquire.elapsed, 0.0)
770
771 self.assertEqual(acquire(False, None), False)
772 self.assertTimingAlmostEqual(acquire.elapsed, 0.0)
773
774 self.assertEqual(acquire(False, TIMEOUT1), False)
775 self.assertTimingAlmostEqual(acquire.elapsed, 0)
776
777 self.assertEqual(acquire(True, TIMEOUT2), False)
778 self.assertTimingAlmostEqual(acquire.elapsed, TIMEOUT2)
779
780 self.assertEqual(acquire(timeout=TIMEOUT3), False)
781 self.assertTimingAlmostEqual(acquire.elapsed, TIMEOUT3)
782
783
784class _TestCondition(BaseTestCase):
785
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +0000786 @classmethod
787 def f(cls, cond, sleeping, woken, timeout=None):
Benjamin Petersone711caf2008-06-11 16:44:04 +0000788 cond.acquire()
789 sleeping.release()
790 cond.wait(timeout)
791 woken.release()
792 cond.release()
793
794 def check_invariant(self, cond):
795 # this is only supposed to succeed when there are no sleepers
796 if self.TYPE == 'processes':
797 try:
798 sleepers = (cond._sleeping_count.get_value() -
799 cond._woken_count.get_value())
800 self.assertEqual(sleepers, 0)
801 self.assertEqual(cond._wait_semaphore.get_value(), 0)
802 except NotImplementedError:
803 pass
804
805 def test_notify(self):
806 cond = self.Condition()
807 sleeping = self.Semaphore(0)
808 woken = self.Semaphore(0)
809
810 p = self.Process(target=self.f, args=(cond, sleeping, woken))
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000811 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000812 p.start()
813
814 p = threading.Thread(target=self.f, args=(cond, sleeping, woken))
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000815 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000816 p.start()
817
818 # wait for both children to start sleeping
819 sleeping.acquire()
820 sleeping.acquire()
821
822 # check no process/thread has woken up
823 time.sleep(DELTA)
824 self.assertReturnsIfImplemented(0, get_value, woken)
825
826 # wake up one process/thread
827 cond.acquire()
828 cond.notify()
829 cond.release()
830
831 # check one process/thread has woken up
832 time.sleep(DELTA)
833 self.assertReturnsIfImplemented(1, get_value, woken)
834
835 # wake up another
836 cond.acquire()
837 cond.notify()
838 cond.release()
839
840 # check other has woken up
841 time.sleep(DELTA)
842 self.assertReturnsIfImplemented(2, get_value, woken)
843
844 # check state is not mucked up
845 self.check_invariant(cond)
846 p.join()
847
848 def test_notify_all(self):
849 cond = self.Condition()
850 sleeping = self.Semaphore(0)
851 woken = self.Semaphore(0)
852
853 # start some threads/processes which will timeout
854 for i in range(3):
855 p = self.Process(target=self.f,
856 args=(cond, sleeping, woken, TIMEOUT1))
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000857 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000858 p.start()
859
860 t = threading.Thread(target=self.f,
861 args=(cond, sleeping, woken, TIMEOUT1))
Benjamin Peterson72753702008-08-18 18:09:21 +0000862 t.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000863 t.start()
864
865 # wait for them all to sleep
866 for i in range(6):
867 sleeping.acquire()
868
869 # check they have all timed out
870 for i in range(6):
871 woken.acquire()
872 self.assertReturnsIfImplemented(0, get_value, woken)
873
874 # check state is not mucked up
875 self.check_invariant(cond)
876
877 # start some more threads/processes
878 for i in range(3):
879 p = self.Process(target=self.f, args=(cond, sleeping, woken))
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000880 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000881 p.start()
882
883 t = threading.Thread(target=self.f, args=(cond, sleeping, woken))
Benjamin Peterson72753702008-08-18 18:09:21 +0000884 t.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000885 t.start()
886
887 # wait for them to all sleep
888 for i in range(6):
889 sleeping.acquire()
890
891 # check no process/thread has woken up
892 time.sleep(DELTA)
893 self.assertReturnsIfImplemented(0, get_value, woken)
894
895 # wake them all up
896 cond.acquire()
897 cond.notify_all()
898 cond.release()
899
900 # check they have all woken
Antoine Pitrouf25a8de2011-04-16 21:02:01 +0200901 for i in range(10):
902 try:
903 if get_value(woken) == 6:
904 break
905 except NotImplementedError:
906 break
907 time.sleep(DELTA)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000908 self.assertReturnsIfImplemented(6, get_value, woken)
909
910 # check state is not mucked up
911 self.check_invariant(cond)
912
913 def test_timeout(self):
914 cond = self.Condition()
915 wait = TimingWrapper(cond.wait)
916 cond.acquire()
917 res = wait(TIMEOUT1)
918 cond.release()
Georg Brandl65ffae02010-10-28 09:24:56 +0000919 self.assertEqual(res, False)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000920 self.assertTimingAlmostEqual(wait.elapsed, TIMEOUT1)
921
Charles-François Natalic8ce7152012-04-17 18:45:57 +0200922 @classmethod
923 def _test_waitfor_f(cls, cond, state):
924 with cond:
925 state.value = 0
926 cond.notify()
927 result = cond.wait_for(lambda : state.value==4)
928 if not result or state.value != 4:
929 sys.exit(1)
930
931 @unittest.skipUnless(HAS_SHAREDCTYPES, 'needs sharedctypes')
932 def test_waitfor(self):
933 # based on test in test/lock_tests.py
934 cond = self.Condition()
935 state = self.Value('i', -1)
936
937 p = self.Process(target=self._test_waitfor_f, args=(cond, state))
938 p.daemon = True
939 p.start()
940
941 with cond:
942 result = cond.wait_for(lambda : state.value==0)
943 self.assertTrue(result)
944 self.assertEqual(state.value, 0)
945
946 for i in range(4):
947 time.sleep(0.01)
948 with cond:
949 state.value += 1
950 cond.notify()
951
952 p.join(5)
953 self.assertFalse(p.is_alive())
954 self.assertEqual(p.exitcode, 0)
955
956 @classmethod
Richard Oudkerk6dbca362012-05-06 16:46:36 +0100957 def _test_waitfor_timeout_f(cls, cond, state, success, sem):
958 sem.release()
Charles-François Natalic8ce7152012-04-17 18:45:57 +0200959 with cond:
960 expected = 0.1
961 dt = time.time()
962 result = cond.wait_for(lambda : state.value==4, timeout=expected)
963 dt = time.time() - dt
964 # borrow logic in assertTimeout() from test/lock_tests.py
965 if not result and expected * 0.6 < dt < expected * 10.0:
966 success.value = True
967
968 @unittest.skipUnless(HAS_SHAREDCTYPES, 'needs sharedctypes')
969 def test_waitfor_timeout(self):
970 # based on test in test/lock_tests.py
971 cond = self.Condition()
972 state = self.Value('i', 0)
973 success = self.Value('i', False)
Richard Oudkerk6dbca362012-05-06 16:46:36 +0100974 sem = self.Semaphore(0)
Charles-François Natalic8ce7152012-04-17 18:45:57 +0200975
976 p = self.Process(target=self._test_waitfor_timeout_f,
Richard Oudkerk6dbca362012-05-06 16:46:36 +0100977 args=(cond, state, success, sem))
Charles-François Natalic8ce7152012-04-17 18:45:57 +0200978 p.daemon = True
979 p.start()
Richard Oudkerk6dbca362012-05-06 16:46:36 +0100980 self.assertTrue(sem.acquire(timeout=10))
Charles-François Natalic8ce7152012-04-17 18:45:57 +0200981
982 # Only increment 3 times, so state == 4 is never reached.
983 for i in range(3):
984 time.sleep(0.01)
985 with cond:
986 state.value += 1
987 cond.notify()
988
989 p.join(5)
990 self.assertTrue(success.value)
991
Richard Oudkerk98449932012-06-05 13:15:29 +0100992 @classmethod
993 def _test_wait_result(cls, c, pid):
994 with c:
995 c.notify()
996 time.sleep(1)
997 if pid is not None:
998 os.kill(pid, signal.SIGINT)
999
1000 def test_wait_result(self):
1001 if isinstance(self, ProcessesMixin) and sys.platform != 'win32':
1002 pid = os.getpid()
1003 else:
1004 pid = None
1005
1006 c = self.Condition()
1007 with c:
1008 self.assertFalse(c.wait(0))
1009 self.assertFalse(c.wait(0.1))
1010
1011 p = self.Process(target=self._test_wait_result, args=(c, pid))
1012 p.start()
1013
1014 self.assertTrue(c.wait(10))
1015 if pid is not None:
1016 self.assertRaises(KeyboardInterrupt, c.wait, 10)
1017
1018 p.join()
1019
Benjamin Petersone711caf2008-06-11 16:44:04 +00001020
1021class _TestEvent(BaseTestCase):
1022
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +00001023 @classmethod
1024 def _test_event(cls, event):
Benjamin Petersone711caf2008-06-11 16:44:04 +00001025 time.sleep(TIMEOUT2)
1026 event.set()
1027
1028 def test_event(self):
1029 event = self.Event()
1030 wait = TimingWrapper(event.wait)
1031
Ezio Melotti13925002011-03-16 11:05:33 +02001032 # Removed temporarily, due to API shear, this does not
Benjamin Petersone711caf2008-06-11 16:44:04 +00001033 # work with threading._Event objects. is_set == isSet
Benjamin Peterson965ce872009-04-05 21:24:58 +00001034 self.assertEqual(event.is_set(), False)
Benjamin Petersone711caf2008-06-11 16:44:04 +00001035
Benjamin Peterson965ce872009-04-05 21:24:58 +00001036 # Removed, threading.Event.wait() will return the value of the __flag
1037 # instead of None. API Shear with the semaphore backed mp.Event
1038 self.assertEqual(wait(0.0), False)
Benjamin Petersone711caf2008-06-11 16:44:04 +00001039 self.assertTimingAlmostEqual(wait.elapsed, 0.0)
Benjamin Peterson965ce872009-04-05 21:24:58 +00001040 self.assertEqual(wait(TIMEOUT1), False)
Benjamin Petersone711caf2008-06-11 16:44:04 +00001041 self.assertTimingAlmostEqual(wait.elapsed, TIMEOUT1)
1042
1043 event.set()
1044
1045 # See note above on the API differences
Benjamin Peterson965ce872009-04-05 21:24:58 +00001046 self.assertEqual(event.is_set(), True)
1047 self.assertEqual(wait(), True)
Benjamin Petersone711caf2008-06-11 16:44:04 +00001048 self.assertTimingAlmostEqual(wait.elapsed, 0.0)
Benjamin Peterson965ce872009-04-05 21:24:58 +00001049 self.assertEqual(wait(TIMEOUT1), True)
Benjamin Petersone711caf2008-06-11 16:44:04 +00001050 self.assertTimingAlmostEqual(wait.elapsed, 0.0)
1051 # self.assertEqual(event.is_set(), True)
1052
1053 event.clear()
1054
1055 #self.assertEqual(event.is_set(), False)
1056
Jesus Cea94f964f2011-09-09 20:26:57 +02001057 p = self.Process(target=self._test_event, args=(event,))
1058 p.daemon = True
1059 p.start()
Benjamin Peterson965ce872009-04-05 21:24:58 +00001060 self.assertEqual(wait(), True)
Benjamin Petersone711caf2008-06-11 16:44:04 +00001061
1062#
Richard Oudkerk3730a172012-06-15 18:26:07 +01001063# Tests for Barrier - adapted from tests in test/lock_tests.py
1064#
1065
1066# Many of the tests for threading.Barrier use a list as an atomic
1067# counter: a value is appended to increment the counter, and the
1068# length of the list gives the value. We use the class DummyList
1069# for the same purpose.
1070
1071class _DummyList(object):
1072
1073 def __init__(self):
1074 wrapper = multiprocessing.heap.BufferWrapper(struct.calcsize('i'))
1075 lock = multiprocessing.Lock()
1076 self.__setstate__((wrapper, lock))
1077 self._lengthbuf[0] = 0
1078
1079 def __setstate__(self, state):
1080 (self._wrapper, self._lock) = state
1081 self._lengthbuf = self._wrapper.create_memoryview().cast('i')
1082
1083 def __getstate__(self):
1084 return (self._wrapper, self._lock)
1085
1086 def append(self, _):
1087 with self._lock:
1088 self._lengthbuf[0] += 1
1089
1090 def __len__(self):
1091 with self._lock:
1092 return self._lengthbuf[0]
1093
1094def _wait():
1095 # A crude wait/yield function not relying on synchronization primitives.
1096 time.sleep(0.01)
1097
1098
1099class Bunch(object):
1100 """
1101 A bunch of threads.
1102 """
1103 def __init__(self, namespace, f, args, n, wait_before_exit=False):
1104 """
1105 Construct a bunch of `n` threads running the same function `f`.
1106 If `wait_before_exit` is True, the threads won't terminate until
1107 do_finish() is called.
1108 """
1109 self.f = f
1110 self.args = args
1111 self.n = n
1112 self.started = namespace.DummyList()
1113 self.finished = namespace.DummyList()
Richard Oudkerk0f523462012-06-15 19:18:30 +01001114 self._can_exit = namespace.Event()
1115 if not wait_before_exit:
1116 self._can_exit.set()
Richard Oudkerk3730a172012-06-15 18:26:07 +01001117 for i in range(n):
Richard Oudkerk0f523462012-06-15 19:18:30 +01001118 p = namespace.Process(target=self.task)
1119 p.daemon = True
1120 p.start()
Richard Oudkerk3730a172012-06-15 18:26:07 +01001121
1122 def task(self):
1123 pid = os.getpid()
1124 self.started.append(pid)
1125 try:
1126 self.f(*self.args)
1127 finally:
1128 self.finished.append(pid)
Richard Oudkerk0f523462012-06-15 19:18:30 +01001129 self._can_exit.wait(30)
1130 assert self._can_exit.is_set()
Richard Oudkerk3730a172012-06-15 18:26:07 +01001131
1132 def wait_for_started(self):
1133 while len(self.started) < self.n:
1134 _wait()
1135
1136 def wait_for_finished(self):
1137 while len(self.finished) < self.n:
1138 _wait()
1139
1140 def do_finish(self):
Richard Oudkerk0f523462012-06-15 19:18:30 +01001141 self._can_exit.set()
Richard Oudkerk3730a172012-06-15 18:26:07 +01001142
1143
1144class AppendTrue(object):
1145 def __init__(self, obj):
1146 self.obj = obj
1147 def __call__(self):
1148 self.obj.append(True)
1149
1150
1151class _TestBarrier(BaseTestCase):
1152 """
1153 Tests for Barrier objects.
1154 """
1155 N = 5
Richard Oudkerk13758842012-06-18 14:11:10 +01001156 defaultTimeout = 30.0 # XXX Slow Windows buildbots need generous timeout
Richard Oudkerk3730a172012-06-15 18:26:07 +01001157
1158 def setUp(self):
1159 self.barrier = self.Barrier(self.N, timeout=self.defaultTimeout)
1160
1161 def tearDown(self):
1162 self.barrier.abort()
1163 self.barrier = None
1164
1165 def DummyList(self):
1166 if self.TYPE == 'threads':
1167 return []
1168 elif self.TYPE == 'manager':
1169 return self.manager.list()
1170 else:
1171 return _DummyList()
1172
1173 def run_threads(self, f, args):
1174 b = Bunch(self, f, args, self.N-1)
1175 f(*args)
1176 b.wait_for_finished()
1177
1178 @classmethod
1179 def multipass(cls, barrier, results, n):
1180 m = barrier.parties
1181 assert m == cls.N
1182 for i in range(n):
1183 results[0].append(True)
1184 assert len(results[1]) == i * m
1185 barrier.wait()
1186 results[1].append(True)
1187 assert len(results[0]) == (i + 1) * m
1188 barrier.wait()
1189 try:
1190 assert barrier.n_waiting == 0
1191 except NotImplementedError:
1192 pass
1193 assert not barrier.broken
1194
1195 def test_barrier(self, passes=1):
1196 """
1197 Test that a barrier is passed in lockstep
1198 """
1199 results = [self.DummyList(), self.DummyList()]
1200 self.run_threads(self.multipass, (self.barrier, results, passes))
1201
1202 def test_barrier_10(self):
1203 """
1204 Test that a barrier works for 10 consecutive runs
1205 """
1206 return self.test_barrier(10)
1207
1208 @classmethod
1209 def _test_wait_return_f(cls, barrier, queue):
1210 res = barrier.wait()
1211 queue.put(res)
1212
1213 def test_wait_return(self):
1214 """
1215 test the return value from barrier.wait
1216 """
1217 queue = self.Queue()
1218 self.run_threads(self._test_wait_return_f, (self.barrier, queue))
1219 results = [queue.get() for i in range(self.N)]
1220 self.assertEqual(results.count(0), 1)
1221
1222 @classmethod
1223 def _test_action_f(cls, barrier, results):
1224 barrier.wait()
1225 if len(results) != 1:
1226 raise RuntimeError
1227
1228 def test_action(self):
1229 """
1230 Test the 'action' callback
1231 """
1232 results = self.DummyList()
1233 barrier = self.Barrier(self.N, action=AppendTrue(results))
1234 self.run_threads(self._test_action_f, (barrier, results))
1235 self.assertEqual(len(results), 1)
1236
1237 @classmethod
1238 def _test_abort_f(cls, barrier, results1, results2):
1239 try:
1240 i = barrier.wait()
1241 if i == cls.N//2:
1242 raise RuntimeError
1243 barrier.wait()
1244 results1.append(True)
1245 except threading.BrokenBarrierError:
1246 results2.append(True)
1247 except RuntimeError:
1248 barrier.abort()
1249
1250 def test_abort(self):
1251 """
1252 Test that an abort will put the barrier in a broken state
1253 """
1254 results1 = self.DummyList()
1255 results2 = self.DummyList()
1256 self.run_threads(self._test_abort_f,
1257 (self.barrier, results1, results2))
1258 self.assertEqual(len(results1), 0)
1259 self.assertEqual(len(results2), self.N-1)
1260 self.assertTrue(self.barrier.broken)
1261
1262 @classmethod
1263 def _test_reset_f(cls, barrier, results1, results2, results3):
1264 i = barrier.wait()
1265 if i == cls.N//2:
1266 # Wait until the other threads are all in the barrier.
1267 while barrier.n_waiting < cls.N-1:
1268 time.sleep(0.001)
1269 barrier.reset()
1270 else:
1271 try:
1272 barrier.wait()
1273 results1.append(True)
1274 except threading.BrokenBarrierError:
1275 results2.append(True)
1276 # Now, pass the barrier again
1277 barrier.wait()
1278 results3.append(True)
1279
1280 def test_reset(self):
1281 """
1282 Test that a 'reset' on a barrier frees the waiting threads
1283 """
1284 results1 = self.DummyList()
1285 results2 = self.DummyList()
1286 results3 = self.DummyList()
1287 self.run_threads(self._test_reset_f,
1288 (self.barrier, results1, results2, results3))
1289 self.assertEqual(len(results1), 0)
1290 self.assertEqual(len(results2), self.N-1)
1291 self.assertEqual(len(results3), self.N)
1292
1293 @classmethod
1294 def _test_abort_and_reset_f(cls, barrier, barrier2,
1295 results1, results2, results3):
1296 try:
1297 i = barrier.wait()
1298 if i == cls.N//2:
1299 raise RuntimeError
1300 barrier.wait()
1301 results1.append(True)
1302 except threading.BrokenBarrierError:
1303 results2.append(True)
1304 except RuntimeError:
1305 barrier.abort()
1306 # Synchronize and reset the barrier. Must synchronize first so
1307 # that everyone has left it when we reset, and after so that no
1308 # one enters it before the reset.
1309 if barrier2.wait() == cls.N//2:
1310 barrier.reset()
1311 barrier2.wait()
1312 barrier.wait()
1313 results3.append(True)
1314
1315 def test_abort_and_reset(self):
1316 """
1317 Test that a barrier can be reset after being broken.
1318 """
1319 results1 = self.DummyList()
1320 results2 = self.DummyList()
1321 results3 = self.DummyList()
1322 barrier2 = self.Barrier(self.N)
1323
1324 self.run_threads(self._test_abort_and_reset_f,
1325 (self.barrier, barrier2, results1, results2, results3))
1326 self.assertEqual(len(results1), 0)
1327 self.assertEqual(len(results2), self.N-1)
1328 self.assertEqual(len(results3), self.N)
1329
1330 @classmethod
1331 def _test_timeout_f(cls, barrier, results):
Richard Oudkerk13758842012-06-18 14:11:10 +01001332 i = barrier.wait()
Richard Oudkerk3730a172012-06-15 18:26:07 +01001333 if i == cls.N//2:
1334 # One thread is late!
Richard Oudkerk13758842012-06-18 14:11:10 +01001335 time.sleep(1.0)
Richard Oudkerk3730a172012-06-15 18:26:07 +01001336 try:
1337 barrier.wait(0.5)
1338 except threading.BrokenBarrierError:
1339 results.append(True)
1340
1341 def test_timeout(self):
1342 """
1343 Test wait(timeout)
1344 """
1345 results = self.DummyList()
1346 self.run_threads(self._test_timeout_f, (self.barrier, results))
1347 self.assertEqual(len(results), self.barrier.parties)
1348
1349 @classmethod
1350 def _test_default_timeout_f(cls, barrier, results):
Richard Oudkerk13758842012-06-18 14:11:10 +01001351 i = barrier.wait(cls.defaultTimeout)
Richard Oudkerk3730a172012-06-15 18:26:07 +01001352 if i == cls.N//2:
1353 # One thread is later than the default timeout
Richard Oudkerk13758842012-06-18 14:11:10 +01001354 time.sleep(1.0)
Richard Oudkerk3730a172012-06-15 18:26:07 +01001355 try:
1356 barrier.wait()
1357 except threading.BrokenBarrierError:
1358 results.append(True)
1359
1360 def test_default_timeout(self):
1361 """
1362 Test the barrier's default timeout
1363 """
Richard Oudkerk13758842012-06-18 14:11:10 +01001364 barrier = self.Barrier(self.N, timeout=0.5)
Richard Oudkerk3730a172012-06-15 18:26:07 +01001365 results = self.DummyList()
1366 self.run_threads(self._test_default_timeout_f, (barrier, results))
1367 self.assertEqual(len(results), barrier.parties)
1368
1369 def test_single_thread(self):
1370 b = self.Barrier(1)
1371 b.wait()
1372 b.wait()
1373
1374 @classmethod
1375 def _test_thousand_f(cls, barrier, passes, conn, lock):
1376 for i in range(passes):
1377 barrier.wait()
1378 with lock:
1379 conn.send(i)
1380
1381 def test_thousand(self):
1382 if self.TYPE == 'manager':
1383 return
1384 passes = 1000
1385 lock = self.Lock()
1386 conn, child_conn = self.Pipe(False)
1387 for j in range(self.N):
1388 p = self.Process(target=self._test_thousand_f,
1389 args=(self.barrier, passes, child_conn, lock))
1390 p.start()
1391
1392 for i in range(passes):
1393 for j in range(self.N):
1394 self.assertEqual(conn.recv(), i)
1395
1396#
Benjamin Petersone711caf2008-06-11 16:44:04 +00001397#
1398#
1399
1400class _TestValue(BaseTestCase):
1401
Florent Xiclunafd1b0932010-03-28 00:25:02 +00001402 ALLOWED_TYPES = ('processes',)
1403
Benjamin Petersone711caf2008-06-11 16:44:04 +00001404 codes_values = [
1405 ('i', 4343, 24234),
1406 ('d', 3.625, -4.25),
1407 ('h', -232, 234),
1408 ('c', latin('x'), latin('y'))
1409 ]
1410
Antoine Pitrou7744e2a2010-11-22 16:26:21 +00001411 def setUp(self):
1412 if not HAS_SHAREDCTYPES:
1413 self.skipTest("requires multiprocessing.sharedctypes")
1414
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +00001415 @classmethod
1416 def _test(cls, values):
1417 for sv, cv in zip(values, cls.codes_values):
Benjamin Petersone711caf2008-06-11 16:44:04 +00001418 sv.value = cv[2]
1419
1420
1421 def test_value(self, raw=False):
Benjamin Petersone711caf2008-06-11 16:44:04 +00001422 if raw:
1423 values = [self.RawValue(code, value)
1424 for code, value, _ in self.codes_values]
1425 else:
1426 values = [self.Value(code, value)
1427 for code, value, _ in self.codes_values]
1428
1429 for sv, cv in zip(values, self.codes_values):
1430 self.assertEqual(sv.value, cv[1])
1431
1432 proc = self.Process(target=self._test, args=(values,))
Jesus Cea94f964f2011-09-09 20:26:57 +02001433 proc.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +00001434 proc.start()
1435 proc.join()
1436
1437 for sv, cv in zip(values, self.codes_values):
1438 self.assertEqual(sv.value, cv[2])
1439
1440 def test_rawvalue(self):
1441 self.test_value(raw=True)
1442
1443 def test_getobj_getlock(self):
Benjamin Petersone711caf2008-06-11 16:44:04 +00001444 val1 = self.Value('i', 5)
1445 lock1 = val1.get_lock()
1446 obj1 = val1.get_obj()
1447
1448 val2 = self.Value('i', 5, lock=None)
1449 lock2 = val2.get_lock()
1450 obj2 = val2.get_obj()
1451
1452 lock = self.Lock()
1453 val3 = self.Value('i', 5, lock=lock)
1454 lock3 = val3.get_lock()
1455 obj3 = val3.get_obj()
1456 self.assertEqual(lock, lock3)
1457
Jesse Nollerb0516a62009-01-18 03:11:38 +00001458 arr4 = self.Value('i', 5, lock=False)
Benjamin Petersone711caf2008-06-11 16:44:04 +00001459 self.assertFalse(hasattr(arr4, 'get_lock'))
1460 self.assertFalse(hasattr(arr4, 'get_obj'))
1461
Jesse Nollerb0516a62009-01-18 03:11:38 +00001462 self.assertRaises(AttributeError, self.Value, 'i', 5, lock='navalue')
1463
1464 arr5 = self.RawValue('i', 5)
1465 self.assertFalse(hasattr(arr5, 'get_lock'))
1466 self.assertFalse(hasattr(arr5, 'get_obj'))
1467
Benjamin Petersone711caf2008-06-11 16:44:04 +00001468
1469class _TestArray(BaseTestCase):
1470
Florent Xiclunafd1b0932010-03-28 00:25:02 +00001471 ALLOWED_TYPES = ('processes',)
1472
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +00001473 @classmethod
1474 def f(cls, seq):
Benjamin Petersone711caf2008-06-11 16:44:04 +00001475 for i in range(1, len(seq)):
1476 seq[i] += seq[i-1]
1477
Florent Xiclunafd1b0932010-03-28 00:25:02 +00001478 @unittest.skipIf(c_int is None, "requires _ctypes")
Benjamin Petersone711caf2008-06-11 16:44:04 +00001479 def test_array(self, raw=False):
Benjamin Petersone711caf2008-06-11 16:44:04 +00001480 seq = [680, 626, 934, 821, 150, 233, 548, 982, 714, 831]
1481 if raw:
1482 arr = self.RawArray('i', seq)
1483 else:
1484 arr = self.Array('i', seq)
1485
1486 self.assertEqual(len(arr), len(seq))
1487 self.assertEqual(arr[3], seq[3])
1488 self.assertEqual(list(arr[2:7]), list(seq[2:7]))
1489
1490 arr[4:8] = seq[4:8] = array.array('i', [1, 2, 3, 4])
1491
1492 self.assertEqual(list(arr[:]), seq)
1493
1494 self.f(seq)
1495
1496 p = self.Process(target=self.f, args=(arr,))
Jesus Cea94f964f2011-09-09 20:26:57 +02001497 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +00001498 p.start()
1499 p.join()
1500
1501 self.assertEqual(list(arr[:]), seq)
1502
Florent Xiclunafd1b0932010-03-28 00:25:02 +00001503 @unittest.skipIf(c_int is None, "requires _ctypes")
Mark Dickinson89461ef2011-03-26 10:19:03 +00001504 def test_array_from_size(self):
1505 size = 10
1506 # Test for zeroing (see issue #11675).
1507 # The repetition below strengthens the test by increasing the chances
1508 # of previously allocated non-zero memory being used for the new array
1509 # on the 2nd and 3rd loops.
1510 for _ in range(3):
1511 arr = self.Array('i', size)
1512 self.assertEqual(len(arr), size)
1513 self.assertEqual(list(arr), [0] * size)
1514 arr[:] = range(10)
1515 self.assertEqual(list(arr), list(range(10)))
1516 del arr
1517
1518 @unittest.skipIf(c_int is None, "requires _ctypes")
Benjamin Petersone711caf2008-06-11 16:44:04 +00001519 def test_rawarray(self):
1520 self.test_array(raw=True)
1521
Florent Xiclunafd1b0932010-03-28 00:25:02 +00001522 @unittest.skipIf(c_int is None, "requires _ctypes")
Benjamin Petersone711caf2008-06-11 16:44:04 +00001523 def test_getobj_getlock_obj(self):
Benjamin Petersone711caf2008-06-11 16:44:04 +00001524 arr1 = self.Array('i', list(range(10)))
1525 lock1 = arr1.get_lock()
1526 obj1 = arr1.get_obj()
1527
1528 arr2 = self.Array('i', list(range(10)), lock=None)
1529 lock2 = arr2.get_lock()
1530 obj2 = arr2.get_obj()
1531
1532 lock = self.Lock()
1533 arr3 = self.Array('i', list(range(10)), lock=lock)
1534 lock3 = arr3.get_lock()
1535 obj3 = arr3.get_obj()
1536 self.assertEqual(lock, lock3)
1537
Jesse Nollerb0516a62009-01-18 03:11:38 +00001538 arr4 = self.Array('i', range(10), lock=False)
Benjamin Petersone711caf2008-06-11 16:44:04 +00001539 self.assertFalse(hasattr(arr4, 'get_lock'))
1540 self.assertFalse(hasattr(arr4, 'get_obj'))
Jesse Nollerb0516a62009-01-18 03:11:38 +00001541 self.assertRaises(AttributeError,
1542 self.Array, 'i', range(10), lock='notalock')
1543
1544 arr5 = self.RawArray('i', range(10))
1545 self.assertFalse(hasattr(arr5, 'get_lock'))
1546 self.assertFalse(hasattr(arr5, 'get_obj'))
Benjamin Petersone711caf2008-06-11 16:44:04 +00001547
1548#
1549#
1550#
1551
1552class _TestContainers(BaseTestCase):
1553
1554 ALLOWED_TYPES = ('manager',)
1555
1556 def test_list(self):
1557 a = self.list(list(range(10)))
1558 self.assertEqual(a[:], list(range(10)))
1559
1560 b = self.list()
1561 self.assertEqual(b[:], [])
1562
1563 b.extend(list(range(5)))
1564 self.assertEqual(b[:], list(range(5)))
1565
1566 self.assertEqual(b[2], 2)
1567 self.assertEqual(b[2:10], [2,3,4])
1568
1569 b *= 2
1570 self.assertEqual(b[:], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4])
1571
1572 self.assertEqual(b + [5, 6], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4, 5, 6])
1573
1574 self.assertEqual(a[:], list(range(10)))
1575
1576 d = [a, b]
1577 e = self.list(d)
1578 self.assertEqual(
1579 e[:],
1580 [[0, 1, 2, 3, 4, 5, 6, 7, 8, 9], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4]]
1581 )
1582
1583 f = self.list([a])
1584 a.append('hello')
1585 self.assertEqual(f[:], [[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 'hello']])
1586
1587 def test_dict(self):
1588 d = self.dict()
1589 indices = list(range(65, 70))
1590 for i in indices:
1591 d[i] = chr(i)
1592 self.assertEqual(d.copy(), dict((i, chr(i)) for i in indices))
1593 self.assertEqual(sorted(d.keys()), indices)
1594 self.assertEqual(sorted(d.values()), [chr(i) for i in indices])
1595 self.assertEqual(sorted(d.items()), [(i, chr(i)) for i in indices])
1596
1597 def test_namespace(self):
1598 n = self.Namespace()
1599 n.name = 'Bob'
1600 n.job = 'Builder'
1601 n._hidden = 'hidden'
1602 self.assertEqual((n.name, n.job), ('Bob', 'Builder'))
1603 del n.job
1604 self.assertEqual(str(n), "Namespace(name='Bob')")
1605 self.assertTrue(hasattr(n, 'name'))
1606 self.assertTrue(not hasattr(n, 'job'))
1607
1608#
1609#
1610#
1611
1612def sqr(x, wait=0.0):
1613 time.sleep(wait)
1614 return x*x
Ask Solem2afcbf22010-11-09 20:55:52 +00001615
Antoine Pitroude911b22011-12-21 11:03:24 +01001616def mul(x, y):
1617 return x*y
1618
Benjamin Petersone711caf2008-06-11 16:44:04 +00001619class _TestPool(BaseTestCase):
1620
Richard Oudkerkfc7b0ec2012-10-08 14:56:24 +01001621 @classmethod
1622 def setUpClass(cls):
1623 super().setUpClass()
1624 cls.pool = cls.Pool(4)
1625
1626 @classmethod
1627 def tearDownClass(cls):
1628 cls.pool.terminate()
1629 cls.pool.join()
1630 cls.pool = None
1631 super().tearDownClass()
1632
Benjamin Petersone711caf2008-06-11 16:44:04 +00001633 def test_apply(self):
1634 papply = self.pool.apply
1635 self.assertEqual(papply(sqr, (5,)), sqr(5))
1636 self.assertEqual(papply(sqr, (), {'x':3}), sqr(x=3))
1637
1638 def test_map(self):
1639 pmap = self.pool.map
1640 self.assertEqual(pmap(sqr, list(range(10))), list(map(sqr, list(range(10)))))
1641 self.assertEqual(pmap(sqr, list(range(100)), chunksize=20),
1642 list(map(sqr, list(range(100)))))
1643
Antoine Pitroude911b22011-12-21 11:03:24 +01001644 def test_starmap(self):
1645 psmap = self.pool.starmap
1646 tuples = list(zip(range(10), range(9,-1, -1)))
1647 self.assertEqual(psmap(mul, tuples),
1648 list(itertools.starmap(mul, tuples)))
1649 tuples = list(zip(range(100), range(99,-1, -1)))
1650 self.assertEqual(psmap(mul, tuples, chunksize=20),
1651 list(itertools.starmap(mul, tuples)))
1652
1653 def test_starmap_async(self):
1654 tuples = list(zip(range(100), range(99,-1, -1)))
1655 self.assertEqual(self.pool.starmap_async(mul, tuples).get(),
1656 list(itertools.starmap(mul, tuples)))
1657
Hynek Schlawack254af262012-10-27 12:53:02 +02001658 def test_map_async(self):
1659 self.assertEqual(self.pool.map_async(sqr, list(range(10))).get(),
1660 list(map(sqr, list(range(10)))))
1661
1662 def test_map_async_callbacks(self):
1663 call_args = self.manager.list() if self.TYPE == 'manager' else []
1664 self.pool.map_async(int, ['1'],
1665 callback=call_args.append,
1666 error_callback=call_args.append).wait()
1667 self.assertEqual(1, len(call_args))
1668 self.assertEqual([1], call_args[0])
1669 self.pool.map_async(int, ['a'],
1670 callback=call_args.append,
1671 error_callback=call_args.append).wait()
1672 self.assertEqual(2, len(call_args))
1673 self.assertIsInstance(call_args[1], ValueError)
1674
Alexandre Vassalottie52e3782009-07-17 09:18:18 +00001675 def test_map_chunksize(self):
1676 try:
1677 self.pool.map_async(sqr, [], chunksize=1).get(timeout=TIMEOUT1)
1678 except multiprocessing.TimeoutError:
1679 self.fail("pool.map_async with chunksize stalled on null list")
1680
Benjamin Petersone711caf2008-06-11 16:44:04 +00001681 def test_async(self):
1682 res = self.pool.apply_async(sqr, (7, TIMEOUT1,))
1683 get = TimingWrapper(res.get)
1684 self.assertEqual(get(), 49)
1685 self.assertTimingAlmostEqual(get.elapsed, TIMEOUT1)
1686
1687 def test_async_timeout(self):
1688 res = self.pool.apply_async(sqr, (6, TIMEOUT2 + 0.2))
1689 get = TimingWrapper(res.get)
1690 self.assertRaises(multiprocessing.TimeoutError, get, timeout=TIMEOUT2)
1691 self.assertTimingAlmostEqual(get.elapsed, TIMEOUT2)
1692
1693 def test_imap(self):
1694 it = self.pool.imap(sqr, list(range(10)))
1695 self.assertEqual(list(it), list(map(sqr, list(range(10)))))
1696
1697 it = self.pool.imap(sqr, list(range(10)))
1698 for i in range(10):
1699 self.assertEqual(next(it), i*i)
1700 self.assertRaises(StopIteration, it.__next__)
1701
1702 it = self.pool.imap(sqr, list(range(1000)), chunksize=100)
1703 for i in range(1000):
1704 self.assertEqual(next(it), i*i)
1705 self.assertRaises(StopIteration, it.__next__)
1706
1707 def test_imap_unordered(self):
1708 it = self.pool.imap_unordered(sqr, list(range(1000)))
1709 self.assertEqual(sorted(it), list(map(sqr, list(range(1000)))))
1710
1711 it = self.pool.imap_unordered(sqr, list(range(1000)), chunksize=53)
1712 self.assertEqual(sorted(it), list(map(sqr, list(range(1000)))))
1713
1714 def test_make_pool(self):
Victor Stinner2fae27b2011-06-20 17:53:35 +02001715 self.assertRaises(ValueError, multiprocessing.Pool, -1)
1716 self.assertRaises(ValueError, multiprocessing.Pool, 0)
1717
Benjamin Petersone711caf2008-06-11 16:44:04 +00001718 p = multiprocessing.Pool(3)
1719 self.assertEqual(3, len(p._pool))
1720 p.close()
1721 p.join()
1722
1723 def test_terminate(self):
Benjamin Petersone711caf2008-06-11 16:44:04 +00001724 result = self.pool.map_async(
1725 time.sleep, [0.1 for i in range(10000)], chunksize=1
1726 )
1727 self.pool.terminate()
1728 join = TimingWrapper(self.pool.join)
1729 join()
Victor Stinner900189b2011-03-24 16:39:07 +01001730 self.assertLess(join.elapsed, 0.5)
Jesse Noller1f0b6582010-01-27 03:36:01 +00001731
Richard Oudkerke41682b2012-06-06 19:04:57 +01001732 def test_empty_iterable(self):
1733 # See Issue 12157
1734 p = self.Pool(1)
1735
1736 self.assertEqual(p.map(sqr, []), [])
1737 self.assertEqual(list(p.imap(sqr, [])), [])
1738 self.assertEqual(list(p.imap_unordered(sqr, [])), [])
1739 self.assertEqual(p.map_async(sqr, []).get(), [])
1740
1741 p.close()
1742 p.join()
1743
Richard Oudkerkd69cfe82012-06-18 17:47:52 +01001744 def test_context(self):
1745 if self.TYPE == 'processes':
1746 L = list(range(10))
1747 expected = [sqr(i) for i in L]
1748 with multiprocessing.Pool(2) as p:
1749 r = p.map_async(sqr, L)
1750 self.assertEqual(r.get(), expected)
Benjamin Peterson3095f472012-09-25 12:45:42 -04001751 print(p._state)
1752 self.assertRaises(ValueError, p.map_async, sqr, L)
Richard Oudkerkd69cfe82012-06-18 17:47:52 +01001753
Ask Solem2afcbf22010-11-09 20:55:52 +00001754def raising():
1755 raise KeyError("key")
Jesse Noller1f0b6582010-01-27 03:36:01 +00001756
Ask Solem2afcbf22010-11-09 20:55:52 +00001757def unpickleable_result():
1758 return lambda: 42
1759
1760class _TestPoolWorkerErrors(BaseTestCase):
Jesse Noller1f0b6582010-01-27 03:36:01 +00001761 ALLOWED_TYPES = ('processes', )
Ask Solem2afcbf22010-11-09 20:55:52 +00001762
1763 def test_async_error_callback(self):
1764 p = multiprocessing.Pool(2)
1765
1766 scratchpad = [None]
1767 def errback(exc):
1768 scratchpad[0] = exc
1769
1770 res = p.apply_async(raising, error_callback=errback)
1771 self.assertRaises(KeyError, res.get)
1772 self.assertTrue(scratchpad[0])
1773 self.assertIsInstance(scratchpad[0], KeyError)
1774
1775 p.close()
1776 p.join()
1777
1778 def test_unpickleable_result(self):
1779 from multiprocessing.pool import MaybeEncodingError
1780 p = multiprocessing.Pool(2)
1781
1782 # Make sure we don't lose pool processes because of encoding errors.
1783 for iteration in range(20):
1784
1785 scratchpad = [None]
1786 def errback(exc):
1787 scratchpad[0] = exc
1788
1789 res = p.apply_async(unpickleable_result, error_callback=errback)
1790 self.assertRaises(MaybeEncodingError, res.get)
1791 wrapped = scratchpad[0]
1792 self.assertTrue(wrapped)
1793 self.assertIsInstance(scratchpad[0], MaybeEncodingError)
1794 self.assertIsNotNone(wrapped.exc)
1795 self.assertIsNotNone(wrapped.value)
1796
1797 p.close()
1798 p.join()
1799
1800class _TestPoolWorkerLifetime(BaseTestCase):
1801 ALLOWED_TYPES = ('processes', )
1802
Jesse Noller1f0b6582010-01-27 03:36:01 +00001803 def test_pool_worker_lifetime(self):
1804 p = multiprocessing.Pool(3, maxtasksperchild=10)
1805 self.assertEqual(3, len(p._pool))
1806 origworkerpids = [w.pid for w in p._pool]
1807 # Run many tasks so each worker gets replaced (hopefully)
1808 results = []
1809 for i in range(100):
1810 results.append(p.apply_async(sqr, (i, )))
1811 # Fetch the results and verify we got the right answers,
1812 # also ensuring all the tasks have completed.
1813 for (j, res) in enumerate(results):
1814 self.assertEqual(res.get(), sqr(j))
1815 # Refill the pool
1816 p._repopulate_pool()
Florent Xiclunafb190f62010-03-04 16:10:10 +00001817 # Wait until all workers are alive
Antoine Pitrou540ab062011-04-06 22:51:17 +02001818 # (countdown * DELTA = 5 seconds max startup process time)
1819 countdown = 50
Florent Xiclunafb190f62010-03-04 16:10:10 +00001820 while countdown and not all(w.is_alive() for w in p._pool):
1821 countdown -= 1
1822 time.sleep(DELTA)
Jesse Noller1f0b6582010-01-27 03:36:01 +00001823 finalworkerpids = [w.pid for w in p._pool]
Florent Xiclunafb190f62010-03-04 16:10:10 +00001824 # All pids should be assigned. See issue #7805.
1825 self.assertNotIn(None, origworkerpids)
1826 self.assertNotIn(None, finalworkerpids)
1827 # Finally, check that the worker pids have changed
Jesse Noller1f0b6582010-01-27 03:36:01 +00001828 self.assertNotEqual(sorted(origworkerpids), sorted(finalworkerpids))
1829 p.close()
1830 p.join()
1831
Charles-François Natalif8859e12011-10-24 18:45:29 +02001832 def test_pool_worker_lifetime_early_close(self):
1833 # Issue #10332: closing a pool whose workers have limited lifetimes
1834 # before all the tasks completed would make join() hang.
1835 p = multiprocessing.Pool(3, maxtasksperchild=1)
1836 results = []
1837 for i in range(6):
1838 results.append(p.apply_async(sqr, (i, 0.3)))
1839 p.close()
1840 p.join()
1841 # check the results
1842 for (j, res) in enumerate(results):
1843 self.assertEqual(res.get(), sqr(j))
1844
Benjamin Petersone711caf2008-06-11 16:44:04 +00001845#
1846# Test of creating a customized manager class
1847#
1848
1849from multiprocessing.managers import BaseManager, BaseProxy, RemoteError
1850
1851class FooBar(object):
1852 def f(self):
1853 return 'f()'
1854 def g(self):
1855 raise ValueError
1856 def _h(self):
1857 return '_h()'
1858
1859def baz():
1860 for i in range(10):
1861 yield i*i
1862
1863class IteratorProxy(BaseProxy):
Florent Xiclunaaa171062010-08-14 15:56:42 +00001864 _exposed_ = ('__next__',)
Benjamin Petersone711caf2008-06-11 16:44:04 +00001865 def __iter__(self):
1866 return self
1867 def __next__(self):
Benjamin Petersone711caf2008-06-11 16:44:04 +00001868 return self._callmethod('__next__')
1869
1870class MyManager(BaseManager):
1871 pass
1872
1873MyManager.register('Foo', callable=FooBar)
1874MyManager.register('Bar', callable=FooBar, exposed=('f', '_h'))
1875MyManager.register('baz', callable=baz, proxytype=IteratorProxy)
1876
1877
1878class _TestMyManager(BaseTestCase):
1879
1880 ALLOWED_TYPES = ('manager',)
1881
1882 def test_mymanager(self):
1883 manager = MyManager()
1884 manager.start()
Richard Oudkerkac385712012-06-18 21:29:30 +01001885 self.common(manager)
1886 manager.shutdown()
Benjamin Petersone711caf2008-06-11 16:44:04 +00001887
Richard Oudkerkac385712012-06-18 21:29:30 +01001888 # If the manager process exited cleanly then the exitcode
1889 # will be zero. Otherwise (after a short timeout)
1890 # terminate() is used, resulting in an exitcode of -SIGTERM.
1891 self.assertEqual(manager._process.exitcode, 0)
1892
1893 def test_mymanager_context(self):
1894 with MyManager() as manager:
1895 self.common(manager)
1896 self.assertEqual(manager._process.exitcode, 0)
1897
1898 def test_mymanager_context_prestarted(self):
1899 manager = MyManager()
1900 manager.start()
1901 with manager:
1902 self.common(manager)
1903 self.assertEqual(manager._process.exitcode, 0)
1904
1905 def common(self, manager):
Benjamin Petersone711caf2008-06-11 16:44:04 +00001906 foo = manager.Foo()
1907 bar = manager.Bar()
1908 baz = manager.baz()
1909
1910 foo_methods = [name for name in ('f', 'g', '_h') if hasattr(foo, name)]
1911 bar_methods = [name for name in ('f', 'g', '_h') if hasattr(bar, name)]
1912
1913 self.assertEqual(foo_methods, ['f', 'g'])
1914 self.assertEqual(bar_methods, ['f', '_h'])
1915
1916 self.assertEqual(foo.f(), 'f()')
1917 self.assertRaises(ValueError, foo.g)
1918 self.assertEqual(foo._callmethod('f'), 'f()')
1919 self.assertRaises(RemoteError, foo._callmethod, '_h')
1920
1921 self.assertEqual(bar.f(), 'f()')
1922 self.assertEqual(bar._h(), '_h()')
1923 self.assertEqual(bar._callmethod('f'), 'f()')
1924 self.assertEqual(bar._callmethod('_h'), '_h()')
1925
1926 self.assertEqual(list(baz), [i*i for i in range(10)])
1927
Richard Oudkerk73d9a292012-06-14 15:30:10 +01001928
Benjamin Petersone711caf2008-06-11 16:44:04 +00001929#
1930# Test of connecting to a remote server and using xmlrpclib for serialization
1931#
1932
1933_queue = pyqueue.Queue()
1934def get_queue():
1935 return _queue
1936
1937class QueueManager(BaseManager):
1938 '''manager class used by server process'''
1939QueueManager.register('get_queue', callable=get_queue)
1940
1941class QueueManager2(BaseManager):
1942 '''manager class which specifies the same interface as QueueManager'''
1943QueueManager2.register('get_queue')
1944
1945
1946SERIALIZER = 'xmlrpclib'
1947
1948class _TestRemoteManager(BaseTestCase):
1949
1950 ALLOWED_TYPES = ('manager',)
1951
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +00001952 @classmethod
1953 def _putter(cls, address, authkey):
Benjamin Petersone711caf2008-06-11 16:44:04 +00001954 manager = QueueManager2(
1955 address=address, authkey=authkey, serializer=SERIALIZER
1956 )
1957 manager.connect()
1958 queue = manager.get_queue()
1959 queue.put(('hello world', None, True, 2.25))
1960
1961 def test_remote(self):
1962 authkey = os.urandom(32)
1963
1964 manager = QueueManager(
1965 address=('localhost', 0), authkey=authkey, serializer=SERIALIZER
1966 )
1967 manager.start()
1968
1969 p = self.Process(target=self._putter, args=(manager.address, authkey))
Jesus Cea94f964f2011-09-09 20:26:57 +02001970 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +00001971 p.start()
1972
1973 manager2 = QueueManager2(
1974 address=manager.address, authkey=authkey, serializer=SERIALIZER
1975 )
1976 manager2.connect()
1977 queue = manager2.get_queue()
1978
1979 # Note that xmlrpclib will deserialize object as a list not a tuple
1980 self.assertEqual(queue.get(), ['hello world', None, True, 2.25])
1981
1982 # Because we are using xmlrpclib for serialization instead of
1983 # pickle this will cause a serialization error.
1984 self.assertRaises(Exception, queue.put, time.sleep)
1985
1986 # Make queue finalizer run before the server is stopped
1987 del queue
1988 manager.shutdown()
1989
Jesse Nollerc5d28a02009-03-30 16:37:36 +00001990class _TestManagerRestart(BaseTestCase):
1991
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +00001992 @classmethod
1993 def _putter(cls, address, authkey):
Jesse Nollerc5d28a02009-03-30 16:37:36 +00001994 manager = QueueManager(
1995 address=address, authkey=authkey, serializer=SERIALIZER)
1996 manager.connect()
1997 queue = manager.get_queue()
1998 queue.put('hello world')
1999
2000 def test_rapid_restart(self):
2001 authkey = os.urandom(32)
2002 manager = QueueManager(
Antoine Pitrou043bad02010-04-30 23:20:15 +00002003 address=('localhost', 0), authkey=authkey, serializer=SERIALIZER)
Brian Curtin50be1ca2010-11-01 05:10:44 +00002004 srvr = manager.get_server()
2005 addr = srvr.address
2006 # Close the connection.Listener socket which gets opened as a part
2007 # of manager.get_server(). It's not needed for the test.
2008 srvr.listener.close()
Jesse Nollerc5d28a02009-03-30 16:37:36 +00002009 manager.start()
2010
2011 p = self.Process(target=self._putter, args=(manager.address, authkey))
Jesus Cea94f964f2011-09-09 20:26:57 +02002012 p.daemon = True
Jesse Nollerc5d28a02009-03-30 16:37:36 +00002013 p.start()
2014 queue = manager.get_queue()
2015 self.assertEqual(queue.get(), 'hello world')
Jesse Noller35d1f002009-03-30 22:59:27 +00002016 del queue
Jesse Nollerc5d28a02009-03-30 16:37:36 +00002017 manager.shutdown()
2018 manager = QueueManager(
Antoine Pitrou043bad02010-04-30 23:20:15 +00002019 address=addr, authkey=authkey, serializer=SERIALIZER)
Antoine Pitrouc824e9a2011-04-05 18:11:33 +02002020 try:
2021 manager.start()
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02002022 except OSError as e:
Antoine Pitrouc824e9a2011-04-05 18:11:33 +02002023 if e.errno != errno.EADDRINUSE:
2024 raise
2025 # Retry after some time, in case the old socket was lingering
2026 # (sporadic failure on buildbots)
2027 time.sleep(1.0)
2028 manager = QueueManager(
2029 address=addr, authkey=authkey, serializer=SERIALIZER)
Jesse Noller35d1f002009-03-30 22:59:27 +00002030 manager.shutdown()
Jesse Nollerc5d28a02009-03-30 16:37:36 +00002031
Benjamin Petersone711caf2008-06-11 16:44:04 +00002032#
2033#
2034#
2035
2036SENTINEL = latin('')
2037
2038class _TestConnection(BaseTestCase):
2039
2040 ALLOWED_TYPES = ('processes', 'threads')
2041
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +00002042 @classmethod
2043 def _echo(cls, conn):
Benjamin Petersone711caf2008-06-11 16:44:04 +00002044 for msg in iter(conn.recv_bytes, SENTINEL):
2045 conn.send_bytes(msg)
2046 conn.close()
2047
2048 def test_connection(self):
2049 conn, child_conn = self.Pipe()
2050
2051 p = self.Process(target=self._echo, args=(child_conn,))
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +00002052 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +00002053 p.start()
2054
2055 seq = [1, 2.25, None]
2056 msg = latin('hello world')
2057 longmsg = msg * 10
2058 arr = array.array('i', list(range(4)))
2059
2060 if self.TYPE == 'processes':
2061 self.assertEqual(type(conn.fileno()), int)
2062
2063 self.assertEqual(conn.send(seq), None)
2064 self.assertEqual(conn.recv(), seq)
2065
2066 self.assertEqual(conn.send_bytes(msg), None)
2067 self.assertEqual(conn.recv_bytes(), msg)
2068
2069 if self.TYPE == 'processes':
2070 buffer = array.array('i', [0]*10)
2071 expected = list(arr) + [0] * (10 - len(arr))
2072 self.assertEqual(conn.send_bytes(arr), None)
2073 self.assertEqual(conn.recv_bytes_into(buffer),
2074 len(arr) * buffer.itemsize)
2075 self.assertEqual(list(buffer), expected)
2076
2077 buffer = array.array('i', [0]*10)
2078 expected = [0] * 3 + list(arr) + [0] * (10 - 3 - len(arr))
2079 self.assertEqual(conn.send_bytes(arr), None)
2080 self.assertEqual(conn.recv_bytes_into(buffer, 3 * buffer.itemsize),
2081 len(arr) * buffer.itemsize)
2082 self.assertEqual(list(buffer), expected)
2083
2084 buffer = bytearray(latin(' ' * 40))
2085 self.assertEqual(conn.send_bytes(longmsg), None)
2086 try:
2087 res = conn.recv_bytes_into(buffer)
2088 except multiprocessing.BufferTooShort as e:
2089 self.assertEqual(e.args, (longmsg,))
2090 else:
2091 self.fail('expected BufferTooShort, got %s' % res)
2092
2093 poll = TimingWrapper(conn.poll)
2094
2095 self.assertEqual(poll(), False)
2096 self.assertTimingAlmostEqual(poll.elapsed, 0)
2097
Richard Oudkerk59d54042012-05-10 16:11:12 +01002098 self.assertEqual(poll(-1), False)
2099 self.assertTimingAlmostEqual(poll.elapsed, 0)
2100
Benjamin Petersone711caf2008-06-11 16:44:04 +00002101 self.assertEqual(poll(TIMEOUT1), False)
2102 self.assertTimingAlmostEqual(poll.elapsed, TIMEOUT1)
2103
2104 conn.send(None)
Giampaolo Rodola'5e844c82012-12-31 17:23:09 +01002105 time.sleep(.1)
Benjamin Petersone711caf2008-06-11 16:44:04 +00002106
2107 self.assertEqual(poll(TIMEOUT1), True)
2108 self.assertTimingAlmostEqual(poll.elapsed, 0)
2109
2110 self.assertEqual(conn.recv(), None)
2111
2112 really_big_msg = latin('X') * (1024 * 1024 * 16) # 16Mb
2113 conn.send_bytes(really_big_msg)
2114 self.assertEqual(conn.recv_bytes(), really_big_msg)
2115
2116 conn.send_bytes(SENTINEL) # tell child to quit
2117 child_conn.close()
2118
2119 if self.TYPE == 'processes':
2120 self.assertEqual(conn.readable, True)
2121 self.assertEqual(conn.writable, True)
2122 self.assertRaises(EOFError, conn.recv)
2123 self.assertRaises(EOFError, conn.recv_bytes)
2124
2125 p.join()
2126
2127 def test_duplex_false(self):
2128 reader, writer = self.Pipe(duplex=False)
2129 self.assertEqual(writer.send(1), None)
2130 self.assertEqual(reader.recv(), 1)
2131 if self.TYPE == 'processes':
2132 self.assertEqual(reader.readable, True)
2133 self.assertEqual(reader.writable, False)
2134 self.assertEqual(writer.readable, False)
2135 self.assertEqual(writer.writable, True)
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02002136 self.assertRaises(OSError, reader.send, 2)
2137 self.assertRaises(OSError, writer.recv)
2138 self.assertRaises(OSError, writer.poll)
Benjamin Petersone711caf2008-06-11 16:44:04 +00002139
2140 def test_spawn_close(self):
2141 # We test that a pipe connection can be closed by parent
2142 # process immediately after child is spawned. On Windows this
2143 # would have sometimes failed on old versions because
2144 # child_conn would be closed before the child got a chance to
2145 # duplicate it.
2146 conn, child_conn = self.Pipe()
2147
2148 p = self.Process(target=self._echo, args=(child_conn,))
Jesus Cea94f964f2011-09-09 20:26:57 +02002149 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +00002150 p.start()
2151 child_conn.close() # this might complete before child initializes
2152
2153 msg = latin('hello')
2154 conn.send_bytes(msg)
2155 self.assertEqual(conn.recv_bytes(), msg)
2156
2157 conn.send_bytes(SENTINEL)
2158 conn.close()
2159 p.join()
2160
2161 def test_sendbytes(self):
2162 if self.TYPE != 'processes':
2163 return
2164
2165 msg = latin('abcdefghijklmnopqrstuvwxyz')
2166 a, b = self.Pipe()
2167
2168 a.send_bytes(msg)
2169 self.assertEqual(b.recv_bytes(), msg)
2170
2171 a.send_bytes(msg, 5)
2172 self.assertEqual(b.recv_bytes(), msg[5:])
2173
2174 a.send_bytes(msg, 7, 8)
2175 self.assertEqual(b.recv_bytes(), msg[7:7+8])
2176
2177 a.send_bytes(msg, 26)
2178 self.assertEqual(b.recv_bytes(), latin(''))
2179
2180 a.send_bytes(msg, 26, 0)
2181 self.assertEqual(b.recv_bytes(), latin(''))
2182
2183 self.assertRaises(ValueError, a.send_bytes, msg, 27)
2184
2185 self.assertRaises(ValueError, a.send_bytes, msg, 22, 5)
2186
2187 self.assertRaises(ValueError, a.send_bytes, msg, 26, 1)
2188
2189 self.assertRaises(ValueError, a.send_bytes, msg, -1)
2190
2191 self.assertRaises(ValueError, a.send_bytes, msg, 4, -1)
2192
Antoine Pitroubcb39d42011-08-23 19:46:22 +02002193 @classmethod
2194 def _is_fd_assigned(cls, fd):
2195 try:
2196 os.fstat(fd)
2197 except OSError as e:
2198 if e.errno == errno.EBADF:
2199 return False
2200 raise
2201 else:
2202 return True
2203
2204 @classmethod
2205 def _writefd(cls, conn, data, create_dummy_fds=False):
2206 if create_dummy_fds:
2207 for i in range(0, 256):
2208 if not cls._is_fd_assigned(i):
2209 os.dup2(conn.fileno(), i)
2210 fd = reduction.recv_handle(conn)
2211 if msvcrt:
2212 fd = msvcrt.open_osfhandle(fd, os.O_WRONLY)
2213 os.write(fd, data)
2214 os.close(fd)
2215
Charles-François Natalibc8f0822011-09-20 20:36:51 +02002216 @unittest.skipUnless(HAS_REDUCTION, "test needs multiprocessing.reduction")
Antoine Pitroubcb39d42011-08-23 19:46:22 +02002217 def test_fd_transfer(self):
2218 if self.TYPE != 'processes':
2219 self.skipTest("only makes sense with processes")
2220 conn, child_conn = self.Pipe(duplex=True)
2221
2222 p = self.Process(target=self._writefd, args=(child_conn, b"foo"))
Jesus Cea94f964f2011-09-09 20:26:57 +02002223 p.daemon = True
Antoine Pitroubcb39d42011-08-23 19:46:22 +02002224 p.start()
Victor Stinnerd0b10a62011-09-21 01:10:29 +02002225 self.addCleanup(test.support.unlink, test.support.TESTFN)
Antoine Pitroubcb39d42011-08-23 19:46:22 +02002226 with open(test.support.TESTFN, "wb") as f:
2227 fd = f.fileno()
2228 if msvcrt:
2229 fd = msvcrt.get_osfhandle(fd)
2230 reduction.send_handle(conn, fd, p.pid)
2231 p.join()
2232 with open(test.support.TESTFN, "rb") as f:
2233 self.assertEqual(f.read(), b"foo")
2234
Charles-François Natalibc8f0822011-09-20 20:36:51 +02002235 @unittest.skipUnless(HAS_REDUCTION, "test needs multiprocessing.reduction")
Antoine Pitroubcb39d42011-08-23 19:46:22 +02002236 @unittest.skipIf(sys.platform == "win32",
2237 "test semantics don't make sense on Windows")
2238 @unittest.skipIf(MAXFD <= 256,
2239 "largest assignable fd number is too small")
2240 @unittest.skipUnless(hasattr(os, "dup2"),
2241 "test needs os.dup2()")
2242 def test_large_fd_transfer(self):
2243 # With fd > 256 (issue #11657)
2244 if self.TYPE != 'processes':
2245 self.skipTest("only makes sense with processes")
2246 conn, child_conn = self.Pipe(duplex=True)
2247
2248 p = self.Process(target=self._writefd, args=(child_conn, b"bar", True))
Jesus Cea94f964f2011-09-09 20:26:57 +02002249 p.daemon = True
Antoine Pitroubcb39d42011-08-23 19:46:22 +02002250 p.start()
Victor Stinnerd0b10a62011-09-21 01:10:29 +02002251 self.addCleanup(test.support.unlink, test.support.TESTFN)
Antoine Pitroubcb39d42011-08-23 19:46:22 +02002252 with open(test.support.TESTFN, "wb") as f:
2253 fd = f.fileno()
2254 for newfd in range(256, MAXFD):
2255 if not self._is_fd_assigned(newfd):
2256 break
2257 else:
2258 self.fail("could not find an unassigned large file descriptor")
2259 os.dup2(fd, newfd)
2260 try:
2261 reduction.send_handle(conn, newfd, p.pid)
2262 finally:
2263 os.close(newfd)
2264 p.join()
2265 with open(test.support.TESTFN, "rb") as f:
2266 self.assertEqual(f.read(), b"bar")
2267
Jesus Cea4507e642011-09-21 03:53:25 +02002268 @classmethod
2269 def _send_data_without_fd(self, conn):
2270 os.write(conn.fileno(), b"\0")
2271
Charles-François Natalie51c8da2011-09-21 18:48:21 +02002272 @unittest.skipUnless(HAS_REDUCTION, "test needs multiprocessing.reduction")
Jesus Cea4507e642011-09-21 03:53:25 +02002273 @unittest.skipIf(sys.platform == "win32", "doesn't make sense on Windows")
2274 def test_missing_fd_transfer(self):
2275 # Check that exception is raised when received data is not
2276 # accompanied by a file descriptor in ancillary data.
2277 if self.TYPE != 'processes':
2278 self.skipTest("only makes sense with processes")
2279 conn, child_conn = self.Pipe(duplex=True)
2280
2281 p = self.Process(target=self._send_data_without_fd, args=(child_conn,))
2282 p.daemon = True
2283 p.start()
2284 self.assertRaises(RuntimeError, reduction.recv_handle, conn)
2285 p.join()
Antoine Pitroubcb39d42011-08-23 19:46:22 +02002286
Richard Oudkerkd69cfe82012-06-18 17:47:52 +01002287 def test_context(self):
2288 a, b = self.Pipe()
2289
2290 with a, b:
2291 a.send(1729)
2292 self.assertEqual(b.recv(), 1729)
2293 if self.TYPE == 'processes':
2294 self.assertFalse(a.closed)
2295 self.assertFalse(b.closed)
2296
2297 if self.TYPE == 'processes':
2298 self.assertTrue(a.closed)
2299 self.assertTrue(b.closed)
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02002300 self.assertRaises(OSError, a.recv)
2301 self.assertRaises(OSError, b.recv)
Richard Oudkerkd69cfe82012-06-18 17:47:52 +01002302
Charles-François Natalied4a8fc2012-02-08 21:15:58 +01002303class _TestListener(BaseTestCase):
2304
Richard Oudkerk91257752012-06-15 21:53:34 +01002305 ALLOWED_TYPES = ('processes',)
Charles-François Natalied4a8fc2012-02-08 21:15:58 +01002306
2307 def test_multiple_bind(self):
2308 for family in self.connection.families:
2309 l = self.connection.Listener(family=family)
2310 self.addCleanup(l.close)
2311 self.assertRaises(OSError, self.connection.Listener,
2312 l.address, family)
2313
Richard Oudkerkd69cfe82012-06-18 17:47:52 +01002314 def test_context(self):
2315 with self.connection.Listener() as l:
2316 with self.connection.Client(l.address) as c:
2317 with l.accept() as d:
2318 c.send(1729)
2319 self.assertEqual(d.recv(), 1729)
2320
2321 if self.TYPE == 'processes':
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02002322 self.assertRaises(OSError, l.accept)
Richard Oudkerkd69cfe82012-06-18 17:47:52 +01002323
Benjamin Petersone711caf2008-06-11 16:44:04 +00002324class _TestListenerClient(BaseTestCase):
2325
2326 ALLOWED_TYPES = ('processes', 'threads')
2327
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +00002328 @classmethod
2329 def _test(cls, address):
2330 conn = cls.connection.Client(address)
Benjamin Petersone711caf2008-06-11 16:44:04 +00002331 conn.send('hello')
2332 conn.close()
2333
2334 def test_listener_client(self):
2335 for family in self.connection.families:
2336 l = self.connection.Listener(family=family)
2337 p = self.Process(target=self._test, args=(l.address,))
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +00002338 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +00002339 p.start()
2340 conn = l.accept()
2341 self.assertEqual(conn.recv(), 'hello')
2342 p.join()
2343 l.close()
Charles-François Natalied4a8fc2012-02-08 21:15:58 +01002344
Richard Oudkerkfdb8dcf2012-05-05 19:45:37 +01002345 def test_issue14725(self):
2346 l = self.connection.Listener()
2347 p = self.Process(target=self._test, args=(l.address,))
2348 p.daemon = True
2349 p.start()
2350 time.sleep(1)
2351 # On Windows the client process should by now have connected,
2352 # written data and closed the pipe handle by now. This causes
2353 # ConnectNamdedPipe() to fail with ERROR_NO_DATA. See Issue
2354 # 14725.
2355 conn = l.accept()
2356 self.assertEqual(conn.recv(), 'hello')
2357 conn.close()
2358 p.join()
2359 l.close()
2360
Richard Oudkerked9e06c2013-01-13 22:46:48 +00002361 def test_issue16955(self):
2362 for fam in self.connection.families:
2363 l = self.connection.Listener(family=fam)
2364 c = self.connection.Client(l.address)
2365 a = l.accept()
2366 a.send_bytes(b"hello")
2367 self.assertTrue(c.poll(1))
2368 a.close()
2369 c.close()
2370 l.close()
2371
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01002372class _TestPoll(unittest.TestCase):
2373
2374 ALLOWED_TYPES = ('processes', 'threads')
2375
2376 def test_empty_string(self):
2377 a, b = self.Pipe()
2378 self.assertEqual(a.poll(), False)
2379 b.send_bytes(b'')
2380 self.assertEqual(a.poll(), True)
2381 self.assertEqual(a.poll(), True)
2382
2383 @classmethod
2384 def _child_strings(cls, conn, strings):
2385 for s in strings:
2386 time.sleep(0.1)
2387 conn.send_bytes(s)
2388 conn.close()
2389
2390 def test_strings(self):
2391 strings = (b'hello', b'', b'a', b'b', b'', b'bye', b'', b'lop')
2392 a, b = self.Pipe()
2393 p = self.Process(target=self._child_strings, args=(b, strings))
2394 p.start()
2395
2396 for s in strings:
2397 for i in range(200):
2398 if a.poll(0.01):
2399 break
2400 x = a.recv_bytes()
2401 self.assertEqual(s, x)
2402
2403 p.join()
2404
2405 @classmethod
2406 def _child_boundaries(cls, r):
2407 # Polling may "pull" a message in to the child process, but we
2408 # don't want it to pull only part of a message, as that would
2409 # corrupt the pipe for any other processes which might later
2410 # read from it.
2411 r.poll(5)
2412
2413 def test_boundaries(self):
2414 r, w = self.Pipe(False)
2415 p = self.Process(target=self._child_boundaries, args=(r,))
2416 p.start()
2417 time.sleep(2)
2418 L = [b"first", b"second"]
2419 for obj in L:
2420 w.send_bytes(obj)
2421 w.close()
2422 p.join()
2423 self.assertIn(r.recv_bytes(), L)
2424
2425 @classmethod
2426 def _child_dont_merge(cls, b):
2427 b.send_bytes(b'a')
2428 b.send_bytes(b'b')
2429 b.send_bytes(b'cd')
2430
2431 def test_dont_merge(self):
2432 a, b = self.Pipe()
2433 self.assertEqual(a.poll(0.0), False)
2434 self.assertEqual(a.poll(0.1), False)
2435
2436 p = self.Process(target=self._child_dont_merge, args=(b,))
2437 p.start()
2438
2439 self.assertEqual(a.recv_bytes(), b'a')
2440 self.assertEqual(a.poll(1.0), True)
2441 self.assertEqual(a.poll(1.0), True)
2442 self.assertEqual(a.recv_bytes(), b'b')
2443 self.assertEqual(a.poll(1.0), True)
2444 self.assertEqual(a.poll(1.0), True)
2445 self.assertEqual(a.poll(0.0), True)
2446 self.assertEqual(a.recv_bytes(), b'cd')
2447
2448 p.join()
2449
Benjamin Petersone711caf2008-06-11 16:44:04 +00002450#
2451# Test of sending connection and socket objects between processes
2452#
Antoine Pitrou5438ed12012-04-24 22:56:57 +02002453
2454@unittest.skipUnless(HAS_REDUCTION, "test needs multiprocessing.reduction")
Benjamin Petersone711caf2008-06-11 16:44:04 +00002455class _TestPicklingConnections(BaseTestCase):
2456
2457 ALLOWED_TYPES = ('processes',)
2458
Antoine Pitrou5438ed12012-04-24 22:56:57 +02002459 @classmethod
Antoine Pitrou92ff4e12012-04-27 23:51:03 +02002460 def tearDownClass(cls):
2461 from multiprocessing.reduction import resource_sharer
2462 resource_sharer.stop(timeout=5)
2463
2464 @classmethod
Antoine Pitrou5438ed12012-04-24 22:56:57 +02002465 def _listener(cls, conn, families):
Benjamin Petersone711caf2008-06-11 16:44:04 +00002466 for fam in families:
Antoine Pitrou5438ed12012-04-24 22:56:57 +02002467 l = cls.connection.Listener(family=fam)
Benjamin Petersone711caf2008-06-11 16:44:04 +00002468 conn.send(l.address)
2469 new_conn = l.accept()
2470 conn.send(new_conn)
Antoine Pitrou5438ed12012-04-24 22:56:57 +02002471 new_conn.close()
2472 l.close()
Benjamin Petersone711caf2008-06-11 16:44:04 +00002473
Antoine Pitrou5438ed12012-04-24 22:56:57 +02002474 l = socket.socket()
2475 l.bind(('localhost', 0))
Antoine Pitrou5438ed12012-04-24 22:56:57 +02002476 l.listen(1)
Richard Oudkerk5d73c172012-05-08 22:24:47 +01002477 conn.send(l.getsockname())
Antoine Pitrou5438ed12012-04-24 22:56:57 +02002478 new_conn, addr = l.accept()
2479 conn.send(new_conn)
2480 new_conn.close()
2481 l.close()
Benjamin Petersone711caf2008-06-11 16:44:04 +00002482
2483 conn.recv()
2484
Antoine Pitrou5438ed12012-04-24 22:56:57 +02002485 @classmethod
2486 def _remote(cls, conn):
Benjamin Petersone711caf2008-06-11 16:44:04 +00002487 for (address, msg) in iter(conn.recv, None):
Antoine Pitrou5438ed12012-04-24 22:56:57 +02002488 client = cls.connection.Client(address)
Benjamin Petersone711caf2008-06-11 16:44:04 +00002489 client.send(msg.upper())
2490 client.close()
2491
Antoine Pitrou5438ed12012-04-24 22:56:57 +02002492 address, msg = conn.recv()
2493 client = socket.socket()
2494 client.connect(address)
2495 client.sendall(msg.upper())
2496 client.close()
Benjamin Petersone711caf2008-06-11 16:44:04 +00002497
2498 conn.close()
2499
2500 def test_pickling(self):
Benjamin Petersone711caf2008-06-11 16:44:04 +00002501 families = self.connection.families
2502
2503 lconn, lconn0 = self.Pipe()
2504 lp = self.Process(target=self._listener, args=(lconn0, families))
Jesus Cea94f964f2011-09-09 20:26:57 +02002505 lp.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +00002506 lp.start()
2507 lconn0.close()
2508
2509 rconn, rconn0 = self.Pipe()
2510 rp = self.Process(target=self._remote, args=(rconn0,))
Jesus Cea94f964f2011-09-09 20:26:57 +02002511 rp.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +00002512 rp.start()
2513 rconn0.close()
2514
2515 for fam in families:
2516 msg = ('This connection uses family %s' % fam).encode('ascii')
2517 address = lconn.recv()
2518 rconn.send((address, msg))
2519 new_conn = lconn.recv()
2520 self.assertEqual(new_conn.recv(), msg.upper())
2521
2522 rconn.send(None)
2523
Antoine Pitrou5438ed12012-04-24 22:56:57 +02002524 msg = latin('This connection uses a normal socket')
2525 address = lconn.recv()
2526 rconn.send((address, msg))
2527 new_conn = lconn.recv()
Richard Oudkerk4460c342012-04-30 14:48:50 +01002528 buf = []
2529 while True:
2530 s = new_conn.recv(100)
2531 if not s:
2532 break
2533 buf.append(s)
2534 buf = b''.join(buf)
2535 self.assertEqual(buf, msg.upper())
Antoine Pitrou5438ed12012-04-24 22:56:57 +02002536 new_conn.close()
Benjamin Petersone711caf2008-06-11 16:44:04 +00002537
2538 lconn.send(None)
2539
2540 rconn.close()
2541 lconn.close()
2542
2543 lp.join()
2544 rp.join()
Antoine Pitrou5438ed12012-04-24 22:56:57 +02002545
2546 @classmethod
2547 def child_access(cls, conn):
2548 w = conn.recv()
2549 w.send('all is well')
2550 w.close()
2551
2552 r = conn.recv()
2553 msg = r.recv()
2554 conn.send(msg*2)
2555
2556 conn.close()
2557
2558 def test_access(self):
2559 # On Windows, if we do not specify a destination pid when
2560 # using DupHandle then we need to be careful to use the
2561 # correct access flags for DuplicateHandle(), or else
2562 # DupHandle.detach() will raise PermissionError. For example,
2563 # for a read only pipe handle we should use
2564 # access=FILE_GENERIC_READ. (Unfortunately
2565 # DUPLICATE_SAME_ACCESS does not work.)
2566 conn, child_conn = self.Pipe()
2567 p = self.Process(target=self.child_access, args=(child_conn,))
2568 p.daemon = True
2569 p.start()
2570 child_conn.close()
2571
2572 r, w = self.Pipe(duplex=False)
2573 conn.send(w)
2574 w.close()
2575 self.assertEqual(r.recv(), 'all is well')
2576 r.close()
2577
2578 r, w = self.Pipe(duplex=False)
2579 conn.send(r)
2580 r.close()
2581 w.send('foobar')
2582 w.close()
2583 self.assertEqual(conn.recv(), 'foobar'*2)
2584
Benjamin Petersone711caf2008-06-11 16:44:04 +00002585#
2586#
2587#
2588
2589class _TestHeap(BaseTestCase):
2590
2591 ALLOWED_TYPES = ('processes',)
2592
2593 def test_heap(self):
2594 iterations = 5000
2595 maxblocks = 50
2596 blocks = []
2597
2598 # create and destroy lots of blocks of different sizes
2599 for i in range(iterations):
2600 size = int(random.lognormvariate(0, 1) * 1000)
2601 b = multiprocessing.heap.BufferWrapper(size)
2602 blocks.append(b)
2603 if len(blocks) > maxblocks:
2604 i = random.randrange(maxblocks)
2605 del blocks[i]
2606
2607 # get the heap object
2608 heap = multiprocessing.heap.BufferWrapper._heap
2609
2610 # verify the state of the heap
2611 all = []
2612 occupied = 0
Charles-François Natali778db492011-07-02 14:35:49 +02002613 heap._lock.acquire()
2614 self.addCleanup(heap._lock.release)
Benjamin Petersone711caf2008-06-11 16:44:04 +00002615 for L in list(heap._len_to_seq.values()):
2616 for arena, start, stop in L:
2617 all.append((heap._arenas.index(arena), start, stop,
2618 stop-start, 'free'))
2619 for arena, start, stop in heap._allocated_blocks:
2620 all.append((heap._arenas.index(arena), start, stop,
2621 stop-start, 'occupied'))
2622 occupied += (stop-start)
2623
2624 all.sort()
2625
2626 for i in range(len(all)-1):
2627 (arena, start, stop) = all[i][:3]
2628 (narena, nstart, nstop) = all[i+1][:3]
2629 self.assertTrue((arena != narena and nstart == 0) or
2630 (stop == nstart))
2631
Charles-François Natali778db492011-07-02 14:35:49 +02002632 def test_free_from_gc(self):
2633 # Check that freeing of blocks by the garbage collector doesn't deadlock
2634 # (issue #12352).
2635 # Make sure the GC is enabled, and set lower collection thresholds to
2636 # make collections more frequent (and increase the probability of
2637 # deadlock).
2638 if not gc.isenabled():
2639 gc.enable()
2640 self.addCleanup(gc.disable)
2641 thresholds = gc.get_threshold()
2642 self.addCleanup(gc.set_threshold, *thresholds)
2643 gc.set_threshold(10)
2644
2645 # perform numerous block allocations, with cyclic references to make
2646 # sure objects are collected asynchronously by the gc
2647 for i in range(5000):
2648 a = multiprocessing.heap.BufferWrapper(1)
2649 b = multiprocessing.heap.BufferWrapper(1)
2650 # circular references
2651 a.buddy = b
2652 b.buddy = a
2653
Benjamin Petersone711caf2008-06-11 16:44:04 +00002654#
2655#
2656#
2657
Benjamin Petersone711caf2008-06-11 16:44:04 +00002658class _Foo(Structure):
2659 _fields_ = [
2660 ('x', c_int),
2661 ('y', c_double)
2662 ]
2663
2664class _TestSharedCTypes(BaseTestCase):
2665
2666 ALLOWED_TYPES = ('processes',)
2667
Antoine Pitrou7744e2a2010-11-22 16:26:21 +00002668 def setUp(self):
2669 if not HAS_SHAREDCTYPES:
2670 self.skipTest("requires multiprocessing.sharedctypes")
2671
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +00002672 @classmethod
2673 def _double(cls, x, y, foo, arr, string):
Benjamin Petersone711caf2008-06-11 16:44:04 +00002674 x.value *= 2
2675 y.value *= 2
2676 foo.x *= 2
2677 foo.y *= 2
2678 string.value *= 2
2679 for i in range(len(arr)):
2680 arr[i] *= 2
2681
2682 def test_sharedctypes(self, lock=False):
Benjamin Petersone711caf2008-06-11 16:44:04 +00002683 x = Value('i', 7, lock=lock)
Georg Brandl89fad142010-03-14 10:23:39 +00002684 y = Value(c_double, 1.0/3.0, lock=lock)
Benjamin Petersone711caf2008-06-11 16:44:04 +00002685 foo = Value(_Foo, 3, 2, lock=lock)
Georg Brandl89fad142010-03-14 10:23:39 +00002686 arr = self.Array('d', list(range(10)), lock=lock)
2687 string = self.Array('c', 20, lock=lock)
Brian Curtinafa88b52010-10-07 01:12:19 +00002688 string.value = latin('hello')
Benjamin Petersone711caf2008-06-11 16:44:04 +00002689
2690 p = self.Process(target=self._double, args=(x, y, foo, arr, string))
Jesus Cea94f964f2011-09-09 20:26:57 +02002691 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +00002692 p.start()
2693 p.join()
2694
2695 self.assertEqual(x.value, 14)
2696 self.assertAlmostEqual(y.value, 2.0/3.0)
2697 self.assertEqual(foo.x, 6)
2698 self.assertAlmostEqual(foo.y, 4.0)
2699 for i in range(10):
2700 self.assertAlmostEqual(arr[i], i*2)
2701 self.assertEqual(string.value, latin('hellohello'))
2702
2703 def test_synchronize(self):
2704 self.test_sharedctypes(lock=True)
2705
2706 def test_copy(self):
Benjamin Petersone711caf2008-06-11 16:44:04 +00002707 foo = _Foo(2, 5.0)
Brian Curtinafa88b52010-10-07 01:12:19 +00002708 bar = copy(foo)
Benjamin Petersone711caf2008-06-11 16:44:04 +00002709 foo.x = 0
2710 foo.y = 0
2711 self.assertEqual(bar.x, 2)
2712 self.assertAlmostEqual(bar.y, 5.0)
2713
2714#
2715#
2716#
2717
2718class _TestFinalize(BaseTestCase):
2719
2720 ALLOWED_TYPES = ('processes',)
2721
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +00002722 @classmethod
2723 def _test_finalize(cls, conn):
Benjamin Petersone711caf2008-06-11 16:44:04 +00002724 class Foo(object):
2725 pass
2726
2727 a = Foo()
2728 util.Finalize(a, conn.send, args=('a',))
2729 del a # triggers callback for a
2730
2731 b = Foo()
2732 close_b = util.Finalize(b, conn.send, args=('b',))
2733 close_b() # triggers callback for b
2734 close_b() # does nothing because callback has already been called
2735 del b # does nothing because callback has already been called
2736
2737 c = Foo()
2738 util.Finalize(c, conn.send, args=('c',))
2739
2740 d10 = Foo()
2741 util.Finalize(d10, conn.send, args=('d10',), exitpriority=1)
2742
2743 d01 = Foo()
2744 util.Finalize(d01, conn.send, args=('d01',), exitpriority=0)
2745 d02 = Foo()
2746 util.Finalize(d02, conn.send, args=('d02',), exitpriority=0)
2747 d03 = Foo()
2748 util.Finalize(d03, conn.send, args=('d03',), exitpriority=0)
2749
2750 util.Finalize(None, conn.send, args=('e',), exitpriority=-10)
2751
2752 util.Finalize(None, conn.send, args=('STOP',), exitpriority=-100)
2753
Ezio Melotti13925002011-03-16 11:05:33 +02002754 # call multiprocessing's cleanup function then exit process without
Benjamin Petersone711caf2008-06-11 16:44:04 +00002755 # garbage collecting locals
2756 util._exit_function()
2757 conn.close()
2758 os._exit(0)
2759
2760 def test_finalize(self):
2761 conn, child_conn = self.Pipe()
2762
2763 p = self.Process(target=self._test_finalize, args=(child_conn,))
Jesus Cea94f964f2011-09-09 20:26:57 +02002764 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +00002765 p.start()
2766 p.join()
2767
2768 result = [obj for obj in iter(conn.recv, 'STOP')]
2769 self.assertEqual(result, ['a', 'b', 'd10', 'd03', 'd02', 'd01', 'e'])
2770
2771#
2772# Test that from ... import * works for each module
2773#
2774
2775class _TestImportStar(BaseTestCase):
2776
2777 ALLOWED_TYPES = ('processes',)
2778
2779 def test_import(self):
Florent Xiclunafd1b0932010-03-28 00:25:02 +00002780 modules = [
Benjamin Petersone711caf2008-06-11 16:44:04 +00002781 'multiprocessing', 'multiprocessing.connection',
2782 'multiprocessing.heap', 'multiprocessing.managers',
2783 'multiprocessing.pool', 'multiprocessing.process',
Benjamin Petersone711caf2008-06-11 16:44:04 +00002784 'multiprocessing.synchronize', 'multiprocessing.util'
Florent Xiclunafd1b0932010-03-28 00:25:02 +00002785 ]
2786
Charles-François Natalibc8f0822011-09-20 20:36:51 +02002787 if HAS_REDUCTION:
2788 modules.append('multiprocessing.reduction')
2789
Florent Xiclunafd1b0932010-03-28 00:25:02 +00002790 if c_int is not None:
2791 # This module requires _ctypes
2792 modules.append('multiprocessing.sharedctypes')
Benjamin Petersone711caf2008-06-11 16:44:04 +00002793
2794 for name in modules:
2795 __import__(name)
2796 mod = sys.modules[name]
2797
2798 for attr in getattr(mod, '__all__', ()):
2799 self.assertTrue(
2800 hasattr(mod, attr),
2801 '%r does not have attribute %r' % (mod, attr)
2802 )
2803
2804#
2805# Quick test that logging works -- does not test logging output
2806#
2807
2808class _TestLogging(BaseTestCase):
2809
2810 ALLOWED_TYPES = ('processes',)
2811
2812 def test_enable_logging(self):
2813 logger = multiprocessing.get_logger()
2814 logger.setLevel(util.SUBWARNING)
2815 self.assertTrue(logger is not None)
2816 logger.debug('this will not be printed')
2817 logger.info('nor will this')
2818 logger.setLevel(LOG_LEVEL)
2819
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +00002820 @classmethod
2821 def _test_level(cls, conn):
Benjamin Petersone711caf2008-06-11 16:44:04 +00002822 logger = multiprocessing.get_logger()
2823 conn.send(logger.getEffectiveLevel())
2824
2825 def test_level(self):
2826 LEVEL1 = 32
2827 LEVEL2 = 37
2828
2829 logger = multiprocessing.get_logger()
2830 root_logger = logging.getLogger()
2831 root_level = root_logger.level
2832
2833 reader, writer = multiprocessing.Pipe(duplex=False)
2834
2835 logger.setLevel(LEVEL1)
Jesus Cea94f964f2011-09-09 20:26:57 +02002836 p = self.Process(target=self._test_level, args=(writer,))
2837 p.daemon = True
2838 p.start()
Benjamin Petersone711caf2008-06-11 16:44:04 +00002839 self.assertEqual(LEVEL1, reader.recv())
2840
2841 logger.setLevel(logging.NOTSET)
2842 root_logger.setLevel(LEVEL2)
Jesus Cea94f964f2011-09-09 20:26:57 +02002843 p = self.Process(target=self._test_level, args=(writer,))
2844 p.daemon = True
2845 p.start()
Benjamin Petersone711caf2008-06-11 16:44:04 +00002846 self.assertEqual(LEVEL2, reader.recv())
2847
2848 root_logger.setLevel(root_level)
2849 logger.setLevel(level=LOG_LEVEL)
2850
Jesse Nollerb9a49b72009-11-21 18:09:38 +00002851
Jesse Nollerf4ae35f2009-11-24 14:22:24 +00002852# class _TestLoggingProcessName(BaseTestCase):
2853#
2854# def handle(self, record):
2855# assert record.processName == multiprocessing.current_process().name
2856# self.__handled = True
2857#
2858# def test_logging(self):
2859# handler = logging.Handler()
2860# handler.handle = self.handle
2861# self.__handled = False
2862# # Bypass getLogger() and side-effects
2863# logger = logging.getLoggerClass()(
2864# 'multiprocessing.test.TestLoggingProcessName')
2865# logger.addHandler(handler)
2866# logger.propagate = False
2867#
2868# logger.warn('foo')
2869# assert self.__handled
Jesse Nollerb9a49b72009-11-21 18:09:38 +00002870
Benjamin Petersone711caf2008-06-11 16:44:04 +00002871#
Jesse Noller6214edd2009-01-19 16:23:53 +00002872# Test to verify handle verification, see issue 3321
2873#
2874
2875class TestInvalidHandle(unittest.TestCase):
2876
Florent Xiclunafd1b0932010-03-28 00:25:02 +00002877 @unittest.skipIf(WIN32, "skipped on Windows")
Jesse Noller6214edd2009-01-19 16:23:53 +00002878 def test_invalid_handles(self):
Antoine Pitrou87cf2202011-05-09 17:04:27 +02002879 conn = multiprocessing.connection.Connection(44977608)
2880 try:
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02002881 self.assertRaises((ValueError, OSError), conn.poll)
Antoine Pitrou87cf2202011-05-09 17:04:27 +02002882 finally:
2883 # Hack private attribute _handle to avoid printing an error
2884 # in conn.__del__
2885 conn._handle = None
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02002886 self.assertRaises((ValueError, OSError),
Antoine Pitrou87cf2202011-05-09 17:04:27 +02002887 multiprocessing.connection.Connection, -1)
Florent Xiclunafd1b0932010-03-28 00:25:02 +00002888
Jesse Noller6214edd2009-01-19 16:23:53 +00002889#
Benjamin Petersone711caf2008-06-11 16:44:04 +00002890# Functions used to create test cases from the base ones in this module
2891#
2892
Benjamin Petersone711caf2008-06-11 16:44:04 +00002893def create_test_cases(Mixin, type):
2894 result = {}
2895 glob = globals()
Florent Xiclunafd1b0932010-03-28 00:25:02 +00002896 Type = type.capitalize()
Richard Oudkerk91257752012-06-15 21:53:34 +01002897 ALL_TYPES = {'processes', 'threads', 'manager'}
Benjamin Petersone711caf2008-06-11 16:44:04 +00002898
2899 for name in list(glob.keys()):
2900 if name.startswith('_Test'):
2901 base = glob[name]
Richard Oudkerk91257752012-06-15 21:53:34 +01002902 assert set(base.ALLOWED_TYPES) <= ALL_TYPES, set(base.ALLOWED_TYPES)
Benjamin Petersone711caf2008-06-11 16:44:04 +00002903 if type in base.ALLOWED_TYPES:
2904 newname = 'With' + Type + name[1:]
Richard Oudkerkfc7b0ec2012-10-08 14:56:24 +01002905 class Temp(base, Mixin, unittest.TestCase):
Benjamin Petersone711caf2008-06-11 16:44:04 +00002906 pass
2907 result[newname] = Temp
Richard Oudkerkfc7b0ec2012-10-08 14:56:24 +01002908 Temp.__name__ = Temp.__qualname__ = newname
Benjamin Petersone711caf2008-06-11 16:44:04 +00002909 Temp.__module__ = Mixin.__module__
2910 return result
2911
2912#
2913# Create test cases
2914#
2915
2916class ProcessesMixin(object):
2917 TYPE = 'processes'
2918 Process = multiprocessing.Process
Richard Oudkerkfc7b0ec2012-10-08 14:56:24 +01002919 connection = multiprocessing.connection
2920 current_process = staticmethod(multiprocessing.current_process)
2921 active_children = staticmethod(multiprocessing.active_children)
2922 Pool = staticmethod(multiprocessing.Pool)
2923 Pipe = staticmethod(multiprocessing.Pipe)
2924 Queue = staticmethod(multiprocessing.Queue)
2925 JoinableQueue = staticmethod(multiprocessing.JoinableQueue)
2926 Lock = staticmethod(multiprocessing.Lock)
2927 RLock = staticmethod(multiprocessing.RLock)
2928 Semaphore = staticmethod(multiprocessing.Semaphore)
2929 BoundedSemaphore = staticmethod(multiprocessing.BoundedSemaphore)
2930 Condition = staticmethod(multiprocessing.Condition)
2931 Event = staticmethod(multiprocessing.Event)
2932 Barrier = staticmethod(multiprocessing.Barrier)
2933 Value = staticmethod(multiprocessing.Value)
2934 Array = staticmethod(multiprocessing.Array)
2935 RawValue = staticmethod(multiprocessing.RawValue)
2936 RawArray = staticmethod(multiprocessing.RawArray)
Benjamin Petersone711caf2008-06-11 16:44:04 +00002937
2938testcases_processes = create_test_cases(ProcessesMixin, type='processes')
2939globals().update(testcases_processes)
2940
2941
2942class ManagerMixin(object):
2943 TYPE = 'manager'
2944 Process = multiprocessing.Process
Richard Oudkerkfc7b0ec2012-10-08 14:56:24 +01002945 Queue = property(operator.attrgetter('manager.Queue'))
2946 JoinableQueue = property(operator.attrgetter('manager.JoinableQueue'))
2947 Lock = property(operator.attrgetter('manager.Lock'))
2948 RLock = property(operator.attrgetter('manager.RLock'))
2949 Semaphore = property(operator.attrgetter('manager.Semaphore'))
2950 BoundedSemaphore = property(operator.attrgetter('manager.BoundedSemaphore'))
2951 Condition = property(operator.attrgetter('manager.Condition'))
2952 Event = property(operator.attrgetter('manager.Event'))
2953 Barrier = property(operator.attrgetter('manager.Barrier'))
2954 Value = property(operator.attrgetter('manager.Value'))
2955 Array = property(operator.attrgetter('manager.Array'))
2956 list = property(operator.attrgetter('manager.list'))
2957 dict = property(operator.attrgetter('manager.dict'))
2958 Namespace = property(operator.attrgetter('manager.Namespace'))
2959
2960 @classmethod
2961 def Pool(cls, *args, **kwds):
2962 return cls.manager.Pool(*args, **kwds)
2963
2964 @classmethod
2965 def setUpClass(cls):
2966 cls.manager = multiprocessing.Manager()
2967
2968 @classmethod
2969 def tearDownClass(cls):
2970 multiprocessing.active_children() # discard dead process objs
2971 gc.collect() # do garbage collection
2972 if cls.manager._number_of_objects() != 0:
2973 # This is not really an error since some tests do not
2974 # ensure that all processes which hold a reference to a
2975 # managed object have been joined.
2976 print('Shared objects which still exist at manager shutdown:')
2977 print(cls.manager._debug_info())
2978 cls.manager.shutdown()
2979 cls.manager.join()
2980 cls.manager = None
Benjamin Petersone711caf2008-06-11 16:44:04 +00002981
2982testcases_manager = create_test_cases(ManagerMixin, type='manager')
2983globals().update(testcases_manager)
2984
2985
2986class ThreadsMixin(object):
2987 TYPE = 'threads'
2988 Process = multiprocessing.dummy.Process
Richard Oudkerkfc7b0ec2012-10-08 14:56:24 +01002989 connection = multiprocessing.dummy.connection
2990 current_process = staticmethod(multiprocessing.dummy.current_process)
2991 active_children = staticmethod(multiprocessing.dummy.active_children)
2992 Pool = staticmethod(multiprocessing.Pool)
2993 Pipe = staticmethod(multiprocessing.dummy.Pipe)
2994 Queue = staticmethod(multiprocessing.dummy.Queue)
2995 JoinableQueue = staticmethod(multiprocessing.dummy.JoinableQueue)
2996 Lock = staticmethod(multiprocessing.dummy.Lock)
2997 RLock = staticmethod(multiprocessing.dummy.RLock)
2998 Semaphore = staticmethod(multiprocessing.dummy.Semaphore)
2999 BoundedSemaphore = staticmethod(multiprocessing.dummy.BoundedSemaphore)
3000 Condition = staticmethod(multiprocessing.dummy.Condition)
3001 Event = staticmethod(multiprocessing.dummy.Event)
3002 Barrier = staticmethod(multiprocessing.dummy.Barrier)
3003 Value = staticmethod(multiprocessing.dummy.Value)
3004 Array = staticmethod(multiprocessing.dummy.Array)
Benjamin Petersone711caf2008-06-11 16:44:04 +00003005
3006testcases_threads = create_test_cases(ThreadsMixin, type='threads')
3007globals().update(testcases_threads)
3008
Richard Oudkerkfc7b0ec2012-10-08 14:56:24 +01003009
Neal Norwitz5d6415e2008-08-25 01:53:32 +00003010class OtherTest(unittest.TestCase):
3011 # TODO: add more tests for deliver/answer challenge.
3012 def test_deliver_challenge_auth_failure(self):
3013 class _FakeConnection(object):
3014 def recv_bytes(self, size):
Neal Norwitzec105ad2008-08-25 03:05:54 +00003015 return b'something bogus'
Neal Norwitz5d6415e2008-08-25 01:53:32 +00003016 def send_bytes(self, data):
3017 pass
3018 self.assertRaises(multiprocessing.AuthenticationError,
3019 multiprocessing.connection.deliver_challenge,
3020 _FakeConnection(), b'abc')
3021
3022 def test_answer_challenge_auth_failure(self):
3023 class _FakeConnection(object):
3024 def __init__(self):
3025 self.count = 0
3026 def recv_bytes(self, size):
3027 self.count += 1
3028 if self.count == 1:
3029 return multiprocessing.connection.CHALLENGE
3030 elif self.count == 2:
Neal Norwitzec105ad2008-08-25 03:05:54 +00003031 return b'something bogus'
3032 return b''
Neal Norwitz5d6415e2008-08-25 01:53:32 +00003033 def send_bytes(self, data):
3034 pass
3035 self.assertRaises(multiprocessing.AuthenticationError,
3036 multiprocessing.connection.answer_challenge,
3037 _FakeConnection(), b'abc')
3038
Benjamin Petersonf47ed4a2009-04-11 20:45:40 +00003039#
3040# Test Manager.start()/Pool.__init__() initializer feature - see issue 5585
3041#
3042
3043def initializer(ns):
3044 ns.test += 1
3045
3046class TestInitializers(unittest.TestCase):
3047 def setUp(self):
3048 self.mgr = multiprocessing.Manager()
3049 self.ns = self.mgr.Namespace()
3050 self.ns.test = 0
3051
3052 def tearDown(self):
3053 self.mgr.shutdown()
Richard Oudkerka6becaa2012-05-03 18:29:02 +01003054 self.mgr.join()
Benjamin Petersonf47ed4a2009-04-11 20:45:40 +00003055
3056 def test_manager_initializer(self):
3057 m = multiprocessing.managers.SyncManager()
3058 self.assertRaises(TypeError, m.start, 1)
3059 m.start(initializer, (self.ns,))
3060 self.assertEqual(self.ns.test, 1)
3061 m.shutdown()
Richard Oudkerka6becaa2012-05-03 18:29:02 +01003062 m.join()
Benjamin Petersonf47ed4a2009-04-11 20:45:40 +00003063
3064 def test_pool_initializer(self):
3065 self.assertRaises(TypeError, multiprocessing.Pool, initializer=1)
3066 p = multiprocessing.Pool(1, initializer, (self.ns,))
3067 p.close()
3068 p.join()
3069 self.assertEqual(self.ns.test, 1)
3070
Alexandre Vassalottic57a84f2009-07-17 12:07:01 +00003071#
3072# Issue 5155, 5313, 5331: Test process in processes
3073# Verifies os.close(sys.stdin.fileno) vs. sys.stdin.close() behavior
3074#
3075
3076def _ThisSubProcess(q):
3077 try:
3078 item = q.get(block=False)
3079 except pyqueue.Empty:
3080 pass
3081
3082def _TestProcess(q):
3083 queue = multiprocessing.Queue()
3084 subProc = multiprocessing.Process(target=_ThisSubProcess, args=(queue,))
Jesus Cea94f964f2011-09-09 20:26:57 +02003085 subProc.daemon = True
Alexandre Vassalottic57a84f2009-07-17 12:07:01 +00003086 subProc.start()
3087 subProc.join()
3088
3089def _afunc(x):
3090 return x*x
3091
3092def pool_in_process():
3093 pool = multiprocessing.Pool(processes=4)
3094 x = pool.map(_afunc, [1, 2, 3, 4, 5, 6, 7])
Richard Oudkerk225cb8d2012-05-02 19:36:11 +01003095 pool.close()
3096 pool.join()
Alexandre Vassalottic57a84f2009-07-17 12:07:01 +00003097
3098class _file_like(object):
3099 def __init__(self, delegate):
3100 self._delegate = delegate
3101 self._pid = None
3102
3103 @property
3104 def cache(self):
3105 pid = os.getpid()
3106 # There are no race conditions since fork keeps only the running thread
3107 if pid != self._pid:
3108 self._pid = pid
3109 self._cache = []
3110 return self._cache
3111
3112 def write(self, data):
3113 self.cache.append(data)
3114
3115 def flush(self):
3116 self._delegate.write(''.join(self.cache))
3117 self._cache = []
3118
3119class TestStdinBadfiledescriptor(unittest.TestCase):
3120
3121 def test_queue_in_process(self):
3122 queue = multiprocessing.Queue()
3123 proc = multiprocessing.Process(target=_TestProcess, args=(queue,))
3124 proc.start()
3125 proc.join()
3126
3127 def test_pool_in_process(self):
3128 p = multiprocessing.Process(target=pool_in_process)
3129 p.start()
3130 p.join()
3131
3132 def test_flushing(self):
3133 sio = io.StringIO()
3134 flike = _file_like(sio)
3135 flike.write('foo')
3136 proc = multiprocessing.Process(target=lambda: flike.flush())
3137 flike.flush()
3138 assert sio.getvalue() == 'foo'
3139
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01003140
3141class TestWait(unittest.TestCase):
3142
3143 @classmethod
3144 def _child_test_wait(cls, w, slow):
3145 for i in range(10):
3146 if slow:
3147 time.sleep(random.random()*0.1)
3148 w.send((i, os.getpid()))
3149 w.close()
3150
3151 def test_wait(self, slow=False):
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01003152 from multiprocessing.connection import wait
3153 readers = []
3154 procs = []
3155 messages = []
3156
3157 for i in range(4):
Antoine Pitrou5bb9a8f2012-03-06 13:43:24 +01003158 r, w = multiprocessing.Pipe(duplex=False)
3159 p = multiprocessing.Process(target=self._child_test_wait, args=(w, slow))
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01003160 p.daemon = True
3161 p.start()
3162 w.close()
3163 readers.append(r)
3164 procs.append(p)
Antoine Pitrou6c64cc12012-03-06 13:42:35 +01003165 self.addCleanup(p.join)
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01003166
3167 while readers:
3168 for r in wait(readers):
3169 try:
3170 msg = r.recv()
3171 except EOFError:
3172 readers.remove(r)
3173 r.close()
3174 else:
3175 messages.append(msg)
3176
3177 messages.sort()
3178 expected = sorted((i, p.pid) for i in range(10) for p in procs)
3179 self.assertEqual(messages, expected)
3180
3181 @classmethod
3182 def _child_test_wait_socket(cls, address, slow):
3183 s = socket.socket()
3184 s.connect(address)
3185 for i in range(10):
3186 if slow:
3187 time.sleep(random.random()*0.1)
3188 s.sendall(('%s\n' % i).encode('ascii'))
3189 s.close()
3190
3191 def test_wait_socket(self, slow=False):
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01003192 from multiprocessing.connection import wait
3193 l = socket.socket()
3194 l.bind(('', 0))
3195 l.listen(4)
3196 addr = ('localhost', l.getsockname()[1])
3197 readers = []
3198 procs = []
3199 dic = {}
3200
3201 for i in range(4):
Antoine Pitrou5bb9a8f2012-03-06 13:43:24 +01003202 p = multiprocessing.Process(target=self._child_test_wait_socket,
3203 args=(addr, slow))
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01003204 p.daemon = True
3205 p.start()
3206 procs.append(p)
Antoine Pitrou6c64cc12012-03-06 13:42:35 +01003207 self.addCleanup(p.join)
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01003208
3209 for i in range(4):
3210 r, _ = l.accept()
3211 readers.append(r)
3212 dic[r] = []
3213 l.close()
3214
3215 while readers:
3216 for r in wait(readers):
3217 msg = r.recv(32)
3218 if not msg:
3219 readers.remove(r)
3220 r.close()
3221 else:
3222 dic[r].append(msg)
3223
3224 expected = ''.join('%s\n' % i for i in range(10)).encode('ascii')
3225 for v in dic.values():
3226 self.assertEqual(b''.join(v), expected)
3227
3228 def test_wait_slow(self):
3229 self.test_wait(True)
3230
3231 def test_wait_socket_slow(self):
Richard Oudkerk104b3f42012-05-08 16:08:07 +01003232 self.test_wait_socket(True)
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01003233
3234 def test_wait_timeout(self):
3235 from multiprocessing.connection import wait
3236
Richard Oudkerk009b15e2012-05-04 09:44:39 +01003237 expected = 5
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01003238 a, b = multiprocessing.Pipe()
3239
3240 start = time.time()
Richard Oudkerk009b15e2012-05-04 09:44:39 +01003241 res = wait([a, b], expected)
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01003242 delta = time.time() - start
3243
3244 self.assertEqual(res, [])
Richard Oudkerk6dbca362012-05-06 16:46:36 +01003245 self.assertLess(delta, expected * 2)
3246 self.assertGreater(delta, expected * 0.5)
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01003247
3248 b.send(None)
3249
3250 start = time.time()
Richard Oudkerk009b15e2012-05-04 09:44:39 +01003251 res = wait([a, b], 20)
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01003252 delta = time.time() - start
3253
3254 self.assertEqual(res, [a])
Antoine Pitrou37749772012-03-09 18:40:15 +01003255 self.assertLess(delta, 0.4)
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01003256
Richard Oudkerk009b15e2012-05-04 09:44:39 +01003257 @classmethod
3258 def signal_and_sleep(cls, sem, period):
3259 sem.release()
3260 time.sleep(period)
3261
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01003262 def test_wait_integer(self):
3263 from multiprocessing.connection import wait
3264
Richard Oudkerk009b15e2012-05-04 09:44:39 +01003265 expected = 3
Giampaolo Rodola'0c8ad612013-01-14 02:24:05 +01003266 sorted_ = lambda l: sorted(l, key=lambda x: id(x))
Richard Oudkerk009b15e2012-05-04 09:44:39 +01003267 sem = multiprocessing.Semaphore(0)
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01003268 a, b = multiprocessing.Pipe()
Richard Oudkerk009b15e2012-05-04 09:44:39 +01003269 p = multiprocessing.Process(target=self.signal_and_sleep,
3270 args=(sem, expected))
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01003271
3272 p.start()
3273 self.assertIsInstance(p.sentinel, int)
Richard Oudkerk009b15e2012-05-04 09:44:39 +01003274 self.assertTrue(sem.acquire(timeout=20))
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01003275
3276 start = time.time()
3277 res = wait([a, p.sentinel, b], expected + 20)
3278 delta = time.time() - start
3279
3280 self.assertEqual(res, [p.sentinel])
Antoine Pitrou37749772012-03-09 18:40:15 +01003281 self.assertLess(delta, expected + 2)
3282 self.assertGreater(delta, expected - 2)
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01003283
3284 a.send(None)
3285
3286 start = time.time()
3287 res = wait([a, p.sentinel, b], 20)
3288 delta = time.time() - start
3289
Giampaolo Rodola'5051ca82012-12-31 17:38:17 +01003290 self.assertEqual(sorted_(res), sorted_([p.sentinel, b]))
Antoine Pitrou37749772012-03-09 18:40:15 +01003291 self.assertLess(delta, 0.4)
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01003292
3293 b.send(None)
3294
3295 start = time.time()
3296 res = wait([a, p.sentinel, b], 20)
3297 delta = time.time() - start
3298
Giampaolo Rodola'5051ca82012-12-31 17:38:17 +01003299 self.assertEqual(sorted_(res), sorted_([a, p.sentinel, b]))
Antoine Pitrou37749772012-03-09 18:40:15 +01003300 self.assertLess(delta, 0.4)
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01003301
Richard Oudkerk009b15e2012-05-04 09:44:39 +01003302 p.terminate()
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01003303 p.join()
3304
Richard Oudkerk59d54042012-05-10 16:11:12 +01003305 def test_neg_timeout(self):
3306 from multiprocessing.connection import wait
3307 a, b = multiprocessing.Pipe()
3308 t = time.time()
3309 res = wait([a], timeout=-1)
3310 t = time.time() - t
3311 self.assertEqual(res, [])
3312 self.assertLess(t, 1)
3313 a.close()
3314 b.close()
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01003315
Antoine Pitrou709176f2012-04-01 17:19:09 +02003316#
3317# Issue 14151: Test invalid family on invalid environment
3318#
3319
3320class TestInvalidFamily(unittest.TestCase):
3321
3322 @unittest.skipIf(WIN32, "skipped on Windows")
3323 def test_invalid_family(self):
3324 with self.assertRaises(ValueError):
3325 multiprocessing.connection.Listener(r'\\.\test')
3326
Antoine Pitrou6d20cba2012-04-03 20:12:23 +02003327 @unittest.skipUnless(WIN32, "skipped on non-Windows platforms")
3328 def test_invalid_family_win32(self):
3329 with self.assertRaises(ValueError):
3330 multiprocessing.connection.Listener('/var/test.pipe')
Antoine Pitrou93bba8f2012-04-01 17:25:49 +02003331
Richard Oudkerk77c84f22012-05-18 14:28:02 +01003332#
3333# Issue 12098: check sys.flags of child matches that for parent
3334#
3335
3336class TestFlags(unittest.TestCase):
3337 @classmethod
3338 def run_in_grandchild(cls, conn):
3339 conn.send(tuple(sys.flags))
3340
3341 @classmethod
3342 def run_in_child(cls):
3343 import json
3344 r, w = multiprocessing.Pipe(duplex=False)
3345 p = multiprocessing.Process(target=cls.run_in_grandchild, args=(w,))
3346 p.start()
3347 grandchild_flags = r.recv()
3348 p.join()
3349 r.close()
3350 w.close()
3351 flags = (tuple(sys.flags), grandchild_flags)
3352 print(json.dumps(flags))
3353
3354 def test_flags(self):
3355 import json, subprocess
3356 # start child process using unusual flags
3357 prog = ('from test.test_multiprocessing import TestFlags; ' +
3358 'TestFlags.run_in_child()')
3359 data = subprocess.check_output(
3360 [sys.executable, '-E', '-S', '-O', '-c', prog])
3361 child_flags, grandchild_flags = json.loads(data.decode('ascii'))
3362 self.assertEqual(child_flags, grandchild_flags)
3363
Richard Oudkerkb15e6222012-07-27 14:19:00 +01003364#
3365# Test interaction with socket timeouts - see Issue #6056
3366#
3367
3368class TestTimeouts(unittest.TestCase):
3369 @classmethod
3370 def _test_timeout(cls, child, address):
3371 time.sleep(1)
3372 child.send(123)
3373 child.close()
3374 conn = multiprocessing.connection.Client(address)
3375 conn.send(456)
3376 conn.close()
3377
3378 def test_timeout(self):
3379 old_timeout = socket.getdefaulttimeout()
3380 try:
3381 socket.setdefaulttimeout(0.1)
3382 parent, child = multiprocessing.Pipe(duplex=True)
3383 l = multiprocessing.connection.Listener(family='AF_INET')
3384 p = multiprocessing.Process(target=self._test_timeout,
3385 args=(child, l.address))
3386 p.start()
3387 child.close()
3388 self.assertEqual(parent.recv(), 123)
3389 parent.close()
3390 conn = l.accept()
3391 self.assertEqual(conn.recv(), 456)
3392 conn.close()
3393 l.close()
3394 p.join(10)
3395 finally:
3396 socket.setdefaulttimeout(old_timeout)
3397
Richard Oudkerke88a2442012-08-14 11:41:32 +01003398#
3399# Test what happens with no "if __name__ == '__main__'"
3400#
3401
3402class TestNoForkBomb(unittest.TestCase):
3403 def test_noforkbomb(self):
3404 name = os.path.join(os.path.dirname(__file__), 'mp_fork_bomb.py')
3405 if WIN32:
3406 rc, out, err = test.script_helper.assert_python_failure(name)
3407 self.assertEqual('', out.decode('ascii'))
3408 self.assertIn('RuntimeError', err.decode('ascii'))
3409 else:
3410 rc, out, err = test.script_helper.assert_python_ok(name)
3411 self.assertEqual('123', out.decode('ascii').rstrip())
3412 self.assertEqual('', err.decode('ascii'))
3413
3414#
3415#
3416#
3417
Alexandre Vassalottic57a84f2009-07-17 12:07:01 +00003418testcases_other = [OtherTest, TestInvalidHandle, TestInitializers,
Richard Oudkerk77c84f22012-05-18 14:28:02 +01003419 TestStdinBadfiledescriptor, TestWait, TestInvalidFamily,
Richard Oudkerk3165a752012-08-14 12:51:14 +01003420 TestFlags, TestTimeouts, TestNoForkBomb]
Neal Norwitz5d6415e2008-08-25 01:53:32 +00003421
Benjamin Petersone711caf2008-06-11 16:44:04 +00003422#
3423#
3424#
3425
3426def test_main(run=None):
Jesse Nollerd00df3c2008-06-18 14:22:48 +00003427 if sys.platform.startswith("linux"):
3428 try:
3429 lock = multiprocessing.RLock()
3430 except OSError:
Benjamin Petersone549ead2009-03-28 21:42:05 +00003431 raise unittest.SkipTest("OSError raises on RLock creation, see issue 3111!")
Benjamin Peterson3c0dd062008-06-17 22:43:48 +00003432
Charles-François Natali221ef672011-11-22 18:55:22 +01003433 check_enough_semaphores()
3434
Benjamin Petersone711caf2008-06-11 16:44:04 +00003435 if run is None:
3436 from test.support import run_unittest as run
3437
3438 util.get_temp_dir() # creates temp directory for use by all processes
3439
3440 multiprocessing.get_logger().setLevel(LOG_LEVEL)
3441
Benjamin Petersone711caf2008-06-11 16:44:04 +00003442 testcases = (
Benjamin Peterson41181742008-07-02 20:22:54 +00003443 sorted(testcases_processes.values(), key=lambda tc:tc.__name__) +
3444 sorted(testcases_threads.values(), key=lambda tc:tc.__name__) +
Neal Norwitz5d6415e2008-08-25 01:53:32 +00003445 sorted(testcases_manager.values(), key=lambda tc:tc.__name__) +
3446 testcases_other
Benjamin Petersone711caf2008-06-11 16:44:04 +00003447 )
3448
3449 loadTestsFromTestCase = unittest.defaultTestLoader.loadTestsFromTestCase
3450 suite = unittest.TestSuite(loadTestsFromTestCase(tc) for tc in testcases)
Richard Oudkerkfc7b0ec2012-10-08 14:56:24 +01003451 run(suite)
Benjamin Petersone711caf2008-06-11 16:44:04 +00003452
3453def main():
3454 test_main(unittest.TextTestRunner(verbosity=2).run)
3455
3456if __name__ == '__main__':
3457 main()