blob: 576096959810c92923064d3a285288967d589c4c [file] [log] [blame]
Benjamin Peterson90f5ba52010-03-11 22:53:45 +00001#!/usr/bin/env python3
Georg Brandl86b2fb92008-07-16 03:43:04 +00002
Benjamin Petersone711caf2008-06-11 16:44:04 +00003#
4# Unit tests for the multiprocessing package
5#
6
7import unittest
Benjamin Petersone711caf2008-06-11 16:44:04 +00008import queue as pyqueue
9import time
Alexandre Vassalottic57a84f2009-07-17 12:07:01 +000010import io
Antoine Pitroude911b22011-12-21 11:03:24 +010011import itertools
Benjamin Petersone711caf2008-06-11 16:44:04 +000012import sys
13import os
14import gc
Antoine Pitrouc824e9a2011-04-05 18:11:33 +020015import errno
Benjamin Petersone711caf2008-06-11 16:44:04 +000016import signal
17import array
Benjamin Petersone711caf2008-06-11 16:44:04 +000018import socket
19import random
20import logging
Richard Oudkerk3730a172012-06-15 18:26:07 +010021import struct
Richard Oudkerkd15642e2013-07-16 15:33:41 +010022import operator
R. David Murraya21e4ca2009-03-31 23:16:50 +000023import test.support
Richard Oudkerke88a2442012-08-14 11:41:32 +010024import test.script_helper
Benjamin Petersone711caf2008-06-11 16:44:04 +000025
Benjamin Petersone5384b02008-10-04 22:00:42 +000026
R. David Murraya21e4ca2009-03-31 23:16:50 +000027# Skip tests if _multiprocessing wasn't built.
28_multiprocessing = test.support.import_module('_multiprocessing')
29# Skip tests if sem_open implementation is broken.
30test.support.import_module('multiprocessing.synchronize')
# import threading after _multiprocessing to raise a more relevant error
# message: "No module named _multiprocessing". _multiprocessing is not compiled
# without thread support.
34import threading
Benjamin Petersone5384b02008-10-04 22:00:42 +000035
Benjamin Petersone711caf2008-06-11 16:44:04 +000036import multiprocessing.dummy
37import multiprocessing.connection
38import multiprocessing.managers
39import multiprocessing.heap
Benjamin Petersone711caf2008-06-11 16:44:04 +000040import multiprocessing.pool
Benjamin Petersone711caf2008-06-11 16:44:04 +000041
Charles-François Natalibc8f0822011-09-20 20:36:51 +020042from multiprocessing import util
43
44try:
45 from multiprocessing import reduction
46 HAS_REDUCTION = True
47except ImportError:
48 HAS_REDUCTION = False
Benjamin Petersone711caf2008-06-11 16:44:04 +000049
Brian Curtinafa88b52010-10-07 01:12:19 +000050try:
51 from multiprocessing.sharedctypes import Value, copy
52 HAS_SHAREDCTYPES = True
53except ImportError:
54 HAS_SHAREDCTYPES = False
55
Antoine Pitroubcb39d42011-08-23 19:46:22 +020056try:
57 import msvcrt
58except ImportError:
59 msvcrt = None
60
Benjamin Petersone711caf2008-06-11 16:44:04 +000061#
62#
63#
64
def latin(s):
    """Return the string *s* encoded to bytes with the 'latin' codec."""
    encoded = s.encode('latin')
    return encoded
Benjamin Petersone711caf2008-06-11 16:44:04 +000067
Benjamin Petersone711caf2008-06-11 16:44:04 +000068#
69# Constants
70#
71
# Logging level used by the tests; switch to the commented alternative for
# more verbose output.
LOG_LEVEL = util.SUBWARNING
#LOG_LEVEL = logging.DEBUG

# Basic time slice (in seconds) the tests sleep for.
DELTA = 0.1

# Making CHECK_TIMINGS true makes tests take a lot longer and can sometimes
# cause some non-serious failures because some calls block a bit longer than
# expected.
CHECK_TIMINGS = False
TIMEOUT1, TIMEOUT2, TIMEOUT3 = (
    (0.82, 0.35, 1.4) if CHECK_TIMINGS else (0.1, 0.1, 0.1)
)

# True unless _multiprocessing reports a broken sem_getvalue().
HAVE_GETVALUE = not getattr(_multiprocessing,
                            'HAVE_BROKEN_SEM_GETVALUE', False)

WIN32 = (sys.platform == "win32")
Antoine Pitrou176f07d2011-06-06 19:35:31 +020089
Richard Oudkerk59d54042012-05-10 16:11:12 +010090from multiprocessing.connection import wait
Antoine Pitrou176f07d2011-06-06 19:35:31 +020091
def wait_for_handle(handle, timeout):
    """Wait on a single *handle* and return the result of ``wait()``.

    A negative *timeout* is treated like ``None`` and passed to ``wait()``
    as such; any other value is forwarded unchanged.
    """
    is_negative = timeout is not None and timeout < 0.0
    effective_timeout = None if is_negative else timeout
    return wait([handle], effective_timeout)
Jesse Noller6214edd2009-01-19 16:23:53 +000096
# Upper bound on file descriptors, used by tests that scan descriptors.
# Fall back to 256 when os.sysconf is missing (AttributeError), does not know
# the name (ValueError) or the system call fails (OSError).  A bare ``except``
# would also swallow KeyboardInterrupt/SystemExit raised during import.
try:
    MAXFD = os.sysconf("SC_OPEN_MAX")
except (AttributeError, ValueError, OSError):
    MAXFD = 256
101
Benjamin Petersone711caf2008-06-11 16:44:04 +0000102#
Florent Xiclunafd1b0932010-03-28 00:25:02 +0000103# Some tests require ctypes
104#
105
106try:
Florent Xiclunaaa171062010-08-14 15:56:42 +0000107 from ctypes import Structure, c_int, c_double
Florent Xiclunafd1b0932010-03-28 00:25:02 +0000108except ImportError:
109 Structure = object
110 c_int = c_double = None
111
Charles-François Natali221ef672011-11-22 18:55:22 +0100112
def check_enough_semaphores():
    """Raise unittest.SkipTest if the OS reports too few semaphores.

    Returns None when ``SC_SEM_NSEMS_MAX`` cannot be queried, when it is
    reported as -1, or when it is at least the required minimum.
    """
    # minimum number of semaphores available according to POSIX
    required = 256
    try:
        available = os.sysconf("SC_SEM_NSEMS_MAX")
    except (AttributeError, ValueError):
        # sysconf not available or setting not available
        return
    if available != -1 and available < required:
        raise unittest.SkipTest("The OS doesn't support enough semaphores "
                                "to run the test (required: %d)." % required)
126
127
Florent Xiclunafd1b0932010-03-28 00:25:02 +0000128#
Benjamin Petersone711caf2008-06-11 16:44:04 +0000129# Creates a wrapper for a function which records the time it takes to finish
130#
131
class TimingWrapper(object):
    """Callable wrapper that records how long each call to *func* takes.

    After every call ``elapsed`` holds the duration in seconds of the most
    recent call (``None`` before the first call).  The duration is recorded
    even when *func* raises.
    """

    def __init__(self, func):
        self.func = func
        self.elapsed = None

    def __call__(self, *args, **kwds):
        # Use the monotonic clock: wall-clock time can jump (NTP, manual
        # adjustment) and would then yield bogus or negative durations.
        start = time.monotonic()
        try:
            return self.func(*args, **kwds)
        finally:
            self.elapsed = time.monotonic() - start
144
145#
146# Base class for test cases
147#
148
class BaseTestCase(object):
    """Mixin with assertion helpers shared by the test classes in this file.

    It relies on ``assertEqual``/``assertAlmostEqual`` being available on
    ``self`` -- presumably from ``unittest.TestCase`` in the concrete classes
    (not visible here).
    """

    ALLOWED_TYPES = ('processes', 'manager', 'threads')

    def assertTimingAlmostEqual(self, a, b):
        # Timings are only compared when CHECK_TIMINGS is enabled.
        if not CHECK_TIMINGS:
            return
        self.assertAlmostEqual(a, b, 1)

    def assertReturnsIfImplemented(self, value, func, *args):
        # Compare func(*args) with value, unless func is not implemented.
        try:
            outcome = func(*args)
        except NotImplementedError:
            return None
        return self.assertEqual(value, outcome)

    # For the sanity of Windows users, rather than crashing or freezing in
    # multiple ways.
    def __reduce__(self, *args):
        raise NotImplementedError("shouldn't try to pickle a test case")

    __reduce_ex__ = __reduce__
171
Benjamin Petersone711caf2008-06-11 16:44:04 +0000172#
173# Return the value of a semaphore
174#
175
def get_value(self):
    """Return the current value of the semaphore-like object *self*.

    Tries ``self.get_value()`` first, then the attributes
    ``_Semaphore__value`` and ``_value`` in that order.  Raises
    NotImplementedError when none of them is available.
    """
    try:
        return self.get_value()
    except AttributeError:
        pass
    for attr_name in ('_Semaphore__value', '_value'):
        try:
            return getattr(self, attr_name)
        except AttributeError:
            continue
    raise NotImplementedError
187
188#
189# Testcases
190#
191
class _TestProcess(BaseTestCase):
    """Tests of the basic Process API: attributes, start/join, terminate.

    ``TYPE``, ``Process``, ``Queue``, ``Pipe``, ``Event``, ``current_process``
    and ``active_children`` are read from ``self``/``cls`` but are not defined
    here -- presumably the concrete test classes built from this mixin supply
    them (confirm against the rest of the file).

    Tests that do not apply to a given TYPE are reported as skipped instead
    of silently passing.
    """

    ALLOWED_TYPES = ('processes', 'threads')

    def test_current(self):
        if self.TYPE == 'threads':
            self.skipTest('test not appropriate for {}'.format(self.TYPE))

        current = self.current_process()
        authkey = current.authkey

        self.assertTrue(current.is_alive())
        self.assertTrue(not current.daemon)
        self.assertIsInstance(authkey, bytes)
        self.assertGreater(len(authkey), 0)
        self.assertEqual(current.ident, os.getpid())
        self.assertEqual(current.exitcode, None)

    def test_daemon_argument(self):
        if self.TYPE == "threads":
            self.skipTest('test not appropriate for {}'.format(self.TYPE))

        # By default uses the current process's daemon flag.
        proc0 = self.Process(target=self._test)
        self.assertEqual(proc0.daemon, self.current_process().daemon)
        proc1 = self.Process(target=self._test, daemon=True)
        self.assertTrue(proc1.daemon)
        proc2 = self.Process(target=self._test, daemon=False)
        self.assertFalse(proc2.daemon)

    @classmethod
    def _test(cls, q, *args, **kwds):
        # Child side of test_process: report what the child sees through q.
        current = cls.current_process()
        q.put(args)
        q.put(kwds)
        q.put(current.name)
        if cls.TYPE != 'threads':
            q.put(bytes(current.authkey))
            q.put(current.pid)

    def test_process(self):
        q = self.Queue(1)
        args = (q, 1, 2)
        kwargs = {'hello': 23, 'bye': 2.54}
        name = 'SomeProcess'
        p = self.Process(
            target=self._test, args=args, kwargs=kwargs, name=name
            )
        p.daemon = True
        current = self.current_process()

        # State before the process is started.
        if self.TYPE != 'threads':
            self.assertEqual(p.authkey, current.authkey)
        self.assertEqual(p.is_alive(), False)
        self.assertEqual(p.daemon, True)
        self.assertNotIn(p, self.active_children())
        self.assertIs(type(self.active_children()), list)
        self.assertEqual(p.exitcode, None)

        p.start()

        self.assertEqual(p.exitcode, None)
        self.assertEqual(p.is_alive(), True)
        self.assertIn(p, self.active_children())

        # The child puts its values in exactly this order (see _test).
        self.assertEqual(q.get(), args[1:])
        self.assertEqual(q.get(), kwargs)
        self.assertEqual(q.get(), p.name)
        if self.TYPE != 'threads':
            self.assertEqual(q.get(), current.authkey)
            self.assertEqual(q.get(), p.pid)

        p.join()

        self.assertEqual(p.exitcode, 0)
        self.assertEqual(p.is_alive(), False)
        self.assertNotIn(p, self.active_children())

    @classmethod
    def _test_terminate(cls):
        # Sleep long enough that only terminate() can end the child in time.
        time.sleep(1000)

    def test_terminate(self):
        if self.TYPE == 'threads':
            self.skipTest('test not appropriate for {}'.format(self.TYPE))

        p = self.Process(target=self._test_terminate)
        p.daemon = True
        p.start()

        self.assertEqual(p.is_alive(), True)
        self.assertIn(p, self.active_children())
        self.assertEqual(p.exitcode, None)

        join = TimingWrapper(p.join)

        self.assertEqual(join(0), None)
        self.assertTimingAlmostEqual(join.elapsed, 0.0)
        self.assertEqual(p.is_alive(), True)

        self.assertEqual(join(-1), None)
        self.assertTimingAlmostEqual(join.elapsed, 0.0)
        self.assertEqual(p.is_alive(), True)

        p.terminate()

        self.assertEqual(join(), None)
        self.assertTimingAlmostEqual(join.elapsed, 0.0)

        self.assertEqual(p.is_alive(), False)
        self.assertNotIn(p, self.active_children())

        p.join()

        # XXX sometimes get p.exitcode == 0 on Windows ...
        #self.assertEqual(p.exitcode, -signal.SIGTERM)

    def test_cpu_count(self):
        try:
            cpus = multiprocessing.cpu_count()
        except NotImplementedError:
            cpus = 1
        self.assertIs(type(cpus), int)
        self.assertGreaterEqual(cpus, 1)

    def test_active_children(self):
        self.assertEqual(type(self.active_children()), list)

        p = self.Process(target=time.sleep, args=(DELTA,))
        self.assertNotIn(p, self.active_children())

        p.daemon = True
        p.start()
        self.assertIn(p, self.active_children())

        p.join()
        self.assertNotIn(p, self.active_children())

    @classmethod
    def _test_recursion(cls, wconn, id):
        # Send our own id, then start (and join) two children one after the
        # other, down to a depth of two.
        wconn.send(id)
        if len(id) < 2:
            for i in range(2):
                p = cls.Process(
                    target=cls._test_recursion, args=(wconn, id+[i])
                    )
                p.start()
                p.join()

    def test_recursion(self):
        rconn, wconn = self.Pipe(duplex=False)
        self._test_recursion(wconn, [])

        time.sleep(DELTA)
        result = []
        while rconn.poll():
            result.append(rconn.recv())

        expected = [
            [],
              [0],
                [0, 0],
                [0, 1],
              [1],
                [1, 0],
                [1, 1]
            ]
        self.assertEqual(result, expected)

    @classmethod
    def _test_sentinel(cls, event):
        event.wait(10.0)

    def test_sentinel(self):
        if self.TYPE == "threads":
            self.skipTest('test not appropriate for {}'.format(self.TYPE))
        event = self.Event()
        p = self.Process(target=self._test_sentinel, args=(event,))
        # The sentinel is not available before the process is started.
        with self.assertRaises(ValueError):
            p.sentinel
        p.start()
        self.addCleanup(p.join)
        sentinel = p.sentinel
        self.assertIsInstance(sentinel, int)
        self.assertFalse(wait_for_handle(sentinel, timeout=0.0))
        event.set()
        p.join()
        self.assertTrue(wait_for_handle(sentinel, timeout=DELTA))
382
Benjamin Petersone711caf2008-06-11 16:44:04 +0000383#
384#
385#
386
387class _UpperCaser(multiprocessing.Process):
388
389 def __init__(self):
390 multiprocessing.Process.__init__(self)
391 self.child_conn, self.parent_conn = multiprocessing.Pipe()
392
393 def run(self):
394 self.parent_conn.close()
395 for s in iter(self.child_conn.recv, None):
396 self.child_conn.send(s.upper())
397 self.child_conn.close()
398
399 def submit(self, s):
400 assert type(s) is str
401 self.parent_conn.send(s)
402 return self.parent_conn.recv()
403
404 def stop(self):
405 self.parent_conn.send(None)
406 self.parent_conn.close()
407 self.child_conn.close()
408
class _TestSubclassingProcess(BaseTestCase):
    """Tests for Process subclasses and for process shutdown behaviour.

    ``TYPE`` and ``Process`` are read from ``self`` but not defined here --
    presumably supplied by the concrete test classes (confirm elsewhere in
    the file).
    """

    ALLOWED_TYPES = ('processes',)

    def test_subclassing(self):
        uppercaser = _UpperCaser()
        uppercaser.daemon = True
        uppercaser.start()
        self.assertEqual(uppercaser.submit('hello'), 'HELLO')
        self.assertEqual(uppercaser.submit('world'), 'WORLD')
        uppercaser.stop()
        uppercaser.join()

    def test_stderr_flush(self):
        # sys.stderr is flushed at process shutdown (issue #13812)
        if self.TYPE == "threads":
            self.skipTest('test not appropriate for {}'.format(self.TYPE))

        testfn = test.support.TESTFN
        self.addCleanup(test.support.unlink, testfn)
        proc = self.Process(target=self._test_stderr_flush, args=(testfn,))
        proc.start()
        proc.join()
        with open(testfn, 'r') as f:
            err = f.read()
            # The whole traceback was printed
            self.assertIn("ZeroDivisionError", err)
            self.assertIn("test_multiprocessing.py", err)
            self.assertIn("1/0 # MARKER", err)

    @classmethod
    def _test_stderr_flush(cls, testfn):
        # Runs in the child: redirect stderr to the file, then fail.  The
        # parent searches the traceback for the exact text of the next line,
        # so it must not be reformatted.
        sys.stderr = open(testfn, 'w')
        1/0 # MARKER

    @classmethod
    def _test_sys_exit(cls, reason, testfn):
        # Runs in the child: redirect stderr to the file and exit.
        sys.stderr = open(testfn, 'w')
        sys.exit(reason)

    def test_sys_exit(self):
        # See Issue 13854
        if self.TYPE == 'threads':
            self.skipTest('test not appropriate for {}'.format(self.TYPE))

        testfn = test.support.TESTFN
        self.addCleanup(test.support.unlink, testfn)

        # Non-integer exit reasons: the expected exit code is paired with
        # each reason, and str(reason) must end up in the stderr file.
        for reason, code in (([1, 2, 3], 1), ('ignore this', 0)):
            p = self.Process(target=self._test_sys_exit, args=(reason, testfn))
            p.daemon = True
            p.start()
            p.join(5)
            self.assertEqual(p.exitcode, code)

            with open(testfn, 'r') as f:
                self.assertEqual(f.read().rstrip(), str(reason))

        # Integer-like reasons become the exit code itself.
        for reason in (True, False, 8):
            p = self.Process(target=sys.exit, args=(reason,))
            p.daemon = True
            p.start()
            p.join(5)
            self.assertEqual(p.exitcode, reason)
474
Benjamin Petersone711caf2008-06-11 16:44:04 +0000475#
476#
477#
478
def queue_empty(q):
    """Return whether *q* is empty.

    Uses ``q.empty()`` when the queue has that method, otherwise compares
    ``q.qsize()`` with zero.
    """
    if not hasattr(q, 'empty'):
        return q.qsize() == 0
    return q.empty()
484
def queue_full(q, maxsize):
    """Return whether *q* is full.

    Uses ``q.full()`` when the queue has that method, otherwise compares
    ``q.qsize()`` with *maxsize*.
    """
    if not hasattr(q, 'full'):
        return q.qsize() == maxsize
    return q.full()
490
491
class _TestQueue(BaseTestCase):
    """Tests for Queue and JoinableQueue: put/get, timeouts, fork, qsize.

    ``Queue``, ``JoinableQueue``, ``Event`` and ``Process`` are read from
    ``self`` but not defined here -- presumably supplied by the concrete test
    classes (confirm elsewhere in the file).
    """

    @classmethod
    def _test_put(cls, queue, child_can_start, parent_can_continue):
        # Child side of test_put: drain the six items the parent queued.
        child_can_start.wait()
        for _ in range(6):
            queue.get()
        parent_can_continue.set()

    def test_put(self):
        MAXSIZE = 6
        queue = self.Queue(maxsize=MAXSIZE)
        child_can_start = self.Event()
        parent_can_continue = self.Event()

        proc = self.Process(
            target=self._test_put,
            args=(queue, child_can_start, parent_can_continue)
            )
        proc.daemon = True
        proc.start()

        self.assertEqual(queue_empty(queue), True)
        self.assertEqual(queue_full(queue, MAXSIZE), False)

        # Fill the queue using every calling convention of put().
        queue.put(1)
        queue.put(2, True)
        queue.put(3, True, None)
        queue.put(4, False)
        queue.put(5, False, None)
        queue.put_nowait(6)

        # the values may be in buffer but not yet in pipe so sleep a bit
        time.sleep(DELTA)

        self.assertEqual(queue_empty(queue), False)
        self.assertEqual(queue_full(queue, MAXSIZE), True)

        put = TimingWrapper(queue.put)
        put_nowait = TimingWrapper(queue.put_nowait)

        self.assertRaises(pyqueue.Full, put, 7, False)
        self.assertTimingAlmostEqual(put.elapsed, 0)

        self.assertRaises(pyqueue.Full, put, 7, False, None)
        self.assertTimingAlmostEqual(put.elapsed, 0)

        self.assertRaises(pyqueue.Full, put_nowait, 7)
        self.assertTimingAlmostEqual(put_nowait.elapsed, 0)

        self.assertRaises(pyqueue.Full, put, 7, True, TIMEOUT1)
        self.assertTimingAlmostEqual(put.elapsed, TIMEOUT1)

        self.assertRaises(pyqueue.Full, put, 7, False, TIMEOUT2)
        self.assertTimingAlmostEqual(put.elapsed, 0)

        self.assertRaises(pyqueue.Full, put, 7, True, timeout=TIMEOUT3)
        self.assertTimingAlmostEqual(put.elapsed, TIMEOUT3)

        child_can_start.set()
        parent_can_continue.wait()

        self.assertEqual(queue_empty(queue), True)
        self.assertEqual(queue_full(queue, MAXSIZE), False)

        proc.join()

    @classmethod
    def _test_get(cls, queue, child_can_start, parent_can_continue):
        # Child side of test_get: queue the items the parent will read back.
        child_can_start.wait()
        #queue.put(1)
        queue.put(2)
        queue.put(3)
        queue.put(4)
        queue.put(5)
        parent_can_continue.set()

    def test_get(self):
        queue = self.Queue()
        child_can_start = self.Event()
        parent_can_continue = self.Event()

        proc = self.Process(
            target=self._test_get,
            args=(queue, child_can_start, parent_can_continue)
            )
        proc.daemon = True
        proc.start()

        self.assertEqual(queue_empty(queue), True)

        child_can_start.set()
        parent_can_continue.wait()

        time.sleep(DELTA)
        self.assertEqual(queue_empty(queue), False)

        # Hangs unexpectedly, remove for now
        #self.assertEqual(queue.get(), 1)
        self.assertEqual(queue.get(True, None), 2)
        self.assertEqual(queue.get(True), 3)
        self.assertEqual(queue.get(timeout=1), 4)
        self.assertEqual(queue.get_nowait(), 5)

        self.assertEqual(queue_empty(queue), True)

        get = TimingWrapper(queue.get)
        get_nowait = TimingWrapper(queue.get_nowait)

        self.assertRaises(pyqueue.Empty, get, False)
        self.assertTimingAlmostEqual(get.elapsed, 0)

        self.assertRaises(pyqueue.Empty, get, False, None)
        self.assertTimingAlmostEqual(get.elapsed, 0)

        self.assertRaises(pyqueue.Empty, get_nowait)
        self.assertTimingAlmostEqual(get_nowait.elapsed, 0)

        self.assertRaises(pyqueue.Empty, get, True, TIMEOUT1)
        self.assertTimingAlmostEqual(get.elapsed, TIMEOUT1)

        self.assertRaises(pyqueue.Empty, get, False, TIMEOUT2)
        self.assertTimingAlmostEqual(get.elapsed, 0)

        self.assertRaises(pyqueue.Empty, get, timeout=TIMEOUT3)
        self.assertTimingAlmostEqual(get.elapsed, TIMEOUT3)

        proc.join()

    @classmethod
    def _test_fork(cls, queue):
        for i in range(10, 20):
            queue.put(i)
        # note that at this point the items may only be buffered, so the
        # process cannot shutdown until the feeder thread has finished
        # pushing items onto the pipe.

    def test_fork(self):
        # Old versions of Queue would fail to create a new feeder
        # thread for a forked process if the original process had its
        # own feeder thread.  This test checks that this no longer
        # happens.

        queue = self.Queue()

        # put items on queue so that main process starts a feeder thread
        for i in range(10):
            queue.put(i)

        # wait to make sure thread starts before we fork a new process
        time.sleep(DELTA)

        # fork process
        p = self.Process(target=self._test_fork, args=(queue,))
        p.daemon = True
        p.start()

        # check that all expected items are in the queue
        for i in range(20):
            self.assertEqual(queue.get(), i)
        self.assertRaises(pyqueue.Empty, queue.get, False)

        p.join()

    def test_qsize(self):
        q = self.Queue()
        try:
            self.assertEqual(q.qsize(), 0)
        except NotImplementedError:
            # Report the missing feature instead of passing silently.
            self.skipTest('qsize method not implemented')
        q.put(1)
        self.assertEqual(q.qsize(), 1)
        q.put(5)
        self.assertEqual(q.qsize(), 2)
        q.get()
        self.assertEqual(q.qsize(), 1)
        q.get()
        self.assertEqual(q.qsize(), 0)

    @classmethod
    def _test_task_done(cls, q):
        # Worker: acknowledge every item until the None sentinel arrives.
        for _ in iter(q.get, None):
            time.sleep(DELTA)
            q.task_done()

    def test_task_done(self):
        queue = self.JoinableQueue()

        workers = [self.Process(target=self._test_task_done, args=(queue,))
                   for _ in range(4)]

        for p in workers:
            p.daemon = True
            p.start()

        for i in range(10):
            queue.put(i)

        queue.join()

        # One None sentinel per worker so that each of them exits its loop.
        for _ in workers:
            queue.put(None)

        for p in workers:
            p.join()

    def test_timeout(self):
        # NOTE(review): this uses multiprocessing.Queue directly rather than
        # self.Queue, so it exercises the same queue type for every TYPE --
        # confirm that this is intended.
        q = multiprocessing.Queue()
        start = time.time()
        self.assertRaises(pyqueue.Empty, q.get, True, 0.2)
        delta = time.time() - start
        self.assertGreaterEqual(delta, 0.19)
708
Benjamin Petersone711caf2008-06-11 16:44:04 +0000709#
710#
711#
712
class _TestLock(BaseTestCase):
    """Tests for Lock and RLock objects obtained from ``self``.

    ``Lock`` and ``RLock`` are not defined here -- presumably supplied by the
    concrete test classes (confirm elsewhere in the file).
    """

    def test_lock(self):
        lock = self.Lock()
        first_acquire = lock.acquire()
        self.assertEqual(first_acquire, True)
        # A second, non-blocking acquire must fail while the lock is held.
        second_acquire = lock.acquire(False)
        self.assertEqual(second_acquire, False)
        self.assertEqual(lock.release(), None)
        # Releasing a lock that is not held is an error.
        self.assertRaises((ValueError, threading.ThreadError), lock.release)

    def test_rlock(self):
        lock = self.RLock()
        # Acquire three times, then release as many times.
        for _ in range(3):
            self.assertEqual(lock.acquire(), True)
        for _ in range(3):
            self.assertEqual(lock.release(), None)
        # One release too many is an error.
        self.assertRaises((AssertionError, RuntimeError), lock.release)

    def test_lock_context(self):
        # The lock can be used as a context manager.
        with self.Lock():
            pass
735
Benjamin Petersone711caf2008-06-11 16:44:04 +0000736
class _TestSemaphore(BaseTestCase):
    """Tests for Semaphore and BoundedSemaphore objects.

    ``Semaphore``, ``BoundedSemaphore`` and ``TYPE`` are read from ``self``
    but not defined here -- presumably supplied by the concrete test classes
    (confirm elsewhere in the file).
    """

    def _test_semaphore(self, sem):
        # Shared checks for a semaphore created with an initial value of 2.
        # Value checks are skipped when get_value raises NotImplementedError.
        self.assertReturnsIfImplemented(2, get_value, sem)
        self.assertEqual(sem.acquire(), True)
        self.assertReturnsIfImplemented(1, get_value, sem)
        self.assertEqual(sem.acquire(), True)
        self.assertReturnsIfImplemented(0, get_value, sem)
        self.assertEqual(sem.acquire(False), False)
        self.assertReturnsIfImplemented(0, get_value, sem)
        self.assertEqual(sem.release(), None)
        self.assertReturnsIfImplemented(1, get_value, sem)
        self.assertEqual(sem.release(), None)
        self.assertReturnsIfImplemented(2, get_value, sem)

    def test_semaphore(self):
        sem = self.Semaphore(2)
        self._test_semaphore(sem)
        # An unbounded semaphore may be released past its initial value.
        self.assertEqual(sem.release(), None)
        self.assertReturnsIfImplemented(3, get_value, sem)
        self.assertEqual(sem.release(), None)
        self.assertReturnsIfImplemented(4, get_value, sem)

    def test_bounded_semaphore(self):
        sem = self.BoundedSemaphore(2)
        self._test_semaphore(sem)
        # Currently fails on OS/X
        #if HAVE_GETVALUE:
        #    self.assertRaises(ValueError, sem.release)
        #    self.assertReturnsIfImplemented(2, get_value, sem)

    def test_timeout(self):
        if self.TYPE != 'processes':
            # Report as skipped rather than silently passing.
            self.skipTest('test not appropriate for {}'.format(self.TYPE))

        sem = self.Semaphore(0)
        acquire = TimingWrapper(sem.acquire)

        self.assertEqual(acquire(False), False)
        self.assertTimingAlmostEqual(acquire.elapsed, 0.0)

        self.assertEqual(acquire(False, None), False)
        self.assertTimingAlmostEqual(acquire.elapsed, 0.0)

        self.assertEqual(acquire(False, TIMEOUT1), False)
        self.assertTimingAlmostEqual(acquire.elapsed, 0)

        self.assertEqual(acquire(True, TIMEOUT2), False)
        self.assertTimingAlmostEqual(acquire.elapsed, TIMEOUT2)

        self.assertEqual(acquire(timeout=TIMEOUT3), False)
        self.assertTimingAlmostEqual(acquire.elapsed, TIMEOUT3)
789
790
791class _TestCondition(BaseTestCase):
792
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +0000793 @classmethod
794 def f(cls, cond, sleeping, woken, timeout=None):
Benjamin Petersone711caf2008-06-11 16:44:04 +0000795 cond.acquire()
796 sleeping.release()
797 cond.wait(timeout)
798 woken.release()
799 cond.release()
800
801 def check_invariant(self, cond):
802 # this is only supposed to succeed when there are no sleepers
803 if self.TYPE == 'processes':
804 try:
805 sleepers = (cond._sleeping_count.get_value() -
806 cond._woken_count.get_value())
807 self.assertEqual(sleepers, 0)
808 self.assertEqual(cond._wait_semaphore.get_value(), 0)
809 except NotImplementedError:
810 pass
811
812 def test_notify(self):
813 cond = self.Condition()
814 sleeping = self.Semaphore(0)
815 woken = self.Semaphore(0)
816
817 p = self.Process(target=self.f, args=(cond, sleeping, woken))
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000818 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000819 p.start()
820
821 p = threading.Thread(target=self.f, args=(cond, sleeping, woken))
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000822 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000823 p.start()
824
825 # wait for both children to start sleeping
826 sleeping.acquire()
827 sleeping.acquire()
828
829 # check no process/thread has woken up
830 time.sleep(DELTA)
831 self.assertReturnsIfImplemented(0, get_value, woken)
832
833 # wake up one process/thread
834 cond.acquire()
835 cond.notify()
836 cond.release()
837
838 # check one process/thread has woken up
839 time.sleep(DELTA)
840 self.assertReturnsIfImplemented(1, get_value, woken)
841
842 # wake up another
843 cond.acquire()
844 cond.notify()
845 cond.release()
846
847 # check other has woken up
848 time.sleep(DELTA)
849 self.assertReturnsIfImplemented(2, get_value, woken)
850
851 # check state is not mucked up
852 self.check_invariant(cond)
853 p.join()
854
855 def test_notify_all(self):
856 cond = self.Condition()
857 sleeping = self.Semaphore(0)
858 woken = self.Semaphore(0)
859
860 # start some threads/processes which will timeout
861 for i in range(3):
862 p = self.Process(target=self.f,
863 args=(cond, sleeping, woken, TIMEOUT1))
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000864 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000865 p.start()
866
867 t = threading.Thread(target=self.f,
868 args=(cond, sleeping, woken, TIMEOUT1))
Benjamin Peterson72753702008-08-18 18:09:21 +0000869 t.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000870 t.start()
871
872 # wait for them all to sleep
873 for i in range(6):
874 sleeping.acquire()
875
876 # check they have all timed out
877 for i in range(6):
878 woken.acquire()
879 self.assertReturnsIfImplemented(0, get_value, woken)
880
881 # check state is not mucked up
882 self.check_invariant(cond)
883
884 # start some more threads/processes
885 for i in range(3):
886 p = self.Process(target=self.f, args=(cond, sleeping, woken))
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000887 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000888 p.start()
889
890 t = threading.Thread(target=self.f, args=(cond, sleeping, woken))
Benjamin Peterson72753702008-08-18 18:09:21 +0000891 t.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000892 t.start()
893
894 # wait for them to all sleep
895 for i in range(6):
896 sleeping.acquire()
897
898 # check no process/thread has woken up
899 time.sleep(DELTA)
900 self.assertReturnsIfImplemented(0, get_value, woken)
901
902 # wake them all up
903 cond.acquire()
904 cond.notify_all()
905 cond.release()
906
907 # check they have all woken
Antoine Pitrouf25a8de2011-04-16 21:02:01 +0200908 for i in range(10):
909 try:
910 if get_value(woken) == 6:
911 break
912 except NotImplementedError:
913 break
914 time.sleep(DELTA)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000915 self.assertReturnsIfImplemented(6, get_value, woken)
916
917 # check state is not mucked up
918 self.check_invariant(cond)
919
920 def test_timeout(self):
921 cond = self.Condition()
922 wait = TimingWrapper(cond.wait)
923 cond.acquire()
924 res = wait(TIMEOUT1)
925 cond.release()
Georg Brandl65ffae02010-10-28 09:24:56 +0000926 self.assertEqual(res, False)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000927 self.assertTimingAlmostEqual(wait.elapsed, TIMEOUT1)
928
Charles-François Natalic8ce7152012-04-17 18:45:57 +0200929 @classmethod
930 def _test_waitfor_f(cls, cond, state):
931 with cond:
932 state.value = 0
933 cond.notify()
934 result = cond.wait_for(lambda : state.value==4)
935 if not result or state.value != 4:
936 sys.exit(1)
937
938 @unittest.skipUnless(HAS_SHAREDCTYPES, 'needs sharedctypes')
939 def test_waitfor(self):
940 # based on test in test/lock_tests.py
941 cond = self.Condition()
942 state = self.Value('i', -1)
943
944 p = self.Process(target=self._test_waitfor_f, args=(cond, state))
945 p.daemon = True
946 p.start()
947
948 with cond:
949 result = cond.wait_for(lambda : state.value==0)
950 self.assertTrue(result)
951 self.assertEqual(state.value, 0)
952
953 for i in range(4):
954 time.sleep(0.01)
955 with cond:
956 state.value += 1
957 cond.notify()
958
959 p.join(5)
960 self.assertFalse(p.is_alive())
961 self.assertEqual(p.exitcode, 0)
962
963 @classmethod
Richard Oudkerk6dbca362012-05-06 16:46:36 +0100964 def _test_waitfor_timeout_f(cls, cond, state, success, sem):
965 sem.release()
Charles-François Natalic8ce7152012-04-17 18:45:57 +0200966 with cond:
967 expected = 0.1
968 dt = time.time()
969 result = cond.wait_for(lambda : state.value==4, timeout=expected)
970 dt = time.time() - dt
971 # borrow logic in assertTimeout() from test/lock_tests.py
972 if not result and expected * 0.6 < dt < expected * 10.0:
973 success.value = True
974
975 @unittest.skipUnless(HAS_SHAREDCTYPES, 'needs sharedctypes')
976 def test_waitfor_timeout(self):
977 # based on test in test/lock_tests.py
978 cond = self.Condition()
979 state = self.Value('i', 0)
980 success = self.Value('i', False)
Richard Oudkerk6dbca362012-05-06 16:46:36 +0100981 sem = self.Semaphore(0)
Charles-François Natalic8ce7152012-04-17 18:45:57 +0200982
983 p = self.Process(target=self._test_waitfor_timeout_f,
Richard Oudkerk6dbca362012-05-06 16:46:36 +0100984 args=(cond, state, success, sem))
Charles-François Natalic8ce7152012-04-17 18:45:57 +0200985 p.daemon = True
986 p.start()
Richard Oudkerk6dbca362012-05-06 16:46:36 +0100987 self.assertTrue(sem.acquire(timeout=10))
Charles-François Natalic8ce7152012-04-17 18:45:57 +0200988
989 # Only increment 3 times, so state == 4 is never reached.
990 for i in range(3):
991 time.sleep(0.01)
992 with cond:
993 state.value += 1
994 cond.notify()
995
996 p.join(5)
997 self.assertTrue(success.value)
998
Richard Oudkerk98449932012-06-05 13:15:29 +0100999 @classmethod
1000 def _test_wait_result(cls, c, pid):
1001 with c:
1002 c.notify()
1003 time.sleep(1)
1004 if pid is not None:
1005 os.kill(pid, signal.SIGINT)
1006
1007 def test_wait_result(self):
1008 if isinstance(self, ProcessesMixin) and sys.platform != 'win32':
1009 pid = os.getpid()
1010 else:
1011 pid = None
1012
1013 c = self.Condition()
1014 with c:
1015 self.assertFalse(c.wait(0))
1016 self.assertFalse(c.wait(0.1))
1017
1018 p = self.Process(target=self._test_wait_result, args=(c, pid))
1019 p.start()
1020
1021 self.assertTrue(c.wait(10))
1022 if pid is not None:
1023 self.assertRaises(KeyboardInterrupt, c.wait, 10)
1024
1025 p.join()
1026
Benjamin Petersone711caf2008-06-11 16:44:04 +00001027
class _TestEvent(BaseTestCase):
    """Tests for Event objects."""

    @classmethod
    def _test_event(cls, event):
        """Worker: set *event* after sleeping for TIMEOUT2 seconds."""
        time.sleep(TIMEOUT2)
        event.set()

    def test_event(self):
        """Check is_set()/wait()/set()/clear() and cross-worker signalling."""
        event = self.Event()
        wait = TimingWrapper(event.wait)

        # A freshly created event is not set.
        self.assertEqual(event.is_set(), False)

        # wait() returns the value of the flag: False when it times out
        # without the event having been set.
        self.assertEqual(wait(0.0), False)
        self.assertTimingAlmostEqual(wait.elapsed, 0.0)
        self.assertEqual(wait(TIMEOUT1), False)
        self.assertTimingAlmostEqual(wait.elapsed, TIMEOUT1)

        event.set()

        # Once set, wait() returns True immediately, with or without timeout.
        self.assertEqual(event.is_set(), True)
        self.assertEqual(wait(), True)
        self.assertTimingAlmostEqual(wait.elapsed, 0.0)
        self.assertEqual(wait(TIMEOUT1), True)
        self.assertTimingAlmostEqual(wait.elapsed, 0.0)

        event.clear()

        # A worker sets the event after TIMEOUT2; wait() must then return.
        p = self.Process(target=self._test_event, args=(event,))
        p.daemon = True
        p.start()
        self.assertEqual(wait(), True)
        # Reap the worker so it does not outlive the test.
        p.join()
Benjamin Petersone711caf2008-06-11 16:44:04 +00001068
1069#
Richard Oudkerk3730a172012-06-15 18:26:07 +01001070# Tests for Barrier - adapted from tests in test/lock_tests.py
1071#
1072
# Many of the tests for threading.Barrier use a list as an atomic
# counter: a value is appended to increment the counter, and the
# length of the list gives the value.  We use the class _DummyList
# for the same purpose.
1077
class _DummyList(object):
    """A minimal list stand-in used as a counter.

    Only ``append()`` and ``len()`` are supported: ``append()`` increments
    an integer stored in a ``multiprocessing.heap.BufferWrapper`` buffer
    (guarded by a ``multiprocessing.Lock``) and ``len()`` reads it back.
    The appended value itself is discarded.
    """

    def __init__(self):
        wrapper = multiprocessing.heap.BufferWrapper(struct.calcsize('i'))
        lock = multiprocessing.Lock()
        # Reuse __setstate__ so construction and unpickling share one path.
        self.__setstate__((wrapper, lock))
        self._lengthbuf[0] = 0

    def __setstate__(self, state):
        (self._wrapper, self._lock) = state
        # View the raw buffer as an array of C ints; slot 0 is the count.
        self._lengthbuf = self._wrapper.create_memoryview().cast('i')

    def __getstate__(self):
        # The memoryview is rebuilt by __setstate__, so it is not pickled.
        return (self._wrapper, self._lock)

    def append(self, _):
        """Increment the counter; the appended value is ignored."""
        with self._lock:
            self._lengthbuf[0] += 1

    def __len__(self):
        """Return the number of append() calls made so far."""
        with self._lock:
            return self._lengthbuf[0]
1100
def _wait():
    """Sleep for 10 ms.

    A crude wait/yield function not relying on synchronization primitives.
    """
    time.sleep(0.01)
1104
1105
class Bunch(object):
    """
    A bunch of workers, all running the same function.

    The workers are created with ``namespace.Process``.
    """
    def __init__(self, namespace, f, args, n, wait_before_exit=False):
        """
        Construct a bunch of `n` workers running the same function `f`.
        If `wait_before_exit` is True, the workers won't terminate until
        do_finish() is called.

        `namespace` must provide DummyList(), Event() and Process().
        """
        self.f = f
        self.args = args
        self.n = n
        # Counters: one entry is appended per worker that started/finished.
        self.started = namespace.DummyList()
        self.finished = namespace.DummyList()
        self._can_exit = namespace.Event()
        if not wait_before_exit:
            self._can_exit.set()
        for i in range(n):
            p = namespace.Process(target=self.task)
            p.daemon = True
            p.start()

    def task(self):
        """Body of each worker: run ``f(*args)`` and record start/finish."""
        pid = os.getpid()
        self.started.append(pid)
        try:
            self.f(*self.args)
        finally:
            self.finished.append(pid)
            # Give the owner up to 30 seconds to allow the worker to exit.
            self._can_exit.wait(30)
            assert self._can_exit.is_set()

    def wait_for_started(self):
        """Block until all `n` workers have recorded that they started."""
        while len(self.started) < self.n:
            _wait()

    def wait_for_finished(self):
        """Block until all `n` workers have recorded that they finished."""
        while len(self.finished) < self.n:
            _wait()

    def do_finish(self):
        """Allow workers created with wait_before_exit=True to exit."""
        self._can_exit.set()
Richard Oudkerk3730a172012-06-15 18:26:07 +01001149
1150
class AppendTrue(object):
    """Callable that appends True to the wrapped object on every call.

    A class rather than a closure; used as the barrier ``action`` in
    test_action.
    """

    def __init__(self, obj):
        self.obj = obj

    def __call__(self):
        self.obj.append(True)
1156
1157
class _TestBarrier(BaseTestCase):
    """
    Tests for Barrier objects.
    """
    N = 5
    defaultTimeout = 30.0  # XXX Slow Windows buildbots need generous timeout

    def setUp(self):
        self.barrier = self.Barrier(self.N, timeout=self.defaultTimeout)

    def tearDown(self):
        # Break the barrier so that no worker stays blocked on it.
        self.barrier.abort()
        self.barrier = None

    def DummyList(self):
        """Return a list-like counter suitable for the current test TYPE."""
        if self.TYPE == 'threads':
            return []
        elif self.TYPE == 'manager':
            return self.manager.list()
        else:
            return _DummyList()

    def run_threads(self, f, args):
        """Run `f(*args)` in N-1 workers plus the current one, then wait."""
        b = Bunch(self, f, args, self.N-1)
        f(*args)
        b.wait_for_finished()

    @classmethod
    def multipass(cls, barrier, results, n):
        """Pass the barrier 2*n times, checking that all parties stay in step."""
        m = barrier.parties
        assert m == cls.N
        for i in range(n):
            results[0].append(True)
            assert len(results[1]) == i * m
            barrier.wait()
            results[1].append(True)
            assert len(results[0]) == (i + 1) * m
            barrier.wait()
        try:
            assert barrier.n_waiting == 0
        except NotImplementedError:
            pass
        assert not barrier.broken

    def test_barrier(self, passes=1):
        """
        Test that a barrier is passed in lockstep
        """
        results = [self.DummyList(), self.DummyList()]
        self.run_threads(self.multipass, (self.barrier, results, passes))

    def test_barrier_10(self):
        """
        Test that a barrier works for 10 consecutive runs
        """
        self.test_barrier(10)

    @classmethod
    def _test_wait_return_f(cls, barrier, queue):
        res = barrier.wait()
        queue.put(res)

    def test_wait_return(self):
        """
        test the return value from barrier.wait
        """
        queue = self.Queue()
        self.run_threads(self._test_wait_return_f, (self.barrier, queue))
        results = [queue.get() for i in range(self.N)]
        self.assertEqual(results.count(0), 1)

    @classmethod
    def _test_action_f(cls, barrier, results):
        barrier.wait()
        # The action must have run exactly once by the time wait() returns.
        if len(results) != 1:
            raise RuntimeError

    def test_action(self):
        """
        Test the 'action' callback
        """
        results = self.DummyList()
        barrier = self.Barrier(self.N, action=AppendTrue(results))
        self.run_threads(self._test_action_f, (barrier, results))
        self.assertEqual(len(results), 1)

    @classmethod
    def _test_abort_f(cls, barrier, results1, results2):
        try:
            i = barrier.wait()
            if i == cls.N//2:
                raise RuntimeError
            barrier.wait()
            results1.append(True)
        except threading.BrokenBarrierError:
            results2.append(True)
        except RuntimeError:
            barrier.abort()

    def test_abort(self):
        """
        Test that an abort will put the barrier in a broken state
        """
        results1 = self.DummyList()
        results2 = self.DummyList()
        self.run_threads(self._test_abort_f,
                         (self.barrier, results1, results2))
        self.assertEqual(len(results1), 0)
        self.assertEqual(len(results2), self.N-1)
        self.assertTrue(self.barrier.broken)

    @classmethod
    def _test_reset_f(cls, barrier, results1, results2, results3):
        i = barrier.wait()
        if i == cls.N//2:
            # Wait until the other threads are all in the barrier.
            while barrier.n_waiting < cls.N-1:
                time.sleep(0.001)
            barrier.reset()
        else:
            try:
                barrier.wait()
                results1.append(True)
            except threading.BrokenBarrierError:
                results2.append(True)
        # Now, pass the barrier again
        barrier.wait()
        results3.append(True)

    def test_reset(self):
        """
        Test that a 'reset' on a barrier frees the waiting threads
        """
        results1 = self.DummyList()
        results2 = self.DummyList()
        results3 = self.DummyList()
        self.run_threads(self._test_reset_f,
                         (self.barrier, results1, results2, results3))
        self.assertEqual(len(results1), 0)
        self.assertEqual(len(results2), self.N-1)
        self.assertEqual(len(results3), self.N)

    @classmethod
    def _test_abort_and_reset_f(cls, barrier, barrier2,
                                results1, results2, results3):
        try:
            i = barrier.wait()
            if i == cls.N//2:
                raise RuntimeError
            barrier.wait()
            results1.append(True)
        except threading.BrokenBarrierError:
            results2.append(True)
        except RuntimeError:
            barrier.abort()
        # Synchronize and reset the barrier.  Must synchronize first so
        # that everyone has left it when we reset, and after so that no
        # one enters it before the reset.
        if barrier2.wait() == cls.N//2:
            barrier.reset()
        barrier2.wait()
        barrier.wait()
        results3.append(True)

    def test_abort_and_reset(self):
        """
        Test that a barrier can be reset after being broken.
        """
        results1 = self.DummyList()
        results2 = self.DummyList()
        results3 = self.DummyList()
        barrier2 = self.Barrier(self.N)

        self.run_threads(self._test_abort_and_reset_f,
                         (self.barrier, barrier2, results1, results2, results3))
        self.assertEqual(len(results1), 0)
        self.assertEqual(len(results2), self.N-1)
        self.assertEqual(len(results3), self.N)

    @classmethod
    def _test_timeout_f(cls, barrier, results):
        i = barrier.wait()
        if i == cls.N//2:
            # One thread is late!
            time.sleep(1.0)
        try:
            barrier.wait(0.5)
        except threading.BrokenBarrierError:
            results.append(True)

    def test_timeout(self):
        """
        Test wait(timeout)
        """
        results = self.DummyList()
        self.run_threads(self._test_timeout_f, (self.barrier, results))
        self.assertEqual(len(results), self.barrier.parties)

    @classmethod
    def _test_default_timeout_f(cls, barrier, results):
        i = barrier.wait(cls.defaultTimeout)
        if i == cls.N//2:
            # One thread is later than the default timeout
            time.sleep(1.0)
        try:
            barrier.wait()
        except threading.BrokenBarrierError:
            results.append(True)

    def test_default_timeout(self):
        """
        Test the barrier's default timeout
        """
        barrier = self.Barrier(self.N, timeout=0.5)
        results = self.DummyList()
        self.run_threads(self._test_default_timeout_f, (barrier, results))
        self.assertEqual(len(results), barrier.parties)

    def test_single_thread(self):
        """A one-party barrier never blocks."""
        b = self.Barrier(1)
        b.wait()
        b.wait()

    @classmethod
    def _test_thousand_f(cls, barrier, passes, conn, lock):
        for i in range(passes):
            barrier.wait()
            # Serialize writers on the shared connection.
            with lock:
                conn.send(i)

    def test_thousand(self):
        """Pass the barrier 1000 times, checking that passes never overlap."""
        if self.TYPE == 'manager':
            # Report the skip explicitly instead of passing silently.
            self.skipTest('test not appropriate for {}'.format(self.TYPE))
        passes = 1000
        lock = self.Lock()
        conn, child_conn = self.Pipe(False)
        for j in range(self.N):
            p = self.Process(target=self._test_thousand_f,
                             args=(self.barrier, passes, child_conn, lock))
            p.start()
            # Reap every worker once the test is over so none is leaked.
            self.addCleanup(p.join)

        for i in range(passes):
            for j in range(self.N):
                self.assertEqual(conn.recv(), i)
1402
1403#
Benjamin Petersone711caf2008-06-11 16:44:04 +00001404#
1405#
1406
class _TestValue(BaseTestCase):
    """Tests for shared Value/RawValue objects."""

    ALLOWED_TYPES = ('processes',)

    # (typecode, initial value, value written by the child process)
    codes_values = [
        ('i', 4343, 24234),
        ('d', 3.625, -4.25),
        ('h', -232, 234),
        ('c', latin('x'), latin('y'))
        ]

    def setUp(self):
        if not HAS_SHAREDCTYPES:
            self.skipTest("requires multiprocessing.sharedctypes")

    @classmethod
    def _test(cls, values):
        """Worker: overwrite each shared value with its second test value."""
        for sv, cv in zip(values, cls.codes_values):
            sv.value = cv[2]

    def test_value(self, raw=False):
        """Check that values written by a child are visible in the parent."""
        if raw:
            values = [self.RawValue(code, value)
                      for code, value, _ in self.codes_values]
        else:
            values = [self.Value(code, value)
                      for code, value, _ in self.codes_values]

        for sv, cv in zip(values, self.codes_values):
            self.assertEqual(sv.value, cv[1])

        proc = self.Process(target=self._test, args=(values,))
        proc.daemon = True
        proc.start()
        proc.join()

        for sv, cv in zip(values, self.codes_values):
            self.assertEqual(sv.value, cv[2])

    def test_rawvalue(self):
        self.test_value(raw=True)

    def test_getobj_getlock(self):
        """Check get_lock()/get_obj() for the different ``lock`` arguments."""
        # Default lock: both accessors must be callable.
        val1 = self.Value('i', 5)
        val1.get_lock()
        val1.get_obj()

        # lock=None behaves like the default.
        val2 = self.Value('i', 5, lock=None)
        val2.get_lock()
        val2.get_obj()

        # An explicit lock is returned unchanged by get_lock().
        lock = self.Lock()
        val3 = self.Value('i', 5, lock=lock)
        lock3 = val3.get_lock()
        val3.get_obj()
        self.assertEqual(lock, lock3)

        # lock=False yields an object without the accessors.
        val4 = self.Value('i', 5, lock=False)
        self.assertFalse(hasattr(val4, 'get_lock'))
        self.assertFalse(hasattr(val4, 'get_obj'))

        self.assertRaises(AttributeError, self.Value, 'i', 5, lock='navalue')

        val5 = self.RawValue('i', 5)
        self.assertFalse(hasattr(val5, 'get_lock'))
        self.assertFalse(hasattr(val5, 'get_obj'))
1474
Benjamin Petersone711caf2008-06-11 16:44:04 +00001475
class _TestArray(BaseTestCase):
    """Tests for shared Array/RawArray objects."""

    ALLOWED_TYPES = ('processes',)

    @classmethod
    def f(cls, seq):
        """Replace *seq* in place with its running (prefix) sums."""
        for i in range(1, len(seq)):
            seq[i] += seq[i-1]

    @unittest.skipIf(c_int is None, "requires _ctypes")
    def test_array(self, raw=False):
        """Check indexing, slicing and modification from a child process."""
        seq = [680, 626, 934, 821, 150, 233, 548, 982, 714, 831]
        if raw:
            arr = self.RawArray('i', seq)
        else:
            arr = self.Array('i', seq)

        self.assertEqual(len(arr), len(seq))
        self.assertEqual(arr[3], seq[3])
        self.assertEqual(list(arr[2:7]), list(seq[2:7]))

        arr[4:8] = seq[4:8] = array.array('i', [1, 2, 3, 4])

        self.assertEqual(list(arr[:]), seq)

        # Apply f locally to the plain list and, in a child, to the shared
        # array; both must end up with the same contents.
        self.f(seq)

        p = self.Process(target=self.f, args=(arr,))
        p.daemon = True
        p.start()
        p.join()

        self.assertEqual(list(arr[:]), seq)

    @unittest.skipIf(c_int is None, "requires _ctypes")
    def test_array_from_size(self):
        size = 10
        # Test for zeroing (see issue #11675).
        # The repetition below strengthens the test by increasing the chances
        # of previously allocated non-zero memory being used for the new array
        # on the 2nd and 3rd loops.
        for _ in range(3):
            arr = self.Array('i', size)
            self.assertEqual(len(arr), size)
            self.assertEqual(list(arr), [0] * size)
            arr[:] = range(10)
            self.assertEqual(list(arr), list(range(10)))
            del arr

    @unittest.skipIf(c_int is None, "requires _ctypes")
    def test_rawarray(self):
        self.test_array(raw=True)

    @unittest.skipIf(c_int is None, "requires _ctypes")
    def test_getobj_getlock_obj(self):
        """Check get_lock()/get_obj() for the different ``lock`` arguments."""
        # Default lock: both accessors must be callable.
        arr1 = self.Array('i', list(range(10)))
        arr1.get_lock()
        arr1.get_obj()

        # lock=None behaves like the default.
        arr2 = self.Array('i', list(range(10)), lock=None)
        arr2.get_lock()
        arr2.get_obj()

        # An explicit lock is returned unchanged by get_lock().
        lock = self.Lock()
        arr3 = self.Array('i', list(range(10)), lock=lock)
        lock3 = arr3.get_lock()
        arr3.get_obj()
        self.assertEqual(lock, lock3)

        # lock=False yields an object without the accessors.
        arr4 = self.Array('i', range(10), lock=False)
        self.assertFalse(hasattr(arr4, 'get_lock'))
        self.assertFalse(hasattr(arr4, 'get_obj'))
        self.assertRaises(AttributeError,
                          self.Array, 'i', range(10), lock='notalock')

        arr5 = self.RawArray('i', range(10))
        self.assertFalse(hasattr(arr5, 'get_lock'))
        self.assertFalse(hasattr(arr5, 'get_obj'))
Benjamin Petersone711caf2008-06-11 16:44:04 +00001554
1555#
1556#
1557#
1558
class _TestContainers(BaseTestCase):
    """Tests for the manager-backed list, dict and Namespace proxies."""

    ALLOWED_TYPES = ('manager',)

    def test_list(self):
        a = self.list(list(range(10)))
        self.assertEqual(a[:], list(range(10)))

        b = self.list()
        self.assertEqual(b[:], [])

        b.extend(list(range(5)))
        self.assertEqual(b[:], list(range(5)))

        self.assertEqual(b[2], 2)
        self.assertEqual(b[2:10], [2, 3, 4])

        b *= 2
        self.assertEqual(b[:], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4])

        self.assertEqual(b + [5, 6], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4, 5, 6])

        self.assertEqual(a[:], list(range(10)))

        # A shared list built from other shared lists.
        d = [a, b]
        e = self.list(d)
        self.assertEqual(
            e[:],
            [[0, 1, 2, 3, 4, 5, 6, 7, 8, 9], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4]]
            )

        # A later change to `a` must be visible through `f`.
        f = self.list([a])
        a.append('hello')
        self.assertEqual(f[:], [[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 'hello']])

    def test_dict(self):
        d = self.dict()
        indices = list(range(65, 70))
        for i in indices:
            d[i] = chr(i)
        self.assertEqual(d.copy(), dict((i, chr(i)) for i in indices))
        self.assertEqual(sorted(d.keys()), indices)
        self.assertEqual(sorted(d.values()), [chr(i) for i in indices])
        self.assertEqual(sorted(d.items()), [(i, chr(i)) for i in indices])

    def test_namespace(self):
        n = self.Namespace()
        n.name = 'Bob'
        n.job = 'Builder'
        n._hidden = 'hidden'
        self.assertEqual((n.name, n.job), ('Bob', 'Builder'))
        del n.job
        # The underscore-prefixed attribute must not show up in str(n).
        self.assertEqual(str(n), "Namespace(name='Bob')")
        self.assertTrue(hasattr(n, 'name'))
        self.assertFalse(hasattr(n, 'job'))
1614
1615#
1616#
1617#
1618
def sqr(x, wait=0.0):
    """Return ``x*x`` after sleeping for *wait* seconds."""
    time.sleep(wait)
    return x*x
Ask Solem2afcbf22010-11-09 20:55:52 +00001622
def mul(x, y):
    """Return the product ``x*y``."""
    return x*y
1625
class _TestPool(BaseTestCase):
    """Tests for Pool objects, run against one pool shared by the class."""

    @classmethod
    def setUpClass(cls):
        super().setUpClass()
        cls.pool = cls.Pool(4)

    @classmethod
    def tearDownClass(cls):
        cls.pool.terminate()
        cls.pool.join()
        cls.pool = None
        super().tearDownClass()

    def test_apply(self):
        papply = self.pool.apply
        self.assertEqual(papply(sqr, (5,)), sqr(5))
        self.assertEqual(papply(sqr, (), {'x':3}), sqr(x=3))

    def test_map(self):
        pmap = self.pool.map
        self.assertEqual(pmap(sqr, list(range(10))), list(map(sqr, list(range(10)))))
        self.assertEqual(pmap(sqr, list(range(100)), chunksize=20),
                         list(map(sqr, list(range(100)))))

    def test_starmap(self):
        psmap = self.pool.starmap
        tuples = list(zip(range(10), range(9,-1, -1)))
        self.assertEqual(psmap(mul, tuples),
                         list(itertools.starmap(mul, tuples)))
        tuples = list(zip(range(100), range(99,-1, -1)))
        self.assertEqual(psmap(mul, tuples, chunksize=20),
                         list(itertools.starmap(mul, tuples)))

    def test_starmap_async(self):
        tuples = list(zip(range(100), range(99,-1, -1)))
        self.assertEqual(self.pool.starmap_async(mul, tuples).get(),
                         list(itertools.starmap(mul, tuples)))

    def test_map_async(self):
        self.assertEqual(self.pool.map_async(sqr, list(range(10))).get(),
                         list(map(sqr, list(range(10)))))

    def test_map_async_callbacks(self):
        """Check that exactly one of callback/error_callback fires per call."""
        call_args = self.manager.list() if self.TYPE == 'manager' else []
        # Success: the callback receives the list of results.
        self.pool.map_async(int, ['1'],
                            callback=call_args.append,
                            error_callback=call_args.append).wait()
        self.assertEqual(1, len(call_args))
        self.assertEqual([1], call_args[0])
        # Failure: the error callback receives the exception instance.
        self.pool.map_async(int, ['a'],
                            callback=call_args.append,
                            error_callback=call_args.append).wait()
        self.assertEqual(2, len(call_args))
        self.assertIsInstance(call_args[1], ValueError)

    def test_map_chunksize(self):
        try:
            self.pool.map_async(sqr, [], chunksize=1).get(timeout=TIMEOUT1)
        except multiprocessing.TimeoutError:
            self.fail("pool.map_async with chunksize stalled on null list")

    def test_async(self):
        res = self.pool.apply_async(sqr, (7, TIMEOUT1,))
        get = TimingWrapper(res.get)
        self.assertEqual(get(), 49)
        self.assertTimingAlmostEqual(get.elapsed, TIMEOUT1)

    def test_async_timeout(self):
        res = self.pool.apply_async(sqr, (6, TIMEOUT2 + 0.2))
        get = TimingWrapper(res.get)
        self.assertRaises(multiprocessing.TimeoutError, get, timeout=TIMEOUT2)
        self.assertTimingAlmostEqual(get.elapsed, TIMEOUT2)

    def test_imap(self):
        it = self.pool.imap(sqr, list(range(10)))
        self.assertEqual(list(it), list(map(sqr, list(range(10)))))

        it = self.pool.imap(sqr, list(range(10)))
        for i in range(10):
            self.assertEqual(next(it), i*i)
        self.assertRaises(StopIteration, it.__next__)

        it = self.pool.imap(sqr, list(range(1000)), chunksize=100)
        for i in range(1000):
            self.assertEqual(next(it), i*i)
        self.assertRaises(StopIteration, it.__next__)

    def test_imap_unordered(self):
        it = self.pool.imap_unordered(sqr, list(range(1000)))
        self.assertEqual(sorted(it), list(map(sqr, list(range(1000)))))

        it = self.pool.imap_unordered(sqr, list(range(1000)), chunksize=53)
        self.assertEqual(sorted(it), list(map(sqr, list(range(1000)))))

    def test_make_pool(self):
        self.assertRaises(ValueError, multiprocessing.Pool, -1)
        self.assertRaises(ValueError, multiprocessing.Pool, 0)

        p = multiprocessing.Pool(3)
        self.assertEqual(3, len(p._pool))
        p.close()
        p.join()

    def test_terminate(self):
        """Check that terminate() does not wait for queued tasks to finish.

        This terminates the pool shared by the whole class.
        """
        self.pool.map_async(
            time.sleep, [0.1 for i in range(10000)], chunksize=1
            )
        self.pool.terminate()
        join = TimingWrapper(self.pool.join)
        join()
        self.assertLess(join.elapsed, 0.5)

    def test_empty_iterable(self):
        # See Issue 12157
        p = self.Pool(1)

        self.assertEqual(p.map(sqr, []), [])
        self.assertEqual(list(p.imap(sqr, [])), [])
        self.assertEqual(list(p.imap_unordered(sqr, [])), [])
        self.assertEqual(p.map_async(sqr, []).get(), [])

        p.close()
        p.join()

    def test_context(self):
        """Check that a Pool used as a context manager is unusable afterwards."""
        if self.TYPE == 'processes':
            L = list(range(10))
            expected = [sqr(i) for i in L]
            with multiprocessing.Pool(2) as p:
                r = p.map_async(sqr, L)
                self.assertEqual(r.get(), expected)
            self.assertRaises(ValueError, p.map_async, sqr, L)
Richard Oudkerkd69cfe82012-06-18 17:47:52 +01001759
def raising():
    """Always raise ``KeyError("key")``."""
    raise KeyError("key")
Jesse Noller1f0b6582010-01-27 03:36:01 +00001762
def unpickleable_result():
    """Return a lambda, i.e. a result that cannot be pickled."""
    return lambda: 42
1765
class _TestPoolWorkerErrors(BaseTestCase):
    """Tests for how a Pool reports errors raised by its tasks."""

    ALLOWED_TYPES = ('processes', )

    def test_async_error_callback(self):
        """Check that error_callback receives the exception raised by a task."""
        p = multiprocessing.Pool(2)

        # One-element list so that the nested callback can store its argument.
        scratchpad = [None]
        def errback(exc):
            scratchpad[0] = exc

        res = p.apply_async(raising, error_callback=errback)
        self.assertRaises(KeyError, res.get)
        self.assertTrue(scratchpad[0])
        self.assertIsInstance(scratchpad[0], KeyError)

        p.close()
        p.join()

    def test_unpickleable_result(self):
        """Check that an unpicklable result is reported as MaybeEncodingError."""
        from multiprocessing.pool import MaybeEncodingError
        p = multiprocessing.Pool(2)

        # Make sure we don't lose pool processes because of encoding errors.
        for iteration in range(20):

            scratchpad = [None]
            def errback(exc):
                scratchpad[0] = exc

            res = p.apply_async(unpickleable_result, error_callback=errback)
            self.assertRaises(MaybeEncodingError, res.get)
            wrapped = scratchpad[0]
            self.assertTrue(wrapped)
            self.assertIsInstance(scratchpad[0], MaybeEncodingError)
            self.assertIsNotNone(wrapped.exc)
            self.assertIsNotNone(wrapped.value)

        p.close()
        p.join()
1805
class _TestPoolWorkerLifetime(BaseTestCase):
    """Tests for pools created with ``maxtasksperchild``."""

    ALLOWED_TYPES = ('processes', )

    def test_pool_worker_lifetime(self):
        """Check that the set of worker pids changes once many tasks have run."""
        p = multiprocessing.Pool(3, maxtasksperchild=10)
        self.assertEqual(3, len(p._pool))
        origworkerpids = [w.pid for w in p._pool]
        # Run many tasks so each worker gets replaced (hopefully)
        results = []
        for i in range(100):
            results.append(p.apply_async(sqr, (i, )))
        # Fetch the results and verify we got the right answers,
        # also ensuring all the tasks have completed.
        for (j, res) in enumerate(results):
            self.assertEqual(res.get(), sqr(j))
        # Refill the pool
        p._repopulate_pool()
        # Wait until all workers are alive
        # (countdown * DELTA = 5 seconds max startup process time)
        countdown = 50
        while countdown and not all(w.is_alive() for w in p._pool):
            countdown -= 1
            time.sleep(DELTA)
        finalworkerpids = [w.pid for w in p._pool]
        # All pids should be assigned.  See issue #7805.
        self.assertNotIn(None, origworkerpids)
        self.assertNotIn(None, finalworkerpids)
        # Finally, check that the worker pids have changed
        self.assertNotEqual(sorted(origworkerpids), sorted(finalworkerpids))
        p.close()
        p.join()

    def test_pool_worker_lifetime_early_close(self):
        # Issue #10332: closing a pool whose workers have limited lifetimes
        # before all the tasks completed would make join() hang.
        p = multiprocessing.Pool(3, maxtasksperchild=1)
        results = []
        for i in range(6):
            results.append(p.apply_async(sqr, (i, 0.3)))
        p.close()
        p.join()
        # check the results
        for (j, res) in enumerate(results):
            self.assertEqual(res.get(), sqr(j))
1850
Benjamin Petersone711caf2008-06-11 16:44:04 +00001851#
1852# Test of creating a customized manager class
1853#
1854
1855from multiprocessing.managers import BaseManager, BaseProxy, RemoteError
1856
class FooBar:
    """Small class registered with MyManager to test method exposure."""

    def f(self):
        """Return the marker string ``'f()'``."""
        return 'f()'

    def g(self):
        """Always raise ValueError."""
        raise ValueError()

    def _h(self):
        """Return the marker string ``'_h()'`` (underscore-prefixed name)."""
        return '_h()'
1864
def baz():
    """Generate the squares of 0 through 9 in increasing order."""
    yield from (n * n for n in range(10))
1868
class IteratorProxy(BaseProxy):
    """Proxy type that lets a manager-hosted iterator be looped over."""

    # Only ``__next__`` is forwarded to the referent.
    _exposed_ = ('__next__',)

    def __next__(self):
        """Return the next item by calling ``__next__`` on the referent."""
        return self._callmethod('__next__')

    def __iter__(self):
        """Return the proxy itself so it can be used directly in ``for``."""
        return self
1875
class MyManager(BaseManager):
    """Customized manager; its shared types are registered just below."""


# 'Foo' and 'Bar' are both backed by FooBar; 'Bar' passes an explicit
# ``exposed`` tuple.  'baz' serves the generator through IteratorProxy.
MyManager.register('Foo', callable=FooBar)
MyManager.register('Bar', callable=FooBar, exposed=('f', '_h'))
MyManager.register('baz', callable=baz, proxytype=IteratorProxy)
1882
1883
class _TestMyManager(BaseTestCase):
    """Run the MyManager checks with manual and ``with``-based lifecycles."""

    ALLOWED_TYPES = ('manager',)

    def test_mymanager(self):
        """Start and shut down the manager explicitly around the checks."""
        manager = MyManager()
        manager.start()
        self.common(manager)
        manager.shutdown()

        # If the manager process exited cleanly then the exitcode
        # will be zero.  Otherwise (after a short timeout)
        # terminate() is used, resulting in an exitcode of -SIGTERM.
        self.assertEqual(manager._process.exitcode, 0)

    def test_mymanager_context(self):
        """Let the ``with`` statement both start and stop the manager."""
        with MyManager() as manager:
            self.common(manager)
        self.assertEqual(manager._process.exitcode, 0)

    def test_mymanager_context_prestarted(self):
        """Enter the ``with`` statement with an already started manager."""
        manager = MyManager()
        manager.start()
        with manager:
            self.common(manager)
        self.assertEqual(manager._process.exitcode, 0)

    def common(self, manager):
        """Check which methods the Foo/Bar proxies offer and what they return.

        Also consumes the ``baz`` iterator proxy and compares it with the
        first ten squares.
        """
        candidates = ('f', 'g', '_h')
        foo_proxy = manager.Foo()
        bar_proxy = manager.Bar()
        squares = manager.baz()

        def available(proxy):
            # Names from ``candidates`` that exist as attributes of the proxy.
            return [name for name in candidates if hasattr(proxy, name)]

        foo_methods = available(foo_proxy)
        bar_methods = available(bar_proxy)
        self.assertEqual(foo_methods, ['f', 'g'])
        self.assertEqual(bar_methods, ['f', '_h'])

        # Foo: '_h' is not callable through the proxy, not even via
        # _callmethod, which reports a RemoteError.
        self.assertEqual(foo_proxy.f(), 'f()')
        self.assertRaises(ValueError, foo_proxy.g)
        self.assertEqual(foo_proxy._callmethod('f'), 'f()')
        self.assertRaises(RemoteError, foo_proxy._callmethod, '_h')

        # Bar: both 'f' and '_h' work directly and through _callmethod.
        self.assertEqual(bar_proxy.f(), 'f()')
        self.assertEqual(bar_proxy._h(), '_h()')
        self.assertEqual(bar_proxy._callmethod('f'), 'f()')
        self.assertEqual(bar_proxy._callmethod('_h'), '_h()')

        self.assertEqual(list(squares), [n * n for n in range(10)])
1933
Richard Oudkerk73d9a292012-06-14 15:30:10 +01001934
Benjamin Petersone711caf2008-06-11 16:44:04 +00001935#
1936# Test of connecting to a remote server and using xmlrpclib for serialization
1937#
1938
# Queue instance handed out by get_queue().
_queue = pyqueue.Queue()


def get_queue():
    """Return the module-level queue object."""
    return _queue


class QueueManager(BaseManager):
    '''manager class used by server process'''


class QueueManager2(BaseManager):
    '''manager class which specifies the same interface as QueueManager'''


# QueueManager supplies the callable that produces the object, while
# QueueManager2 registers only the name.
QueueManager.register('get_queue', callable=get_queue)
QueueManager2.register('get_queue')


# Serializer name passed to the managers created by the tests below.
SERIALIZER = 'xmlrpclib'
1953
class _TestRemoteManager(BaseTestCase):
    """Talk to a QueueManager server from a second process and a second client.

    Both clients use QueueManager2, which registers only the name of the
    shared type, and every manager uses the SERIALIZER serializer.
    """

    ALLOWED_TYPES = ('manager',)

    @classmethod
    def _putter(cls, address, authkey):
        """Child entry point: connect to the server and enqueue one tuple."""
        manager = QueueManager2(
            address=address, authkey=authkey, serializer=SERIALIZER
            )
        manager.connect()
        queue = manager.get_queue()
        queue.put(('hello world', None, True, 2.25))

    def test_remote(self):
        """An item put by a child process is readable through another client."""
        authkey = os.urandom(32)

        manager = QueueManager(
            address=(test.support.HOST, 0), authkey=authkey, serializer=SERIALIZER
            )
        manager.start()

        p = self.Process(target=self._putter, args=(manager.address, authkey))
        p.daemon = True
        p.start()

        manager2 = QueueManager2(
            address=manager.address, authkey=authkey, serializer=SERIALIZER
            )
        manager2.connect()
        queue = manager2.get_queue()

        # Note that xmlrpclib will deserialize object as a list not a tuple
        self.assertEqual(queue.get(), ['hello world', None, True, 2.25])

        # Because we are using xmlrpclib for serialization instead of
        # pickle this will cause a serialization error.
        self.assertRaises(Exception, queue.put, time.sleep)

        # The item has arrived, so the putter has done its work.  Reap it
        # while the server is still running instead of leaving the child
        # process unjoined.
        p.join()

        # Make queue finalizer run before the server is stopped
        del queue
        manager.shutdown()
1995
class _TestManagerRestart(BaseTestCase):
    """Check that a manager can be restarted on an address used just before."""

    @classmethod
    def _putter(cls, address, authkey):
        """Child entry point: connect to the manager and enqueue a string."""
        manager = QueueManager(
            address=address, authkey=authkey, serializer=SERIALIZER)
        manager.connect()
        queue = manager.get_queue()
        queue.put('hello world')

    def test_rapid_restart(self):
        """Shut a manager down and immediately start another on ``addr``."""
        authkey = os.urandom(32)
        manager = QueueManager(
            address=(test.support.HOST, 0), authkey=authkey, serializer=SERIALIZER)
        srvr = manager.get_server()
        addr = srvr.address
        # Close the connection.Listener socket which gets opened as a part
        # of manager.get_server(). It's not needed for the test.
        srvr.listener.close()
        manager.start()

        p = self.Process(target=self._putter, args=(manager.address, authkey))
        p.daemon = True
        p.start()
        queue = manager.get_queue()
        self.assertEqual(queue.get(), 'hello world')
        # The message has arrived, so the putter is finished: reap it
        # before the server goes away.
        p.join()
        del queue
        manager.shutdown()

        manager = QueueManager(
            address=addr, authkey=authkey, serializer=SERIALIZER)
        try:
            manager.start()
        except OSError as e:
            if e.errno != errno.EADDRINUSE:
                raise
            # Retry after some time, in case the old socket was lingering
            # (sporadic failure on buildbots)
            time.sleep(1.0)
            manager = QueueManager(
                address=addr, authkey=authkey, serializer=SERIALIZER)
            # The replacement manager must be started as well: a manager
            # only gets its shutdown() finalizer inside start(), so calling
            # shutdown() on an unstarted manager raises AttributeError.
            manager.start()
        manager.shutdown()
Jesse Nollerc5d28a02009-03-30 16:37:36 +00002037
Benjamin Petersone711caf2008-06-11 16:44:04 +00002038#
2039#
2040#
2041
# Message that tells _TestConnection._echo to stop echoing and close.
SENTINEL = latin('')


class _TestConnection(BaseTestCase):
    """Tests for the connection objects returned by ``self.Pipe()``."""

    ALLOWED_TYPES = ('processes', 'threads')

    @classmethod
    def _echo(cls, conn):
        """Send every received byte message back until SENTINEL arrives."""
        for msg in iter(conn.recv_bytes, SENTINEL):
            conn.send_bytes(msg)
        conn.close()

    def test_connection(self):
        """Round-trip objects, bytes and buffers through an echoing child."""
        conn, child_conn = self.Pipe()

        p = self.Process(target=self._echo, args=(child_conn,))
        p.daemon = True
        p.start()

        seq = [1, 2.25, None]
        msg = latin('hello world')
        longmsg = msg * 10
        arr = array.array('i', list(range(4)))

        if self.TYPE == 'processes':
            self.assertEqual(type(conn.fileno()), int)

        self.assertEqual(conn.send(seq), None)
        self.assertEqual(conn.recv(), seq)

        self.assertEqual(conn.send_bytes(msg), None)
        self.assertEqual(conn.recv_bytes(), msg)

        if self.TYPE == 'processes':
            # Receive into the start of a zeroed buffer.
            buffer = array.array('i', [0]*10)
            expected = list(arr) + [0] * (10 - len(arr))
            self.assertEqual(conn.send_bytes(arr), None)
            self.assertEqual(conn.recv_bytes_into(buffer),
                             len(arr) * buffer.itemsize)
            self.assertEqual(list(buffer), expected)

            # Receive at an offset of three items into the buffer.
            buffer = array.array('i', [0]*10)
            expected = [0] * 3 + list(arr) + [0] * (10 - 3 - len(arr))
            self.assertEqual(conn.send_bytes(arr), None)
            self.assertEqual(conn.recv_bytes_into(buffer, 3 * buffer.itemsize),
                             len(arr) * buffer.itemsize)
            self.assertEqual(list(buffer), expected)

            # A buffer that is too small must raise BufferTooShort, with
            # the complete message available in the exception's args.
            buffer = bytearray(latin(' ' * 40))
            self.assertEqual(conn.send_bytes(longmsg), None)
            try:
                res = conn.recv_bytes_into(buffer)
            except multiprocessing.BufferTooShort as e:
                self.assertEqual(e.args, (longmsg,))
            else:
                self.fail('expected BufferTooShort, got %s' % res)

        poll = TimingWrapper(conn.poll)

        self.assertEqual(poll(), False)
        self.assertTimingAlmostEqual(poll.elapsed, 0)

        self.assertEqual(poll(-1), False)
        self.assertTimingAlmostEqual(poll.elapsed, 0)

        self.assertEqual(poll(TIMEOUT1), False)
        self.assertTimingAlmostEqual(poll.elapsed, TIMEOUT1)

        conn.send(None)
        # Give the child a moment to echo the message back.
        time.sleep(.1)

        self.assertEqual(poll(TIMEOUT1), True)
        self.assertTimingAlmostEqual(poll.elapsed, 0)

        self.assertEqual(conn.recv(), None)

        really_big_msg = latin('X') * (1024 * 1024 * 16)   # 16Mb
        conn.send_bytes(really_big_msg)
        self.assertEqual(conn.recv_bytes(), really_big_msg)

        conn.send_bytes(SENTINEL)                          # tell child to quit
        child_conn.close()

        if self.TYPE == 'processes':
            self.assertEqual(conn.readable, True)
            self.assertEqual(conn.writable, True)
            self.assertRaises(EOFError, conn.recv)
            self.assertRaises(EOFError, conn.recv_bytes)

        p.join()

    def test_duplex_false(self):
        """A one-way pipe has a read-only end and a write-only end."""
        reader, writer = self.Pipe(duplex=False)
        self.assertEqual(writer.send(1), None)
        self.assertEqual(reader.recv(), 1)
        if self.TYPE == 'processes':
            self.assertEqual(reader.readable, True)
            self.assertEqual(reader.writable, False)
            self.assertEqual(writer.readable, False)
            self.assertEqual(writer.writable, True)
            self.assertRaises(OSError, reader.send, 2)
            self.assertRaises(OSError, writer.recv)
            self.assertRaises(OSError, writer.poll)

    def test_spawn_close(self):
        """The parent may close its copy of the child's end right away."""
        # We test that a pipe connection can be closed by parent
        # process immediately after child is spawned.  On Windows this
        # would have sometimes failed on old versions because
        # child_conn would be closed before the child got a chance to
        # duplicate it.
        conn, child_conn = self.Pipe()

        p = self.Process(target=self._echo, args=(child_conn,))
        p.daemon = True
        p.start()
        child_conn.close()    # this might complete before child initializes

        msg = latin('hello')
        conn.send_bytes(msg)
        self.assertEqual(conn.recv_bytes(), msg)

        conn.send_bytes(SENTINEL)
        conn.close()
        p.join()

    def test_sendbytes(self):
        """Check the offset/size arguments of ``send_bytes()``."""
        if self.TYPE != 'processes':
            # Report a skip, as the other process-only tests of this class
            # do, rather than silently passing.
            self.skipTest('test not appropriate for {}'.format(self.TYPE))

        msg = latin('abcdefghijklmnopqrstuvwxyz')
        a, b = self.Pipe()

        a.send_bytes(msg)
        self.assertEqual(b.recv_bytes(), msg)

        a.send_bytes(msg, 5)
        self.assertEqual(b.recv_bytes(), msg[5:])

        a.send_bytes(msg, 7, 8)
        self.assertEqual(b.recv_bytes(), msg[7:7+8])

        # An offset equal to the length sends an empty message.
        a.send_bytes(msg, 26)
        self.assertEqual(b.recv_bytes(), latin(''))

        a.send_bytes(msg, 26, 0)
        self.assertEqual(b.recv_bytes(), latin(''))

        # Offsets/sizes outside the message, or negative, are rejected.
        self.assertRaises(ValueError, a.send_bytes, msg, 27)

        self.assertRaises(ValueError, a.send_bytes, msg, 22, 5)

        self.assertRaises(ValueError, a.send_bytes, msg, 26, 1)

        self.assertRaises(ValueError, a.send_bytes, msg, -1)

        self.assertRaises(ValueError, a.send_bytes, msg, 4, -1)

    @classmethod
    def _is_fd_assigned(cls, fd):
        """Return True if ``os.fstat(fd)`` succeeds, False on EBADF.

        Any other OSError is re-raised.
        """
        try:
            os.fstat(fd)
        except OSError as e:
            if e.errno == errno.EBADF:
                return False
            raise
        else:
            return True

    @classmethod
    def _writefd(cls, conn, data, create_dummy_fds=False):
        """Child entry point: receive a handle over *conn*, write *data* to it.

        With *create_dummy_fds* true, every unassigned descriptor below 256
        is first made a duplicate of the connection's descriptor --
        presumably so that the received descriptor gets a large number
        (see test_large_fd_transfer).
        """
        if create_dummy_fds:
            for i in range(0, 256):
                if not cls._is_fd_assigned(i):
                    os.dup2(conn.fileno(), i)
        fd = reduction.recv_handle(conn)
        if msvcrt:
            fd = msvcrt.open_osfhandle(fd, os.O_WRONLY)
        try:
            os.write(fd, data)
        finally:
            # Close the received descriptor even if the write fails.
            os.close(fd)

    @unittest.skipUnless(HAS_REDUCTION, "test needs multiprocessing.reduction")
    def test_fd_transfer(self):
        """A file handle sent to a child can be written to by that child."""
        if self.TYPE != 'processes':
            self.skipTest("only makes sense with processes")
        conn, child_conn = self.Pipe(duplex=True)

        p = self.Process(target=self._writefd, args=(child_conn, b"foo"))
        p.daemon = True
        p.start()
        self.addCleanup(test.support.unlink, test.support.TESTFN)
        with open(test.support.TESTFN, "wb") as f:
            fd = f.fileno()
            if msvcrt:
                fd = msvcrt.get_osfhandle(fd)
            reduction.send_handle(conn, fd, p.pid)
        p.join()
        with open(test.support.TESTFN, "rb") as f:
            self.assertEqual(f.read(), b"foo")

    @unittest.skipUnless(HAS_REDUCTION, "test needs multiprocessing.reduction")
    @unittest.skipIf(sys.platform == "win32",
                     "test semantics don't make sense on Windows")
    @unittest.skipIf(MAXFD <= 256,
                     "largest assignable fd number is too small")
    @unittest.skipUnless(hasattr(os, "dup2"),
                         "test needs os.dup2()")
    def test_large_fd_transfer(self):
        """Same as test_fd_transfer, but with descriptor numbers >= 256."""
        # With fd > 256 (issue #11657)
        if self.TYPE != 'processes':
            self.skipTest("only makes sense with processes")
        conn, child_conn = self.Pipe(duplex=True)

        p = self.Process(target=self._writefd, args=(child_conn, b"bar", True))
        p.daemon = True
        p.start()
        self.addCleanup(test.support.unlink, test.support.TESTFN)
        with open(test.support.TESTFN, "wb") as f:
            fd = f.fileno()
            # Pick the first free descriptor number at or above 256.
            for newfd in range(256, MAXFD):
                if not self._is_fd_assigned(newfd):
                    break
            else:
                self.fail("could not find an unassigned large file descriptor")
            os.dup2(fd, newfd)
            try:
                reduction.send_handle(conn, newfd, p.pid)
            finally:
                os.close(newfd)
        p.join()
        with open(test.support.TESTFN, "rb") as f:
            self.assertEqual(f.read(), b"bar")

    @classmethod
    def _send_data_without_fd(cls, conn):
        """Child entry point: write one raw byte to the connection's fd."""
        os.write(conn.fileno(), b"\0")

    @unittest.skipUnless(HAS_REDUCTION, "test needs multiprocessing.reduction")
    @unittest.skipIf(sys.platform == "win32", "doesn't make sense on Windows")
    def test_missing_fd_transfer(self):
        """``recv_handle()`` raises RuntimeError when no descriptor was sent."""
        # Check that exception is raised when received data is not
        # accompanied by a file descriptor in ancillary data.
        if self.TYPE != 'processes':
            self.skipTest("only makes sense with processes")
        conn, child_conn = self.Pipe(duplex=True)

        p = self.Process(target=self._send_data_without_fd, args=(child_conn,))
        p.daemon = True
        p.start()
        self.assertRaises(RuntimeError, reduction.recv_handle, conn)
        p.join()

    def test_context(self):
        """Connections are context managers that close themselves on exit."""
        a, b = self.Pipe()

        with a, b:
            a.send(1729)
            self.assertEqual(b.recv(), 1729)
            if self.TYPE == 'processes':
                self.assertFalse(a.closed)
                self.assertFalse(b.closed)

        if self.TYPE == 'processes':
            self.assertTrue(a.closed)
            self.assertTrue(b.closed)
            self.assertRaises(OSError, a.recv)
            self.assertRaises(OSError, b.recv)
Richard Oudkerkd69cfe82012-06-18 17:47:52 +01002308
class _TestListener(BaseTestCase):
    """Tests for ``Listener`` objects on their own."""

    ALLOWED_TYPES = ('processes',)

    def test_multiple_bind(self):
        """Creating a second Listener on an address already in use fails."""
        for family in self.connection.families:
            listener = self.connection.Listener(family=family)
            self.addCleanup(listener.close)
            with self.assertRaises(OSError):
                self.connection.Listener(listener.address, family)

    def test_context(self):
        """Listener, Client and accepted connection all work with ``with``."""
        with self.connection.Listener() as listener, \
                self.connection.Client(listener.address) as client, \
                listener.accept() as server_side:
            client.send(1729)
            self.assertEqual(server_side.recv(), 1729)

        # After leaving the block the listener no longer accepts.
        if self.TYPE == 'processes':
            with self.assertRaises(OSError):
                listener.accept()
Richard Oudkerkd69cfe82012-06-18 17:47:52 +01002329
class _TestListenerClient(BaseTestCase):
    """Tests pairing a ``Listener`` with a ``Client``."""

    ALLOWED_TYPES = ('processes', 'threads')

    @classmethod
    def _test(cls, address):
        """Child entry point: connect to *address*, send 'hello', close."""
        conn = cls.connection.Client(address)
        conn.send('hello')
        conn.close()

    def test_listener_client(self):
        """A child client can reach a listener of every supported family."""
        for family in self.connection.families:
            l = self.connection.Listener(family=family)
            p = self.Process(target=self._test, args=(l.address,))
            p.daemon = True
            p.start()
            conn = l.accept()
            self.assertEqual(conn.recv(), 'hello')
            # Close the accepted connection, as test_issue14725 does,
            # instead of leaving one open per family.
            conn.close()
            p.join()
            l.close()

    def test_issue14725(self):
        """accept() still works when the client has already disconnected."""
        l = self.connection.Listener()
        p = self.Process(target=self._test, args=(l.address,))
        p.daemon = True
        p.start()
        time.sleep(1)
        # On Windows the client process should by now have connected,
        # written data and closed the pipe handle.  This causes
        # ConnectNamedPipe() to fail with ERROR_NO_DATA.  See Issue
        # 14725.
        conn = l.accept()
        self.assertEqual(conn.recv(), 'hello')
        conn.close()
        p.join()
        l.close()

    def test_issue16955(self):
        """poll() on the client sees data sent by the accepted connection."""
        for fam in self.connection.families:
            l = self.connection.Listener(family=fam)
            c = self.connection.Client(l.address)
            a = l.accept()
            a.send_bytes(b"hello")
            self.assertTrue(c.poll(1))
            a.close()
            c.close()
            l.close()
2377
class _TestPoll(BaseTestCase):
    """Tests of ``poll()`` with empty messages and message boundaries."""

    ALLOWED_TYPES = ('processes', 'threads')

    def test_empty_string(self):
        """An empty byte message makes poll() true, and keeps it true."""
        receiver, sender = self.Pipe()
        self.assertEqual(receiver.poll(), False)
        sender.send_bytes(b'')
        # Poll twice; the second call must still report the message.
        for _ in range(2):
            self.assertEqual(receiver.poll(), True)

    @classmethod
    def _child_strings(cls, conn, strings):
        """Child entry point: send each string after a 0.1s pause, then close."""
        for payload in strings:
            time.sleep(0.1)
            conn.send_bytes(payload)
        conn.close()

    def test_strings(self):
        """Messages, including empty ones, arrive one by one and in order."""
        strings = (b'hello', b'', b'a', b'b', b'', b'bye', b'', b'lop')
        receiver, sender = self.Pipe()
        p = self.Process(target=self._child_strings, args=(sender, strings))
        p.start()

        for expected in strings:
            # Poll in 0.01s slices, at most 200 times, until data shows up.
            attempts = 200
            while attempts and not receiver.poll(0.01):
                attempts -= 1
            received = receiver.recv_bytes()
            self.assertEqual(expected, received)

        p.join()

    @classmethod
    def _child_boundaries(cls, r):
        """Child entry point: only poll the read end for up to 5 seconds."""
        # Polling may "pull" a message in to the child process, but we
        # don't want it to pull only part of a message, as that would
        # corrupt the pipe for any other processes which might later
        # read from it.
        r.poll(5)

    def test_boundaries(self):
        """A poll in another process leaves whole messages for the parent."""
        r, w = self.Pipe(False)
        p = self.Process(target=self._child_boundaries, args=(r,))
        p.start()
        time.sleep(2)
        messages = [b"first", b"second"]
        for payload in messages:
            w.send_bytes(payload)
        w.close()
        p.join()
        self.assertIn(r.recv_bytes(), messages)

    @classmethod
    def _child_dont_merge(cls, b):
        """Child entry point: send three separate byte messages."""
        for payload in (b'a', b'b', b'cd'):
            b.send_bytes(payload)

    def test_dont_merge(self):
        """Repeated polling never merges or drops queued messages."""
        receiver, sender = self.Pipe()
        for timeout in (0.0, 0.1):
            self.assertEqual(receiver.poll(timeout), False)

        p = self.Process(target=self._child_dont_merge, args=(sender,))
        p.start()

        self.assertEqual(receiver.recv_bytes(), b'a')
        for timeout in (1.0, 1.0):
            self.assertEqual(receiver.poll(timeout), True)
        self.assertEqual(receiver.recv_bytes(), b'b')
        for timeout in (1.0, 1.0, 0.0):
            self.assertEqual(receiver.poll(timeout), True)
        self.assertEqual(receiver.recv_bytes(), b'cd')

        p.join()
2455
Benjamin Petersone711caf2008-06-11 16:44:04 +00002456#
2457# Test of sending connection and socket objects between processes
2458#
Antoine Pitrou5438ed12012-04-24 22:56:57 +02002459
@unittest.skipUnless(HAS_REDUCTION, "test needs multiprocessing.reduction")
class _TestPicklingConnections(BaseTestCase):
    """Send connection and socket objects from one process to another."""

    ALLOWED_TYPES = ('processes',)

    @classmethod
    def tearDownClass(cls):
        """Stop the resource sharer of multiprocessing.reduction."""
        from multiprocessing.reduction import resource_sharer
        resource_sharer.stop(timeout=5)

    @classmethod
    def _listener(cls, conn, families):
        """Child entry point: accept one connection per family, ship it back.

        For each family the listener address is sent over *conn*, followed
        by the accepted connection object.  The same is then done with a
        plain socket, after which the child waits for one final message.
        """
        for fam in families:
            l = cls.connection.Listener(family=fam)
            conn.send(l.address)
            new_conn = l.accept()
            conn.send(new_conn)
            new_conn.close()
            l.close()

        l = socket.socket()
        l.bind((test.support.HOST, 0))
        l.listen(1)
        conn.send(l.getsockname())
        new_conn, addr = l.accept()
        conn.send(new_conn)
        new_conn.close()
        l.close()

        # Block until the parent signals that it is done.
        conn.recv()

    @classmethod
    def _remote(cls, conn):
        """Child entry point: connect where told and send the message upper-cased.

        ``(address, msg)`` pairs are handled with a Client until ``None`` is
        received; one more pair is then handled with a plain socket.
        """
        for (address, msg) in iter(conn.recv, None):
            client = cls.connection.Client(address)
            client.send(msg.upper())
            client.close()

        address, msg = conn.recv()
        client = socket.socket()
        client.connect(address)
        client.sendall(msg.upper())
        client.close()

        conn.close()

    def test_pickling(self):
        """Connections accepted in one child are usable in the parent."""
        families = self.connection.families

        lconn, lconn0 = self.Pipe()
        lp = self.Process(target=self._listener, args=(lconn0, families))
        lp.daemon = True
        lp.start()
        lconn0.close()

        rconn, rconn0 = self.Pipe()
        rp = self.Process(target=self._remote, args=(rconn0,))
        rp.daemon = True
        rp.start()
        rconn0.close()

        for fam in families:
            msg = ('This connection uses family %s' % fam).encode('ascii')
            address = lconn.recv()
            rconn.send((address, msg))
            new_conn = lconn.recv()
            self.assertEqual(new_conn.recv(), msg.upper())

        # End of the Client-based phase for the remote child.
        rconn.send(None)

        msg = latin('This connection uses a normal socket')
        address = lconn.recv()
        rconn.send((address, msg))
        new_conn = lconn.recv()
        # Read from the socket until the peer closes it.
        buf = []
        while True:
            s = new_conn.recv(100)
            if not s:
                break
            buf.append(s)
        buf = b''.join(buf)
        self.assertEqual(buf, msg.upper())
        new_conn.close()

        # Release the listener child from its final recv().
        lconn.send(None)

        rconn.close()
        lconn.close()

        lp.join()
        rp.join()

    @classmethod
    def child_access(cls, conn):
        """Child entry point: use a received write end, then a received read end."""
        w = conn.recv()
        w.send('all is well')
        w.close()

        r = conn.recv()
        msg = r.recv()
        conn.send(msg*2)

        conn.close()

    def test_access(self):
        """One-way pipe ends keep working after being sent to a child."""
        # On Windows, if we do not specify a destination pid when
        # using DupHandle then we need to be careful to use the
        # correct access flags for DuplicateHandle(), or else
        # DupHandle.detach() will raise PermissionError.  For example,
        # for a read only pipe handle we should use
        # access=FILE_GENERIC_READ.  (Unfortunately
        # DUPLICATE_SAME_ACCESS does not work.)
        conn, child_conn = self.Pipe()
        p = self.Process(target=self.child_access, args=(child_conn,))
        p.daemon = True
        p.start()
        child_conn.close()

        r, w = self.Pipe(duplex=False)
        conn.send(w)
        w.close()
        self.assertEqual(r.recv(), 'all is well')
        r.close()

        r, w = self.Pipe(duplex=False)
        conn.send(r)
        r.close()
        w.send('foobar')
        w.close()
        self.assertEqual(conn.recv(), 'foobar'*2)

        # The child has sent its last message; reap it, as test_pickling
        # does for its children, instead of leaving it unjoined.
        p.join()
2590
Benjamin Petersone711caf2008-06-11 16:44:04 +00002591#
2592#
2593#
2594
class _TestHeap(BaseTestCase):
    """Tests for the allocator behind ``multiprocessing.heap.BufferWrapper``."""

    ALLOWED_TYPES = ('processes',)

    def test_heap(self):
        """After heavy churn the free and occupied blocks tile each arena.

        The recorded blocks are sorted by (arena index, start, stop).  Each
        block must end where the next one starts, unless the next one is in
        a different arena and starts at offset 0.
        """
        iterations = 5000
        maxblocks = 50
        blocks = []

        # create and destroy lots of blocks of different sizes
        for _ in range(iterations):
            size = int(random.lognormvariate(0, 1) * 1000)
            b = multiprocessing.heap.BufferWrapper(size)
            blocks.append(b)
            if len(blocks) > maxblocks:
                victim = random.randrange(maxblocks)
                del blocks[victim]

        # get the heap object
        heap = multiprocessing.heap.BufferWrapper._heap

        # Take a snapshot of the heap's bookkeeping while holding its lock;
        # the ``with`` block releases the lock even if an error occurs.
        segments = []
        with heap._lock:
            for L in list(heap._len_to_seq.values()):
                for arena, start, stop in L:
                    segments.append((heap._arenas.index(arena), start, stop,
                                     stop-start, 'free'))
            for arena, start, stop in heap._allocated_blocks:
                segments.append((heap._arenas.index(arena), start, stop,
                                 stop-start, 'occupied'))

        segments.sort()

        # verify the state of the heap
        for current, following in zip(segments, segments[1:]):
            arena, start, stop = current[:3]
            narena, nstart = following[:2]
            self.assertTrue((arena != narena and nstart == 0) or
                            (stop == nstart))

    def test_free_from_gc(self):
        """Blocks freed by the garbage collector must not deadlock."""
        # Check that freeing of blocks by the garbage collector doesn't deadlock
        # (issue #12352).
        # Make sure the GC is enabled, and set lower collection thresholds to
        # make collections more frequent (and increase the probability of
        # deadlock).
        if not gc.isenabled():
            gc.enable()
            self.addCleanup(gc.disable)
        thresholds = gc.get_threshold()
        self.addCleanup(gc.set_threshold, *thresholds)
        gc.set_threshold(10)

        # perform numerous block allocations, with cyclic references to make
        # sure objects are collected asynchronously by the gc
        for i in range(5000):
            a = multiprocessing.heap.BufferWrapper(1)
            b = multiprocessing.heap.BufferWrapper(1)
            # circular references
            a.buddy = b
            b.buddy = a
2659
Benjamin Petersone711caf2008-06-11 16:44:04 +00002660#
2661#
2662#
2663
class _Foo(Structure):
    """Two-field structure (int ``x``, double ``y``) used by the shared ctypes tests."""

    _fields_ = [('x', c_int), ('y', c_double)]
2669
class _TestSharedCTypes(BaseTestCase):
    """Tests for shared ctypes values, arrays and structures."""

    ALLOWED_TYPES = ('processes',)

    def setUp(self):
        """Skip every test of this class when sharedctypes is unavailable."""
        if not HAS_SHAREDCTYPES:
            self.skipTest("requires multiprocessing.sharedctypes")

    @classmethod
    def _double(cls, x, y, foo, arr, string):
        """Child entry point: double every shared object in place."""
        x.value = x.value * 2
        y.value = y.value * 2
        foo.x = foo.x * 2
        foo.y = foo.y * 2
        string.value = string.value * 2
        for idx in range(len(arr)):
            arr[idx] = arr[idx] * 2

    def test_sharedctypes(self, lock=False):
        """Changes made by a child process are visible in the parent."""
        shared_int = Value('i', 7, lock=lock)
        shared_double = Value(c_double, 1.0/3.0, lock=lock)
        shared_struct = Value(_Foo, 3, 2, lock=lock)
        shared_array = self.Array('d', list(range(10)), lock=lock)
        shared_bytes = self.Array('c', 20, lock=lock)
        shared_bytes.value = latin('hello')

        shared = (shared_int, shared_double, shared_struct,
                  shared_array, shared_bytes)
        p = self.Process(target=self._double, args=shared)
        p.daemon = True
        p.start()
        p.join()

        self.assertEqual(shared_int.value, 14)
        self.assertAlmostEqual(shared_double.value, 2.0/3.0)
        self.assertEqual(shared_struct.x, 6)
        self.assertAlmostEqual(shared_struct.y, 4.0)
        for idx in range(10):
            self.assertAlmostEqual(shared_array[idx], idx*2)
        self.assertEqual(shared_bytes.value, latin('hellohello'))

    def test_synchronize(self):
        """Repeat test_sharedctypes with ``lock=True``."""
        self.test_sharedctypes(lock=True)

    def test_copy(self):
        """``copy()`` returns an object unaffected by later changes to the source."""
        original = _Foo(2, 5.0)
        duplicate = copy(original)
        original.x = 0
        original.y = 0
        self.assertEqual(duplicate.x, 2)
        self.assertAlmostEqual(duplicate.y, 5.0)
2719
2720#
2721#
2722#
2723
class _TestFinalize(BaseTestCase):
    """Tests for the order in which ``util.Finalize`` callbacks run."""

    ALLOWED_TYPES = ('processes',)

    @classmethod
    def _test_finalize(cls, conn):
        """Child entry point: register finalizers that report over *conn*.

        Each callback sends its tag through ``conn.send``.  The statement
        order and the lifetime of the local objects are part of the test.
        """
        class Foo:
            pass

        def watch(obj, tag, **options):
            # Register a finalizer for ``obj`` that sends ``tag``.
            return util.Finalize(obj, conn.send, args=(tag,), **options)

        a = Foo()
        watch(a, 'a')
        del a           # triggers callback for a

        b = Foo()
        close_b = watch(b, 'b')
        close_b()       # triggers callback for b
        close_b()       # does nothing because callback has already been called
        del b           # does nothing because callback has already been called

        c = Foo()
        watch(c, 'c')

        d10 = Foo()
        watch(d10, 'd10', exitpriority=1)

        d01 = Foo()
        watch(d01, 'd01', exitpriority=0)
        d02 = Foo()
        watch(d02, 'd02', exitpriority=0)
        d03 = Foo()
        watch(d03, 'd03', exitpriority=0)

        watch(None, 'e', exitpriority=-10)

        watch(None, 'STOP', exitpriority=-100)

        # call multiprocessing's cleanup function then exit process without
        # garbage collecting locals
        util._exit_function()
        conn.close()
        os._exit(0)

    def test_finalize(self):
        """The child's finalizers report in the expected order before 'STOP'."""
        conn, child_conn = self.Pipe()

        p = self.Process(target=self._test_finalize, args=(child_conn,))
        p.daemon = True
        p.start()
        p.join()

        # Read messages until the 'STOP' marker is received.
        result = list(iter(conn.recv, 'STOP'))
        self.assertEqual(result, ['a', 'b', 'd10', 'd03', 'd02', 'd01', 'e'])
2776
2777#
2778# Test that from ... import * works for each module
2779#
2780
class _TestImportStar(BaseTestCase):

    ALLOWED_TYPES = ('processes',)

    def test_import(self):
        # Every name a module lists in __all__ must exist as an attribute of
        # that module.
        submodules = (
            'connection', 'heap', 'managers', 'pool',
            'process', 'synchronize', 'util',
        )
        modules = ['multiprocessing']
        modules.extend('multiprocessing.' + sub for sub in submodules)

        if HAS_REDUCTION:
            modules.append('multiprocessing.reduction')

        if c_int is not None:
            # This module requires _ctypes
            modules.append('multiprocessing.sharedctypes')

        for name in modules:
            __import__(name)
            mod = sys.modules[name]
            exported = getattr(mod, '__all__', ())

            for attr in exported:
                message = '%r does not have attribute %r' % (mod, attr)
                self.assertTrue(hasattr(mod, attr), message)
2809
2810#
2811# Quick test that logging works -- does not test logging output
2812#
2813
class _TestLogging(BaseTestCase):

    ALLOWED_TYPES = ('processes',)

    def test_enable_logging(self):
        # Smoke test: the multiprocessing logger exists and accepts messages.
        logger = multiprocessing.get_logger()
        self.assertIsNotNone(logger)
        logger.setLevel(util.SUBWARNING)
        try:
            logger.debug('this will not be printed')
            logger.info('nor will this')
        finally:
            # Restore the module-wide level even if logging raised.
            logger.setLevel(LOG_LEVEL)

    @classmethod
    def _test_level(cls, conn):
        # Runs in the child: report the effective level of the
        # multiprocessing logger back to the parent.
        logger = multiprocessing.get_logger()
        conn.send(logger.getEffectiveLevel())

    def _child_effective_level(self, reader, writer):
        # Start a child running _test_level, return the level it reports on
        # `reader`, and always join the child so it does not outlive the test.
        p = self.Process(target=self._test_level, args=(writer,))
        p.daemon = True
        p.start()
        try:
            return reader.recv()
        finally:
            p.join()

    def test_level(self):
        # The child must see the level set on the multiprocessing logger, and
        # fall back to the root logger's level once that is reset to NOTSET.
        LEVEL1 = 32
        LEVEL2 = 37

        logger = multiprocessing.get_logger()
        root_logger = logging.getLogger()
        root_level = root_logger.level

        reader, writer = multiprocessing.Pipe(duplex=False)
        try:
            logger.setLevel(LEVEL1)
            self.assertEqual(LEVEL1,
                             self._child_effective_level(reader, writer))

            logger.setLevel(logging.NOTSET)
            root_logger.setLevel(LEVEL2)
            self.assertEqual(LEVEL2,
                             self._child_effective_level(reader, writer))
        finally:
            # Undo every global change even when an assertion failed, so a
            # failure here cannot change logging for the rest of the run.
            root_logger.setLevel(root_level)
            logger.setLevel(level=LOG_LEVEL)
            reader.close()
            writer.close()
2856
Jesse Nollerb9a49b72009-11-21 18:09:38 +00002857
Jesse Nollerf4ae35f2009-11-24 14:22:24 +00002858# class _TestLoggingProcessName(BaseTestCase):
2859#
2860# def handle(self, record):
2861# assert record.processName == multiprocessing.current_process().name
2862# self.__handled = True
2863#
2864# def test_logging(self):
2865# handler = logging.Handler()
2866# handler.handle = self.handle
2867# self.__handled = False
2868# # Bypass getLogger() and side-effects
2869# logger = logging.getLoggerClass()(
2870# 'multiprocessing.test.TestLoggingProcessName')
2871# logger.addHandler(handler)
2872# logger.propagate = False
2873#
2874# logger.warn('foo')
2875# assert self.__handled
Jesse Nollerb9a49b72009-11-21 18:09:38 +00002876
Benjamin Petersone711caf2008-06-11 16:44:04 +00002877#
Richard Oudkerk7aaa1ef2013-02-26 12:39:57 +00002878# Check that Process.join() retries if os.waitpid() fails with EINTR
2879#
2880
class _TestPollEintr(BaseTestCase):

    ALLOWED_TYPES = ('processes',)

    @classmethod
    def _killer(cls, pid):
        # Runs in a helper process: wait half a second, then send SIGUSR1 to
        # `pid`.
        time.sleep(0.5)
        os.kill(pid, signal.SIGUSR1)

    @unittest.skipUnless(hasattr(signal, 'SIGUSR1'), 'requires SIGUSR1')
    def test_poll_eintr(self):
        # Joining a child must complete normally even though the handler for
        # SIGUSR1 runs in this process while the join is in progress.
        got_signal = [False]
        def record(*args):
            got_signal[0] = True
        pid = os.getpid()
        oldhandler = signal.signal(signal.SIGUSR1, record)
        try:
            killer = self.Process(target=self._killer, args=(pid,))
            killer.start()
            try:
                p = self.Process(target=time.sleep, args=(1,))
                p.start()
                p.join()
            finally:
                # Join the killer before asserting and before the old handler
                # is restored: otherwise a failed assertion would leave it
                # running, and its SIGUSR1 could reach this process after
                # `record` has been uninstalled.
                killer.join()
            self.assertTrue(got_signal[0])
            self.assertEqual(p.exitcode, 0)
        finally:
            signal.signal(signal.SIGUSR1, oldhandler)
2908
2909#
Jesse Noller6214edd2009-01-19 16:23:53 +00002910# Test to verify handle verification, see issue 3321
2911#
2912
class TestInvalidHandle(unittest.TestCase):

    @unittest.skipIf(WIN32, "skipped on Windows")
    def test_invalid_handles(self):
        # poll() on a connection built from a bogus handle must raise, and a
        # negative handle must be rejected when the Connection is created.
        expected_errors = (ValueError, OSError)
        conn = multiprocessing.connection.Connection(44977608)
        try:
            with self.assertRaises(expected_errors):
                conn.poll()
        finally:
            # Hack private attribute _handle to avoid printing an error
            # in conn.__del__
            conn._handle = None
        with self.assertRaises(expected_errors):
            multiprocessing.connection.Connection(-1)
Florent Xiclunafd1b0932010-03-28 00:25:02 +00002926
Jesse Noller6214edd2009-01-19 16:23:53 +00002927#
Benjamin Petersone711caf2008-06-11 16:44:04 +00002928# Functions used to create test cases from the base ones in this module
2929#
2930
def create_test_cases(Mixin, type):
    # Build one concrete unittest.TestCase subclass for every module global
    # whose name starts with '_Test' and whose ALLOWED_TYPES contains `type`.
    # Returns a dict mapping the generated class name
    # ('With' + capitalized type + base name without its leading underscore)
    # to the class.
    known_types = {'processes', 'threads', 'manager'}
    prefix = 'With' + type.capitalize()
    generated = {}

    # Snapshot the globals so the iteration is independent of later updates.
    for name, base in list(globals().items()):
        if not name.startswith('_Test'):
            continue
        allowed = set(base.ALLOWED_TYPES)
        assert allowed <= known_types, allowed
        if type not in base.ALLOWED_TYPES:
            continue

        newname = prefix + name[1:]

        class Temp(base, Mixin, unittest.TestCase):
            pass

        # Give the class its public name and make it report the module of
        # the mixin it was built from.
        Temp.__name__ = Temp.__qualname__ = newname
        Temp.__module__ = Mixin.__module__
        generated[newname] = Temp

    return generated
2949
2950#
2951# Create test cases
2952#
2953
class ProcessesMixin(object):
    # Maps the names the '_Test*' base classes look up on `self` to the
    # process-based multiprocessing implementations.
    TYPE = 'processes'

    # Stored as plain class attributes.
    Process = multiprocessing.Process
    connection = multiprocessing.connection

    # Callables are wrapped in staticmethod so that looking them up on an
    # instance returns the callable itself rather than a bound method.
    current_process = staticmethod(multiprocessing.current_process)
    active_children = staticmethod(multiprocessing.active_children)

    Pool = staticmethod(multiprocessing.Pool)
    Pipe = staticmethod(multiprocessing.Pipe)
    Queue = staticmethod(multiprocessing.Queue)
    JoinableQueue = staticmethod(multiprocessing.JoinableQueue)

    Lock = staticmethod(multiprocessing.Lock)
    RLock = staticmethod(multiprocessing.RLock)
    Semaphore = staticmethod(multiprocessing.Semaphore)
    BoundedSemaphore = staticmethod(multiprocessing.BoundedSemaphore)
    Condition = staticmethod(multiprocessing.Condition)
    Event = staticmethod(multiprocessing.Event)
    Barrier = staticmethod(multiprocessing.Barrier)

    Value = staticmethod(multiprocessing.Value)
    Array = staticmethod(multiprocessing.Array)
    RawValue = staticmethod(multiprocessing.RawValue)
    RawArray = staticmethod(multiprocessing.RawArray)
Benjamin Petersone711caf2008-06-11 16:44:04 +00002975
# Generate the concrete test classes for the 'processes' flavour and publish
# them as module globals.
testcases_processes = create_test_cases(ProcessesMixin, type='processes')
globals().update(testcases_processes)
2978
2979
class ManagerMixin(object):
    # Maps the names the '_Test*' base classes look up on `self` to the
    # factories of a shared manager, which setUpClass stores in `cls.manager`.
    TYPE = 'manager'
    Process = multiprocessing.Process

    # Each factory is a read-only property returning the attribute of the
    # same name from `self.manager`.
    Queue = property(operator.attrgetter('manager.Queue'))
    JoinableQueue = property(operator.attrgetter('manager.JoinableQueue'))

    Lock = property(operator.attrgetter('manager.Lock'))
    RLock = property(operator.attrgetter('manager.RLock'))
    Semaphore = property(operator.attrgetter('manager.Semaphore'))
    BoundedSemaphore = property(operator.attrgetter('manager.BoundedSemaphore'))
    Condition = property(operator.attrgetter('manager.Condition'))
    Event = property(operator.attrgetter('manager.Event'))
    Barrier = property(operator.attrgetter('manager.Barrier'))

    Value = property(operator.attrgetter('manager.Value'))
    Array = property(operator.attrgetter('manager.Array'))
    list = property(operator.attrgetter('manager.list'))
    dict = property(operator.attrgetter('manager.dict'))
    Namespace = property(operator.attrgetter('manager.Namespace'))

    @classmethod
    def Pool(cls, *args, **kwds):
        # Forward all arguments to the manager's Pool factory.
        make_pool = cls.manager.Pool
        return make_pool(*args, **kwds)

    @classmethod
    def setUpClass(cls):
        cls.manager = multiprocessing.Manager()

    @classmethod
    def tearDownClass(cls):
        # only the manager process should be returned by active_children()
        # but this can take a bit on slow machines, so wait a few seconds
        # if there are other children too (see #17395)
        pause = 0.01
        while len(multiprocessing.active_children()) > 1 and pause < 5:
            time.sleep(pause)
            pause *= 2
        gc.collect()                       # do garbage collection
        leftover = cls.manager._number_of_objects()
        if leftover != 0:
            # This is not really an error since some tests do not
            # ensure that all processes which hold a reference to a
            # managed object have been joined.
            print('Shared objects which still exist at manager shutdown:')
            print(cls.manager._debug_info())
        cls.manager.shutdown()
        cls.manager.join()
        cls.manager = None
Benjamin Petersone711caf2008-06-11 16:44:04 +00003025
# Generate the concrete test classes for the 'manager' flavour and publish
# them as module globals.
testcases_manager = create_test_cases(ManagerMixin, type='manager')
globals().update(testcases_manager)
3028
3029
class ThreadsMixin(object):
    # Maps the names the '_Test*' base classes look up on `self` to the
    # thread-backed implementations in multiprocessing.dummy.
    TYPE = 'threads'

    # Stored as plain class attributes.
    Process = multiprocessing.dummy.Process
    connection = multiprocessing.dummy.connection

    # Callables are wrapped in staticmethod so that looking them up on an
    # instance returns the callable itself rather than a bound method.
    current_process = staticmethod(multiprocessing.dummy.current_process)
    active_children = staticmethod(multiprocessing.dummy.active_children)

    # NOTE(review): unlike every other entry this is the process-based
    # multiprocessing.Pool, not multiprocessing.dummy.Pool -- confirm this
    # is intentional.
    Pool = staticmethod(multiprocessing.Pool)
    Pipe = staticmethod(multiprocessing.dummy.Pipe)
    Queue = staticmethod(multiprocessing.dummy.Queue)
    JoinableQueue = staticmethod(multiprocessing.dummy.JoinableQueue)

    Lock = staticmethod(multiprocessing.dummy.Lock)
    RLock = staticmethod(multiprocessing.dummy.RLock)
    Semaphore = staticmethod(multiprocessing.dummy.Semaphore)
    BoundedSemaphore = staticmethod(multiprocessing.dummy.BoundedSemaphore)
    Condition = staticmethod(multiprocessing.dummy.Condition)
    Event = staticmethod(multiprocessing.dummy.Event)
    Barrier = staticmethod(multiprocessing.dummy.Barrier)

    Value = staticmethod(multiprocessing.dummy.Value)
    Array = staticmethod(multiprocessing.dummy.Array)
Benjamin Petersone711caf2008-06-11 16:44:04 +00003049
# Generate the concrete test classes for the 'threads' flavour and publish
# them as module globals.
testcases_threads = create_test_cases(ThreadsMixin, type='threads')
globals().update(testcases_threads)
3052
Richard Oudkerkd15642e2013-07-16 15:33:41 +01003053
class OtherTest(unittest.TestCase):
    # TODO: add more tests for deliver/answer challenge.
    def test_deliver_challenge_auth_failure(self):
        # A peer that answers the challenge with garbage must make
        # deliver_challenge raise AuthenticationError.
        class _FakeConnection(object):
            def recv_bytes(self, size):
                return b'something bogus'
            def send_bytes(self, data):
                pass

        with self.assertRaises(multiprocessing.AuthenticationError):
            multiprocessing.connection.deliver_challenge(
                _FakeConnection(), b'abc')

    def test_answer_challenge_auth_failure(self):
        # The fake peer sends CHALLENGE first, then garbage, then empty
        # bytes; answer_challenge must raise AuthenticationError.
        class _FakeConnection(object):
            def __init__(self):
                self._replies = iter([
                    multiprocessing.connection.CHALLENGE,
                    b'something bogus',
                ])
            def recv_bytes(self, size):
                return next(self._replies, b'')
            def send_bytes(self, data):
                pass

        with self.assertRaises(multiprocessing.AuthenticationError):
            multiprocessing.connection.answer_challenge(
                _FakeConnection(), b'abc')
3082
Benjamin Petersonf47ed4a2009-04-11 20:45:40 +00003083#
3084# Test Manager.start()/Pool.__init__() initializer feature - see issue 5585
3085#
3086
def initializer(ns):
    # Increment the `test` counter on the namespace passed in.
    ns.test = ns.test + 1
3089
class TestInitializers(unittest.TestCase):
    # Each test hands `initializer` a shared namespace whose `test` counter
    # starts at 0 and checks that the counter ends up at 1.

    def setUp(self):
        self.mgr = multiprocessing.Manager()
        self.ns = self.mgr.Namespace()
        self.ns.test = 0

    def tearDown(self):
        self.mgr.shutdown()
        self.mgr.join()

    def test_manager_initializer(self):
        m = multiprocessing.managers.SyncManager()
        # A non-callable initializer must be rejected.
        self.assertRaises(TypeError, m.start, 1)
        m.start(initializer, (self.ns,))
        try:
            self.assertEqual(self.ns.test, 1)
        finally:
            # Shut the second manager down even if the assertion failed, so
            # its server process is not left behind.
            m.shutdown()
            m.join()

    def test_pool_initializer(self):
        # A non-callable initializer must be rejected.
        self.assertRaises(TypeError, multiprocessing.Pool, initializer=1)
        p = multiprocessing.Pool(1, initializer, (self.ns,))
        p.close()
        p.join()
        self.assertEqual(self.ns.test, 1)
3114
Alexandre Vassalottic57a84f2009-07-17 12:07:01 +00003115#
3116# Issue 5155, 5313, 5331: Test process in processes
3117# Verifies os.close(sys.stdin.fileno) vs. sys.stdin.close() behavior
3118#
3119
def _ThisSubProcess(q):
    # Try a non-blocking get on `q`; an empty queue is not an error and the
    # fetched item, if any, is discarded.
    try:
        q.get(block=False)
    except pyqueue.Empty:
        return
3125
def _TestProcess(q):
    # Runs inside a child process: create a fresh queue and run
    # _ThisSubProcess on it in a daemonic grandchild.  The `q` argument
    # itself is not used.
    inner_queue = multiprocessing.Queue()
    grandchild = multiprocessing.Process(
        target=_ThisSubProcess,
        args=(inner_queue,),
    )
    grandchild.daemon = True
    grandchild.start()
    grandchild.join()
3132
def _afunc(x):
    # Return `x` multiplied by itself.
    squared = x * x
    return squared
3135
def pool_in_process():
    # Runs inside a child process: map _afunc over a short list with a
    # four-worker pool, then close and join the pool.  The mapped result is
    # not used.
    workers = multiprocessing.Pool(processes=4)
    workers.map(_afunc, [1, 2, 3, 4, 5, 6, 7])
    workers.close()
    workers.join()
Alexandre Vassalottic57a84f2009-07-17 12:07:01 +00003141
class _file_like(object):
    # Write buffer in front of `delegate`.  The buffer is replaced by an
    # empty one the first time it is touched from a process whose pid
    # differs from the pid that last used it.

    def __init__(self, delegate):
        self._delegate = delegate
        self._pid = None

    @property
    def cache(self):
        current_pid = os.getpid()
        # There are no race conditions since fork keeps only the running thread
        if self._pid != current_pid:
            self._pid, self._cache = current_pid, []
        return self._cache

    def write(self, data):
        # Buffer `data`; nothing reaches the delegate until flush().
        self.cache.append(data)

    def flush(self):
        # Write the buffered pieces to the delegate as one string, then
        # start a new empty buffer.
        pending = ''.join(self.cache)
        self._delegate.write(pending)
        self._cache = []
3162
class TestStdinBadfiledescriptor(unittest.TestCase):

    def test_queue_in_process(self):
        # Run _TestProcess in a child and wait for it to finish.
        queue = multiprocessing.Queue()
        proc = multiprocessing.Process(target=_TestProcess, args=(queue,))
        proc.start()
        proc.join()

    def test_pool_in_process(self):
        # Run pool_in_process in a child and wait for it to finish.
        p = multiprocessing.Process(target=pool_in_process)
        p.start()
        p.join()

    def test_flushing(self):
        sio = io.StringIO()
        flike = _file_like(sio)
        flike.write('foo')
        # NOTE(review): this Process is created but never started; it is
        # kept as in the original -- confirm whether it was meant to run.
        proc = multiprocessing.Process(target=lambda: flike.flush())
        flike.flush()
        # Use a unittest assertion rather than a bare `assert`, which is
        # compiled away when the module is run with -O.
        self.assertEqual(sio.getvalue(), 'foo')
3183
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01003184
class TestWait(unittest.TestCase):
    # Tests for multiprocessing.connection.wait() with connections, sockets
    # and process sentinels.

    @classmethod
    def _child_test_wait(cls, w, slow):
        # Runs in a child: send ten (index, pid) messages on `w`, optionally
        # with a short random pause before each, then close `w`.
        for i in range(10):
            if slow:
                time.sleep(random.random()*0.1)
            w.send((i, os.getpid()))
        w.close()

    def test_wait(self, slow=False):
        from multiprocessing.connection import wait
        readers = []
        procs = []
        messages = []

        for i in range(4):
            r, w = multiprocessing.Pipe(duplex=False)
            p = multiprocessing.Process(target=self._child_test_wait,
                                        args=(w, slow))
            p.daemon = True
            p.start()
            # Drop the parent's copy of the write end so that the reader
            # sees EOF once the child has closed its own.
            w.close()
            readers.append(r)
            procs.append(p)
            self.addCleanup(p.join)

        while readers:
            for r in wait(readers):
                try:
                    msg = r.recv()
                except EOFError:
                    readers.remove(r)
                    r.close()
                else:
                    messages.append(msg)

        messages.sort()
        expected = sorted((i, p.pid) for i in range(10) for p in procs)
        self.assertEqual(messages, expected)

    @classmethod
    def _child_test_wait_socket(cls, address, slow):
        # Runs in a child: connect to `address` and send the numbers 0..9,
        # one per line, optionally with a short random pause before each.
        s = socket.socket()
        s.connect(address)
        for i in range(10):
            if slow:
                time.sleep(random.random()*0.1)
            s.sendall(('%s\n' % i).encode('ascii'))
        s.close()

    def test_wait_socket(self, slow=False):
        from multiprocessing.connection import wait
        l = socket.socket()
        l.bind((test.support.HOST, 0))
        l.listen(4)
        addr = l.getsockname()
        readers = []
        procs = []
        dic = {}

        for i in range(4):
            p = multiprocessing.Process(target=self._child_test_wait_socket,
                                        args=(addr, slow))
            p.daemon = True
            p.start()
            procs.append(p)
            self.addCleanup(p.join)

        for i in range(4):
            r, _ = l.accept()
            readers.append(r)
            dic[r] = []
        l.close()

        while readers:
            for r in wait(readers):
                msg = r.recv(32)
                if not msg:
                    # Empty read: the peer closed its end.
                    readers.remove(r)
                    r.close()
                else:
                    dic[r].append(msg)

        expected = ''.join('%s\n' % i for i in range(10)).encode('ascii')
        for v in dic.values():
            self.assertEqual(b''.join(v), expected)

    def test_wait_slow(self):
        self.test_wait(True)

    def test_wait_socket_slow(self):
        self.test_wait_socket(True)

    def test_wait_timeout(self):
        from multiprocessing.connection import wait

        expected = 5
        a, b = multiprocessing.Pipe()
        # Close both ends when the test finishes, whatever the outcome.
        self.addCleanup(a.close)
        self.addCleanup(b.close)

        # Nothing is readable: wait() must return [] after roughly
        # `expected` seconds.
        start = time.time()
        res = wait([a, b], expected)
        delta = time.time() - start

        self.assertEqual(res, [])
        self.assertLess(delta, expected * 2)
        self.assertGreater(delta, expected * 0.5)

        b.send(None)

        # `a` now has data: wait() must return it without using the timeout.
        start = time.time()
        res = wait([a, b], 20)
        delta = time.time() - start

        self.assertEqual(res, [a])
        self.assertLess(delta, 0.4)

    @classmethod
    def signal_and_sleep(cls, sem, period):
        # Runs in a child: release `sem`, then sleep for `period` seconds.
        sem.release()
        time.sleep(period)

    def test_wait_integer(self):
        from multiprocessing.connection import wait

        def sorted_(items):
            # wait() gives no ordering guarantee relied on here, so compare
            # results after ordering them by object identity.
            return sorted(items, key=id)

        expected = 3
        sem = multiprocessing.Semaphore(0)
        a, b = multiprocessing.Pipe()
        # Close both ends when the test finishes, whatever the outcome.
        self.addCleanup(a.close)
        self.addCleanup(b.close)
        p = multiprocessing.Process(target=self.signal_and_sleep,
                                    args=(sem, expected))

        p.start()
        try:
            self.assertIsInstance(p.sentinel, int)
            # Wait until the child is about to start its sleep.
            self.assertTrue(sem.acquire(timeout=20))

            start = time.time()
            res = wait([a, p.sentinel, b], expected + 20)
            delta = time.time() - start

            self.assertEqual(res, [p.sentinel])
            self.assertLess(delta, expected + 2)
            self.assertGreater(delta, expected - 2)

            a.send(None)

            start = time.time()
            res = wait([a, p.sentinel, b], 20)
            delta = time.time() - start

            self.assertEqual(sorted_(res), sorted_([p.sentinel, b]))
            self.assertLess(delta, 0.4)

            b.send(None)

            start = time.time()
            res = wait([a, p.sentinel, b], 20)
            delta = time.time() - start

            self.assertEqual(sorted_(res), sorted_([a, p.sentinel, b]))
            self.assertLess(delta, 0.4)
        finally:
            # Reap the child even when an assertion above failed.
            p.terminate()
            p.join()

    def test_neg_timeout(self):
        # A negative timeout must return immediately with no ready objects.
        from multiprocessing.connection import wait
        a, b = multiprocessing.Pipe()
        self.addCleanup(a.close)
        self.addCleanup(b.close)
        t = time.time()
        res = wait([a], timeout=-1)
        t = time.time() - t
        self.assertEqual(res, [])
        self.assertLess(t, 1)
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01003359
Antoine Pitrou709176f2012-04-01 17:19:09 +02003360#
3361# Issue 14151: Test invalid family on invalid environment
3362#
3363
class TestInvalidFamily(unittest.TestCase):

    @unittest.skipIf(WIN32, "skipped on Windows")
    def test_invalid_family(self):
        # Outside Windows, a Listener for the address r'\\.\test' must be
        # refused with ValueError.
        self.assertRaises(ValueError,
                          multiprocessing.connection.Listener, r'\\.\test')

    @unittest.skipUnless(WIN32, "skipped on non-Windows platforms")
    def test_invalid_family_win32(self):
        # On Windows, a Listener for the address '/var/test.pipe' must be
        # refused with ValueError.
        self.assertRaises(ValueError,
                          multiprocessing.connection.Listener, '/var/test.pipe')
Antoine Pitrou93bba8f2012-04-01 17:25:49 +02003375
Richard Oudkerk77c84f22012-05-18 14:28:02 +01003376#
3377# Issue 12098: check sys.flags of child matches that for parent
3378#
3379
class TestFlags(unittest.TestCase):
    @classmethod
    def run_in_grandchild(cls, conn):
        # Send this process's sys.flags, as a plain tuple, through `conn`.
        own_flags = tuple(sys.flags)
        conn.send(own_flags)

    @classmethod
    def run_in_child(cls):
        # Start a grandchild, collect its sys.flags, and print a JSON pair
        # (flags of this process, flags of the grandchild) on stdout.
        import json
        reader, writer = multiprocessing.Pipe(duplex=False)
        grandchild = multiprocessing.Process(target=cls.run_in_grandchild,
                                             args=(writer,))
        grandchild.start()
        grandchild_flags = reader.recv()
        grandchild.join()
        reader.close()
        writer.close()
        own_flags = tuple(sys.flags)
        print(json.dumps((own_flags, grandchild_flags)))

    def test_flags(self):
        import json
        import subprocess
        # start child process using unusual flags
        prog = ('from test.test_multiprocessing import TestFlags; '
                'TestFlags.run_in_child()')
        command = [sys.executable, '-E', '-S', '-O', '-c', prog]
        output = subprocess.check_output(command)
        child_flags, grandchild_flags = json.loads(output.decode('ascii'))
        self.assertEqual(child_flags, grandchild_flags)
3407
Richard Oudkerkb15e6222012-07-27 14:19:00 +01003408#
3409# Test interaction with socket timeouts - see Issue #6056
3410#
3411
class TestTimeouts(unittest.TestCase):
    @classmethod
    def _test_timeout(cls, child, address):
        # Runs in a child: after one second send 123 on the pipe, then
        # connect to the listener at `address` and send 456.
        time.sleep(1)
        child.send(123)
        child.close()
        client = multiprocessing.connection.Client(address)
        client.send(456)
        client.close()

    def test_timeout(self):
        # With a default socket timeout of 0.1s in force, both the pipe and
        # the listener must still deliver data that arrives about a second
        # later.
        saved_timeout = socket.getdefaulttimeout()
        try:
            socket.setdefaulttimeout(0.1)
            parent, child = multiprocessing.Pipe(duplex=True)
            listener = multiprocessing.connection.Listener(family='AF_INET')
            worker = multiprocessing.Process(
                target=self._test_timeout,
                args=(child, listener.address),
            )
            worker.start()
            child.close()
            self.assertEqual(parent.recv(), 123)
            parent.close()
            accepted = listener.accept()
            self.assertEqual(accepted.recv(), 456)
            accepted.close()
            listener.close()
            worker.join(10)
        finally:
            socket.setdefaulttimeout(saved_timeout)
3441
Richard Oudkerke88a2442012-08-14 11:41:32 +01003442#
3443# Test what happens with no "if __name__ == '__main__'"
3444#
3445
class TestNoForkBomb(unittest.TestCase):
    def test_noforkbomb(self):
        # Run mp_fork_bomb.py, located next to this file.  On Windows it
        # must fail with a RuntimeError on stderr and nothing on stdout;
        # elsewhere it must succeed and print '123'.
        script = os.path.join(os.path.dirname(__file__), 'mp_fork_bomb.py')
        if not WIN32:
            rc, out, err = test.script_helper.assert_python_ok(script)
            self.assertEqual('123', out.decode('ascii').rstrip())
            self.assertEqual('', err.decode('ascii'))
            return
        rc, out, err = test.script_helper.assert_python_failure(script)
        self.assertEqual('', out.decode('ascii'))
        self.assertIn('RuntimeError', err.decode('ascii'))
3457
3458#
Richard Oudkerk409c3132013-04-17 20:58:00 +01003459# Issue #17555: ForkAwareThreadLock
3460#
3461
class TestForkAwareThreadLock(unittest.TestCase):
    # We recursively start processes.  Issue #17555 meant that the
    # after fork registry would get duplicate entries for the same
    # lock.  The size of the registry at generation n was ~2**n.

    @classmethod
    def child(cls, n, conn):
        # Recurse through n generations of processes; the last generation
        # reports the size of its after-fork registry on `conn`.
        if n <= 1:
            conn.send(len(util._afterfork_registry))
        else:
            descendant = multiprocessing.Process(target=cls.child,
                                                 args=(n-1, conn))
            descendant.start()
            descendant.join()
        conn.close()

    def test_lock(self):
        r, w = multiprocessing.Pipe(False)
        # Bound to a local that is otherwise unused -- presumably so the lock
        # stays alive (and registered) for the whole test; confirm.
        l = util.ForkAwareThreadLock()
        old_size = len(util._afterfork_registry)
        p = multiprocessing.Process(target=self.child, args=(5, w))
        p.start()
        new_size = r.recv()
        p.join()
        # Five generations later the registry must not have grown.
        self.assertLessEqual(new_size, old_size)
3486
3487#
Richard Oudkerkcca8c532013-07-01 18:59:26 +01003488# Issue #17097: EINTR should be ignored by recv(), send(), accept() etc
3489#
3490
class TestIgnoreEINTR(unittest.TestCase):
    # The child installs a do-nothing SIGUSR1 handler; the parent signals it
    # around its blocking calls and checks that the data still gets through.

    @classmethod
    def _test_ignore(cls, conn):
        # Runs in the child: echo one object received on `conn`, then send
        # a 1 MB byte string.
        def handler(signum, frame):
            pass
        signal.signal(signal.SIGUSR1, handler)
        conn.send('ready')
        x = conn.recv()
        conn.send(x)
        conn.send_bytes(b'x'*(1024*1024))   # sending 1 MB should block

    @unittest.skipUnless(hasattr(signal, 'SIGUSR1'), 'requires SIGUSR1')
    def test_ignore(self):
        conn, child_conn = multiprocessing.Pipe()
        try:
            p = multiprocessing.Process(target=self._test_ignore,
                                        args=(child_conn,))
            p.daemon = True
            p.start()
            child_conn.close()
            self.assertEqual(conn.recv(), 'ready')
            # Give the child time to reach recv() before signalling it.
            time.sleep(0.1)
            os.kill(p.pid, signal.SIGUSR1)
            time.sleep(0.1)
            conn.send(1234)
            self.assertEqual(conn.recv(), 1234)
            # Give the child time to reach send_bytes() before signalling.
            time.sleep(0.1)
            os.kill(p.pid, signal.SIGUSR1)
            self.assertEqual(conn.recv_bytes(), b'x'*(1024*1024))
            time.sleep(0.1)
            p.join()
        finally:
            conn.close()

    @classmethod
    def _test_ignore_listener(cls, conn):
        # Runs in the child: publish a listener's address on `conn`, accept
        # one connection and greet it.
        def handler(signum, frame):
            pass
        signal.signal(signal.SIGUSR1, handler)
        # The context manager closes the listener once the greeting is sent.
        with multiprocessing.connection.Listener() as l:
            conn.send(l.address)
            a = l.accept()
            a.send('welcome')

    @unittest.skipUnless(hasattr(signal, 'SIGUSR1'), 'requires SIGUSR1')
    def test_ignore_listener(self):
        conn, child_conn = multiprocessing.Pipe()
        try:
            p = multiprocessing.Process(target=self._test_ignore_listener,
                                        args=(child_conn,))
            p.daemon = True
            p.start()
            child_conn.close()
            address = conn.recv()
            # Give the child time to reach accept() before signalling it.
            time.sleep(0.1)
            os.kill(p.pid, signal.SIGUSR1)
            time.sleep(0.1)
            client = multiprocessing.connection.Client(address)
            try:
                self.assertEqual(client.recv(), 'welcome')
            finally:
                # Do not leave the client connection open after the test.
                client.close()
            p.join()
        finally:
            conn.close()
3554
3555#
Richard Oudkerke88a2442012-08-14 11:41:32 +01003556#
3557#
3558
def setUpModule():
    # Module-level fixture: skip everything on Linux systems where an RLock
    # cannot be created, check that enough semaphores are available, create
    # the shared temp directory and set the multiprocessing log level.
    on_linux = sys.platform.startswith("linux")
    if on_linux:
        try:
            multiprocessing.RLock()
        except OSError:
            reason = ("OSError raises on RLock creation, "
                      "see issue 3111!")
            raise unittest.SkipTest(reason)
    check_enough_semaphores()
    util.get_temp_dir()     # creates temp directory for use by all processes
    logger = multiprocessing.get_logger()
    logger.setLevel(LOG_LEVEL)
3569
Benjamin Petersone711caf2008-06-11 16:44:04 +00003570
def tearDownModule():
    # pause a bit so we don't get warning about dangling threads/processes
    grace_period = 0.5
    time.sleep(grace_period)
3574
3575
# Run the whole suite through unittest when this file is executed as a script.
if __name__ == '__main__':
    unittest.main()