blob: f6f4f73952dd79a95e6cba22565769fe247b37a7 [file] [log] [blame]
Benjamin Peterson90f5ba52010-03-11 22:53:45 +00001#!/usr/bin/env python3
Georg Brandl86b2fb92008-07-16 03:43:04 +00002
Benjamin Petersone711caf2008-06-11 16:44:04 +00003#
4# Unit tests for the multiprocessing package
5#
6
7import unittest
Benjamin Petersone711caf2008-06-11 16:44:04 +00008import queue as pyqueue
9import time
Alexandre Vassalottic57a84f2009-07-17 12:07:01 +000010import io
Antoine Pitroude911b22011-12-21 11:03:24 +010011import itertools
Benjamin Petersone711caf2008-06-11 16:44:04 +000012import sys
13import os
14import gc
Antoine Pitrouc824e9a2011-04-05 18:11:33 +020015import errno
Benjamin Petersone711caf2008-06-11 16:44:04 +000016import signal
17import array
Benjamin Petersone711caf2008-06-11 16:44:04 +000018import socket
19import random
20import logging
Richard Oudkerk3730a172012-06-15 18:26:07 +010021import struct
R. David Murraya21e4ca2009-03-31 23:16:50 +000022import test.support
Benjamin Petersone711caf2008-06-11 16:44:04 +000023
Benjamin Petersone5384b02008-10-04 22:00:42 +000024
R. David Murraya21e4ca2009-03-31 23:16:50 +000025# Skip tests if _multiprocessing wasn't built.
26_multiprocessing = test.support.import_module('_multiprocessing')
27# Skip tests if sem_open implementation is broken.
28test.support.import_module('multiprocessing.synchronize')
# Import threading after _multiprocessing to raise a more relevant error
# message: "No module named _multiprocessing". _multiprocessing is not compiled
# without thread support.
32import threading
Benjamin Petersone5384b02008-10-04 22:00:42 +000033
Benjamin Petersone711caf2008-06-11 16:44:04 +000034import multiprocessing.dummy
35import multiprocessing.connection
36import multiprocessing.managers
37import multiprocessing.heap
Benjamin Petersone711caf2008-06-11 16:44:04 +000038import multiprocessing.pool
Benjamin Petersone711caf2008-06-11 16:44:04 +000039
Charles-François Natalibc8f0822011-09-20 20:36:51 +020040from multiprocessing import util
41
42try:
43 from multiprocessing import reduction
44 HAS_REDUCTION = True
45except ImportError:
46 HAS_REDUCTION = False
Benjamin Petersone711caf2008-06-11 16:44:04 +000047
Brian Curtinafa88b52010-10-07 01:12:19 +000048try:
49 from multiprocessing.sharedctypes import Value, copy
50 HAS_SHAREDCTYPES = True
51except ImportError:
52 HAS_SHAREDCTYPES = False
53
Antoine Pitroubcb39d42011-08-23 19:46:22 +020054try:
55 import msvcrt
56except ImportError:
57 msvcrt = None
58
Benjamin Petersone711caf2008-06-11 16:44:04 +000059#
60#
61#
62
def latin(s):
    """Return the string *s* encoded to bytes with the 'latin' codec."""
    encoded = s.encode('latin')
    return encoded
Benjamin Petersone711caf2008-06-11 16:44:04 +000065
Benjamin Petersone711caf2008-06-11 16:44:04 +000066#
67# Constants
68#
69
LOG_LEVEL = util.SUBWARNING
#LOG_LEVEL = logging.DEBUG

DELTA = 0.1

# Making CHECK_TIMINGS true makes the tests take a lot longer and can
# sometimes cause some non-serious failures, because some calls block a bit
# longer than expected.
CHECK_TIMINGS = False
TIMEOUT1, TIMEOUT2, TIMEOUT3 = (
    (0.82, 0.35, 1.4) if CHECK_TIMINGS else (0.1, 0.1, 0.1)
)

# True unless the extension module flags its semaphore get-value as broken.
_broken_getvalue = getattr(_multiprocessing, 'HAVE_BROKEN_SEM_GETVALUE', False)
HAVE_GETVALUE = not _broken_getvalue

WIN32 = sys.platform == "win32"
Antoine Pitrou176f07d2011-06-06 19:35:31 +020087
Richard Oudkerk59d54042012-05-10 16:11:12 +010088from multiprocessing.connection import wait
Antoine Pitrou176f07d2011-06-06 19:35:31 +020089
def wait_for_handle(handle, timeout):
    """Call wait() on the single *handle* and return its result.

    A negative *timeout* is passed on as None; any other value (including
    None) is passed through unchanged.
    """
    is_negative = timeout is not None and timeout < 0.0
    return wait([handle], None if is_negative else timeout)
Jesse Noller6214edd2009-01-19 16:23:53 +000094
# MAXFD is the value of sysconf("SC_OPEN_MAX"), or 256 when it cannot be
# queried.  Only the errors a failed lookup can produce are caught (sysconf
# missing on this platform, unknown configuration name, or an OS-level
# failure), so that KeyboardInterrupt/SystemExit are no longer swallowed.
try:
    MAXFD = os.sysconf("SC_OPEN_MAX")
except (AttributeError, ValueError, OSError):
    MAXFD = 256
99
Benjamin Petersone711caf2008-06-11 16:44:04 +0000100#
Florent Xiclunafd1b0932010-03-28 00:25:02 +0000101# Some tests require ctypes
102#
103
104try:
Florent Xiclunaaa171062010-08-14 15:56:42 +0000105 from ctypes import Structure, c_int, c_double
Florent Xiclunafd1b0932010-03-28 00:25:02 +0000106except ImportError:
107 Structure = object
108 c_int = c_double = None
109
Charles-François Natali221ef672011-11-22 18:55:22 +0100110
def check_enough_semaphores():
    """Raise unittest.SkipTest if the OS reports too few semaphores.

    Returns None when the limit cannot be queried, is reported as -1, or is
    at least the POSIX minimum.
    """
    # minimum number of semaphores available according to POSIX
    required = 256
    try:
        available = os.sysconf("SC_SEM_NSEMS_MAX")
    except (AttributeError, ValueError):
        # sysconf not available or setting not available
        return
    if available != -1 and available < required:
        raise unittest.SkipTest("The OS doesn't support enough semaphores "
                                "to run the test (required: %d)." % required)
124
125
Florent Xiclunafd1b0932010-03-28 00:25:02 +0000126#
Benjamin Petersone711caf2008-06-11 16:44:04 +0000127# Creates a wrapper for a function which records the time it takes to finish
128#
129
class TimingWrapper(object):
    """Callable wrapper that records how long each call to *func* takes.

    ``elapsed`` is None until the first call; afterwards it holds the
    duration in seconds of the most recent call, whether that call returned
    or raised.
    """

    def __init__(self, func):
        self.func = func
        self.elapsed = None

    def __call__(self, *args, **kwds):
        # Measure with the monotonic clock: a wall-clock adjustment during
        # the call must not distort the duration or make it negative.
        started = time.monotonic()
        try:
            return self.func(*args, **kwds)
        finally:
            self.elapsed = time.monotonic() - started
142
143#
144# Base class for test cases
145#
146
class BaseTestCase(object):
    """Shared helpers for the test case classes in this file."""

    ALLOWED_TYPES = ('processes', 'manager', 'threads')

    def assertTimingAlmostEqual(self, a, b):
        # Timings are only compared when CHECK_TIMINGS is enabled.
        if not CHECK_TIMINGS:
            return
        self.assertAlmostEqual(a, b, 1)

    def assertReturnsIfImplemented(self, value, func, *args):
        # A NotImplementedError from func(*args) is silently accepted;
        # otherwise the result must equal *value*.
        try:
            actual = func(*args)
        except NotImplementedError:
            return None
        return self.assertEqual(value, actual)

    # For the sanity of Windows users, rather than crashing or freezing in
    # multiple ways.
    def __reduce__(self, *args):
        raise NotImplementedError("shouldn't try to pickle a test case")

    __reduce_ex__ = __reduce__
169
Benjamin Petersone711caf2008-06-11 16:44:04 +0000170#
171# Return the value of a semaphore
172#
173
def get_value(self):
    """Return the value of the semaphore-like object *self*.

    Tries, in order, calling ``self.get_value()``, then reading the
    ``_Semaphore__value`` attribute, then the ``_value`` attribute.  Raises
    NotImplementedError when every attempt fails with AttributeError.
    """
    try:
        return self.get_value()
    except AttributeError:
        pass
    for attribute in ('_Semaphore__value', '_value'):
        try:
            return getattr(self, attribute)
        except AttributeError:
            continue
    raise NotImplementedError
185
186#
187# Testcases
188#
189
class _TestProcess(BaseTestCase):
    # Tests of Process objects: attributes of the current process, the
    # daemon argument, start/join/terminate, active_children(), processes
    # started from within processes, and the sentinel attribute.

    ALLOWED_TYPES = ('processes', 'threads')

    def test_current(self):
        if self.TYPE == 'threads':
            return

        me = self.current_process()
        key = me.authkey

        self.assertTrue(me.is_alive())
        self.assertFalse(me.daemon)
        self.assertIsInstance(key, bytes)
        self.assertTrue(len(key) > 0)
        self.assertEqual(me.ident, os.getpid())
        self.assertEqual(me.exitcode, None)

    def test_daemon_argument(self):
        if self.TYPE == "threads":
            return

        # By default uses the current process's daemon flag.
        inherited = self.Process(target=self._test)
        self.assertEqual(inherited.daemon, self.current_process().daemon)
        daemonic = self.Process(target=self._test, daemon=True)
        self.assertTrue(daemonic.daemon)
        non_daemonic = self.Process(target=self._test, daemon=False)
        self.assertFalse(non_daemonic.daemon)

    @classmethod
    def _test(cls, q, *args, **kwds):
        # Report what the child saw back to the parent through the queue.
        me = cls.current_process()
        report = [args, kwds, me.name]
        if cls.TYPE != 'threads':
            report += [bytes(me.authkey), me.pid]
        for item in report:
            q.put(item)

    def test_process(self):
        results = self.Queue(1)
        e = self.Event()  # created exactly as before; not used further
        call_args = (results, 1, 2)
        call_kwargs = {'hello': 23, 'bye': 2.54}
        child = self.Process(
            target=self._test,
            args=call_args,
            kwargs=call_kwargs,
            name='SomeProcess',
        )
        child.daemon = True
        parent = self.current_process()
        has_authkey = self.TYPE != 'threads'

        # State before the child has been started.
        if has_authkey:
            self.assertEqual(child.authkey, parent.authkey)
        self.assertEqual(child.is_alive(), False)
        self.assertEqual(child.daemon, True)
        self.assertNotIn(child, self.active_children())
        self.assertTrue(type(self.active_children()) is list)
        self.assertEqual(child.exitcode, None)

        child.start()

        # State while the child is running.
        self.assertEqual(child.exitcode, None)
        self.assertEqual(child.is_alive(), True)
        self.assertIn(child, self.active_children())

        # Items arrive in the order _test() puts them.
        expected_items = [call_args[1:], call_kwargs, child.name]
        if has_authkey:
            expected_items += [parent.authkey, child.pid]
        for expected in expected_items:
            self.assertEqual(results.get(), expected)

        child.join()

        # State after the child has finished.
        self.assertEqual(child.exitcode, 0)
        self.assertEqual(child.is_alive(), False)
        self.assertNotIn(child, self.active_children())

    @classmethod
    def _test_terminate(cls):
        time.sleep(1000)

    def test_terminate(self):
        if self.TYPE == 'threads':
            return

        sleeper = self.Process(target=self._test_terminate)
        sleeper.daemon = True
        sleeper.start()

        self.assertEqual(sleeper.is_alive(), True)
        self.assertIn(sleeper, self.active_children())
        self.assertEqual(sleeper.exitcode, None)

        timed_join = TimingWrapper(sleeper.join)

        # Joining with a zero or negative timeout returns while the
        # process is still alive.
        for timeout in (0, -1):
            self.assertEqual(timed_join(timeout), None)
            self.assertTimingAlmostEqual(timed_join.elapsed, 0.0)
            self.assertEqual(sleeper.is_alive(), True)

        sleeper.terminate()

        self.assertEqual(timed_join(), None)
        self.assertTimingAlmostEqual(timed_join.elapsed, 0.0)

        self.assertEqual(sleeper.is_alive(), False)
        self.assertNotIn(sleeper, self.active_children())

        sleeper.join()

        # XXX sometimes get p.exitcode == 0 on Windows ...
        #self.assertEqual(p.exitcode, -signal.SIGTERM)

    def test_cpu_count(self):
        try:
            count = multiprocessing.cpu_count()
        except NotImplementedError:
            count = 1
        self.assertTrue(type(count) is int)
        self.assertTrue(count >= 1)

    def test_active_children(self):
        self.assertEqual(type(self.active_children()), list)

        napper = self.Process(target=time.sleep, args=(DELTA,))
        self.assertNotIn(napper, self.active_children())

        napper.daemon = True
        napper.start()
        self.assertIn(napper, self.active_children())

        napper.join()
        self.assertNotIn(napper, self.active_children())

    @classmethod
    def _test_recursion(cls, wconn, id):
        # NOTE: 'forking' is imported here but not otherwise used.
        from multiprocessing import forking
        wconn.send(id)
        if len(id) >= 2:
            return
        # Start two children, one after the other, each one level deeper.
        for branch in range(2):
            child = cls.Process(
                target=cls._test_recursion, args=(wconn, id + [branch])
            )
            child.start()
            child.join()

    def test_recursion(self):
        rconn, wconn = self.Pipe(duplex=False)
        self._test_recursion(wconn, [])

        time.sleep(DELTA)
        received = []
        while rconn.poll():
            received.append(rconn.recv())

        expected = [
            [],
              [0],
                [0, 0],
                [0, 1],
              [1],
                [1, 0],
                [1, 1],
        ]
        self.assertEqual(received, expected)

    @classmethod
    def _test_sentinel(cls, event):
        event.wait(10.0)

    def test_sentinel(self):
        if self.TYPE == "threads":
            return
        release = self.Event()
        child = self.Process(target=self._test_sentinel, args=(release,))
        # The sentinel is not available before the process is started.
        with self.assertRaises(ValueError):
            child.sentinel
        child.start()
        self.addCleanup(child.join)
        handle = child.sentinel
        self.assertIsInstance(handle, int)
        self.assertFalse(wait_for_handle(handle, timeout=0.0))
        release.set()
        child.join()
        self.assertTrue(wait_for_handle(handle, timeout=DELTA))
380
Benjamin Petersone711caf2008-06-11 16:44:04 +0000381#
382#
383#
384
class _UpperCaser(multiprocessing.Process):
    """Process that upper-cases the strings sent to it over a pipe.

    submit() and stop() use ``parent_conn``; run() serves requests on
    ``child_conn`` until it receives None.
    """

    def __init__(self):
        super().__init__()
        ends = multiprocessing.Pipe()
        self.child_conn, self.parent_conn = ends

    def run(self):
        # This side only uses child_conn, so the other end is closed here.
        self.parent_conn.close()
        requests = iter(self.child_conn.recv, None)
        for text in requests:
            self.child_conn.send(text.upper())
        self.child_conn.close()

    def submit(self, s):
        # Send one string and block until the reply arrives.
        assert type(s) is str
        self.parent_conn.send(s)
        reply = self.parent_conn.recv()
        return reply

    def stop(self):
        # None is the sentinel that ends the loop in run().
        self.parent_conn.send(None)
        for conn in (self.parent_conn, self.child_conn):
            conn.close()
406
class _TestSubclassingProcess(BaseTestCase):
    # Tests of a Process subclass, and of what a child process leaves
    # behind when it exits through an exception or sys.exit().

    ALLOWED_TYPES = ('processes',)

    def test_subclassing(self):
        worker = _UpperCaser()
        worker.daemon = True
        worker.start()
        for text, shouted in (('hello', 'HELLO'), ('world', 'WORLD')):
            self.assertEqual(worker.submit(text), shouted)
        worker.stop()
        worker.join()

    def test_stderr_flush(self):
        # sys.stderr is flushed at process shutdown (issue #13812)
        if self.TYPE == "threads":
            return

        path = test.support.TESTFN
        self.addCleanup(test.support.unlink, path)
        child = self.Process(target=self._test_stderr_flush, args=(path,))
        child.start()
        child.join()
        with open(path, 'r') as stream:
            captured = stream.read()
            # The whole traceback was printed
            for fragment in ("ZeroDivisionError",
                             "test_multiprocessing.py",
                             "1/0 # MARKER"):
                self.assertIn(fragment, captured)

    @classmethod
    def _test_stderr_flush(cls, testfn):
        # The text of the failing line below is asserted on by
        # test_stderr_flush(); keep it exactly as written.
        sys.stderr = open(testfn, 'w')
        1/0 # MARKER

    @classmethod
    def _test_sys_exit(cls, reason, testfn):
        sys.stderr = open(testfn, 'w')
        sys.exit(reason)

    def test_sys_exit(self):
        # See Issue 13854
        if self.TYPE == 'threads':
            return

        path = test.support.TESTFN
        self.addCleanup(test.support.unlink, path)

        # Non-integer exit reasons: check the exit code and the text that
        # ended up in the file the child used as sys.stderr.
        non_integer_cases = (([1, 2, 3], 1), ('ignore this', 0))
        for reason, expected_code in non_integer_cases:
            child = self.Process(target=self._test_sys_exit,
                                 args=(reason, path))
            child.daemon = True
            child.start()
            child.join(5)
            self.assertEqual(child.exitcode, expected_code)

            with open(path, 'r') as stream:
                self.assertEqual(stream.read().rstrip(), str(reason))

        # Boolean and integer reasons are compared with the exit code.
        for reason in (True, False, 8):
            child = self.Process(target=sys.exit, args=(reason,))
            child.daemon = True
            child.start()
            child.join(5)
            self.assertEqual(child.exitcode, reason)
472
Benjamin Petersone711caf2008-06-11 16:44:04 +0000473#
474#
475#
476
def queue_empty(q):
    """Return q.empty() if *q* has that method, else whether q.qsize() is 0."""
    if not hasattr(q, 'empty'):
        return q.qsize() == 0
    return q.empty()
482
def queue_full(q, maxsize):
    """Return q.full() if *q* has that method, else q.qsize() == maxsize."""
    if not hasattr(q, 'full'):
        return q.qsize() == maxsize
    return q.full()
488
489
class _TestQueue(BaseTestCase):
    # Tests of Queue put/get (blocking, non-blocking and with timeouts),
    # of queues shared with a child process, of qsize() and of
    # JoinableQueue.task_done()/join().

    @classmethod
    def _test_put(cls, queue, child_can_start, parent_can_continue):
        # Drain the six items the parent put, then let the parent go on.
        child_can_start.wait()
        for _ in range(6):
            queue.get()
        parent_can_continue.set()

    def test_put(self):
        MAXSIZE = 6
        queue = self.Queue(maxsize=MAXSIZE)
        child_can_start = self.Event()
        parent_can_continue = self.Event()

        consumer = self.Process(
            target=self._test_put,
            args=(queue, child_can_start, parent_can_continue),
        )
        consumer.daemon = True
        consumer.start()

        self.assertEqual(queue_empty(queue), True)
        self.assertEqual(queue_full(queue, MAXSIZE), False)

        # Fill the queue using the different put() call forms.
        put_forms = (
            (1, ()),
            (2, (True,)),
            (3, (True, None)),
            (4, (False,)),
            (5, (False, None)),
        )
        for item, extra in put_forms:
            queue.put(item, *extra)
        queue.put_nowait(6)

        # the values may be in buffer but not yet in pipe so sleep a bit
        time.sleep(DELTA)

        self.assertEqual(queue_empty(queue), False)
        self.assertEqual(queue_full(queue, MAXSIZE), True)

        put = TimingWrapper(queue.put)
        put_nowait = TimingWrapper(queue.put_nowait)

        # Every further put must raise Full; only the blocking forms with
        # a timeout are expected to take any time.
        full_attempts = (
            (put, (7, False), {}, 0),
            (put, (7, False, None), {}, 0),
            (put_nowait, (7,), {}, 0),
            (put, (7, True, TIMEOUT1), {}, TIMEOUT1),
            (put, (7, False, TIMEOUT2), {}, 0),
            (put, (7, True), {'timeout': TIMEOUT3}, TIMEOUT3),
        )
        for attempt, positional, keywords, expected_delay in full_attempts:
            self.assertRaises(pyqueue.Full, attempt, *positional, **keywords)
            self.assertTimingAlmostEqual(attempt.elapsed, expected_delay)

        child_can_start.set()
        parent_can_continue.wait()

        self.assertEqual(queue_empty(queue), True)
        self.assertEqual(queue_full(queue, MAXSIZE), False)

        consumer.join()

    @classmethod
    def _test_get(cls, queue, child_can_start, parent_can_continue):
        child_can_start.wait()
        #queue.put(1)
        for item in (2, 3, 4, 5):
            queue.put(item)
        parent_can_continue.set()

    def test_get(self):
        queue = self.Queue()
        child_can_start = self.Event()
        parent_can_continue = self.Event()

        producer = self.Process(
            target=self._test_get,
            args=(queue, child_can_start, parent_can_continue),
        )
        producer.daemon = True
        producer.start()

        self.assertEqual(queue_empty(queue), True)

        child_can_start.set()
        parent_can_continue.wait()

        time.sleep(DELTA)
        self.assertEqual(queue_empty(queue), False)

        # Hangs unexpectedly, remove for now
        #self.assertEqual(queue.get(), 1)
        self.assertEqual(queue.get(True, None), 2)
        self.assertEqual(queue.get(True), 3)
        self.assertEqual(queue.get(timeout=1), 4)
        self.assertEqual(queue.get_nowait(), 5)

        self.assertEqual(queue_empty(queue), True)

        get = TimingWrapper(queue.get)
        get_nowait = TimingWrapper(queue.get_nowait)

        # Every further get must raise Empty; only the blocking forms with
        # a timeout are expected to take any time.
        empty_attempts = (
            (get, (False,), {}, 0),
            (get, (False, None), {}, 0),
            (get_nowait, (), {}, 0),
            (get, (True, TIMEOUT1), {}, TIMEOUT1),
            (get, (False, TIMEOUT2), {}, 0),
            (get, (), {'timeout': TIMEOUT3}, TIMEOUT3),
        )
        for attempt, positional, keywords, expected_delay in empty_attempts:
            self.assertRaises(pyqueue.Empty, attempt, *positional, **keywords)
            self.assertTimingAlmostEqual(attempt.elapsed, expected_delay)

        producer.join()

    @classmethod
    def _test_fork(cls, queue):
        for item in range(10, 20):
            queue.put(item)
        # note that at this point the items may only be buffered, so the
        # process cannot shutdown until the feeder thread has finished
        # pushing items onto the pipe.

    def test_fork(self):
        # Old versions of Queue would fail to create a new feeder
        # thread for a forked process if the original process had its
        # own feeder thread.  This test checks that this no longer
        # happens.

        queue = self.Queue()

        # put items on queue so that main process starts a feeder thread
        for item in range(10):
            queue.put(item)

        # wait to make sure thread starts before we fork a new process
        time.sleep(DELTA)

        # fork process
        child = self.Process(target=self._test_fork, args=(queue,))
        child.daemon = True
        child.start()

        # check that all expected items are in the queue
        for expected in range(20):
            self.assertEqual(queue.get(), expected)
        self.assertRaises(pyqueue.Empty, queue.get, False)

        child.join()

    def test_qsize(self):
        q = self.Queue()
        try:
            self.assertEqual(q.qsize(), 0)
        except NotImplementedError:
            return
        # Each step performs one operation and checks the resulting size.
        steps = (
            (q.put, (1,), 1),
            (q.put, (5,), 2),
            (q.get, (), 1),
            (q.get, (), 0),
        )
        for operation, arguments, expected_size in steps:
            operation(*arguments)
            self.assertEqual(q.qsize(), expected_size)

    @classmethod
    def _test_task_done(cls, q):
        # Acknowledge every item until the None sentinel arrives.
        for _ in iter(q.get, None):
            time.sleep(DELTA)
            q.task_done()

    def test_task_done(self):
        queue = self.JoinableQueue()

        if sys.version_info < (2, 5) and not hasattr(queue, 'task_done'):
            self.skipTest("requires 'queue.task_done()' method")

        workers = [
            self.Process(target=self._test_task_done, args=(queue,))
            for _ in range(4)
        ]

        for worker in workers:
            worker.daemon = True
            worker.start()

        for item in range(10):
            queue.put(item)

        queue.join()

        # One sentinel per worker so that each of them stops.
        for _ in workers:
            queue.put(None)

        for worker in workers:
            worker.join()
699
700#
701#
702#
703
class _TestLock(BaseTestCase):
    # Tests of Lock and RLock acquire/release and of Lock as a context
    # manager.

    def test_lock(self):
        mutex = self.Lock()
        self.assertEqual(mutex.acquire(), True)
        # A second, non-blocking acquire fails while the lock is held.
        self.assertEqual(mutex.acquire(False), False)
        self.assertEqual(mutex.release(), None)
        # Releasing a lock that is not held is an error.
        self.assertRaises((ValueError, threading.ThreadError), mutex.release)

    def test_rlock(self):
        mutex = self.RLock()
        depth = 3
        for _ in range(depth):
            self.assertEqual(mutex.acquire(), True)
        for _ in range(depth):
            self.assertEqual(mutex.release(), None)
        # One release too many is an error.
        self.assertRaises((AssertionError, RuntimeError), mutex.release)

    def test_lock_context(self):
        with self.Lock():
            pass
726
Benjamin Petersone711caf2008-06-11 16:44:04 +0000727
class _TestSemaphore(BaseTestCase):
    # Tests of Semaphore and BoundedSemaphore counting, and of acquire()
    # with timeouts.

    def _test_semaphore(self, sem):
        # Expects *sem* to start at 2: take it down to 0, check that a
        # non-blocking acquire then fails, and release back up to 2.
        def expect_value(expected):
            self.assertReturnsIfImplemented(expected, get_value, sem)

        expect_value(2)
        for remaining in (1, 0):
            self.assertEqual(sem.acquire(), True)
            expect_value(remaining)
        self.assertEqual(sem.acquire(False), False)
        expect_value(0)
        for restored in (1, 2):
            self.assertEqual(sem.release(), None)
            expect_value(restored)

    def test_semaphore(self):
        sem = self.Semaphore(2)
        self._test_semaphore(sem)
        # Releasing beyond the initial value is accepted.
        for raised_to in (3, 4):
            self.assertEqual(sem.release(), None)
            self.assertReturnsIfImplemented(raised_to, get_value, sem)

    def test_bounded_semaphore(self):
        sem = self.BoundedSemaphore(2)
        self._test_semaphore(sem)
        # Currently fails on OS/X
        #if HAVE_GETVALUE:
        #    self.assertRaises(ValueError, sem.release)
        #    self.assertReturnsIfImplemented(2, get_value, sem)

    def test_timeout(self):
        if self.TYPE != 'processes':
            return

        sem = self.Semaphore(0)
        acquire = TimingWrapper(sem.acquire)

        # Every acquire fails because the semaphore is at 0; only the
        # blocking forms with a timeout are expected to take any time.
        attempts = (
            ((False,), {}, 0.0),
            ((False, None), {}, 0.0),
            ((False, TIMEOUT1), {}, 0),
            ((True, TIMEOUT2), {}, TIMEOUT2),
            ((), {'timeout': TIMEOUT3}, TIMEOUT3),
        )
        for positional, keywords, expected_delay in attempts:
            self.assertEqual(acquire(*positional, **keywords), False)
            self.assertTimingAlmostEqual(acquire.elapsed, expected_delay)
780
781
782class _TestCondition(BaseTestCase):
783
@classmethod
def f(cls, cond, sleeping, woken, timeout=None):
    # Worker body used by the notify tests: announce through *sleeping*
    # that the wait is about to begin, wait on *cond* (for at most
    # *timeout*, if given), then announce through *woken* that the wait
    # has ended.
    cond.acquire()
    # Released while cond is held and before wait() is called.
    sleeping.release()
    cond.wait(timeout)
    # Released both after a notification and after a timeout.
    woken.release()
    cond.release()
791
def check_invariant(self, cond):
    # this is only supposed to succeed when there are no sleepers
    # Only checked for the 'processes' flavour; a NotImplementedError from
    # any get_value() call is ignored.
    if self.TYPE != 'processes':
        return
    try:
        sleeping = cond._sleeping_count.get_value()
        woken = cond._woken_count.get_value()
        self.assertEqual(sleeping - woken, 0)
        self.assertEqual(cond._wait_semaphore.get_value(), 0)
    except NotImplementedError:
        pass
802
def test_notify(self):
    cond = self.Condition()
    sleeping = self.Semaphore(0)
    woken = self.Semaphore(0)
    worker_args = (cond, sleeping, woken)

    # One waiter created through self.Process and one plain thread.
    waiter = self.Process(target=self.f, args=worker_args)
    waiter.daemon = True
    waiter.start()

    thread_waiter = threading.Thread(target=self.f, args=worker_args)
    thread_waiter.daemon = True
    thread_waiter.start()

    # wait for both children to start sleeping
    for _ in range(2):
        sleeping.acquire()

    # check no process/thread has woken up
    time.sleep(DELTA)
    self.assertReturnsIfImplemented(0, get_value, woken)

    # wake up one process/thread at a time and check that exactly one
    # more has woken up after each notification
    for expected_woken in (1, 2):
        cond.acquire()
        cond.notify()
        cond.release()

        time.sleep(DELTA)
        self.assertReturnsIfImplemented(expected_woken, get_value, woken)

    # check state is not mucked up
    self.check_invariant(cond)
    thread_waiter.join()
845
def test_notify_all(self):
    cond = self.Condition()
    sleeping = self.Semaphore(0)
    woken = self.Semaphore(0)

    def start_waiters(worker_args):
        # Start three pairs of waiters: one made with self.Process and
        # one plain thread per pair, all daemonic.
        for _ in range(3):
            waiter = self.Process(target=self.f, args=worker_args)
            waiter.daemon = True
            waiter.start()

            thread_waiter = threading.Thread(target=self.f, args=worker_args)
            thread_waiter.daemon = True
            thread_waiter.start()

    # start some threads/processes which will timeout
    start_waiters((cond, sleeping, woken, TIMEOUT1))

    # wait for them all to sleep
    for _ in range(6):
        sleeping.acquire()

    # check they have all timed out
    for _ in range(6):
        woken.acquire()
    self.assertReturnsIfImplemented(0, get_value, woken)

    # check state is not mucked up
    self.check_invariant(cond)

    # start some more threads/processes
    start_waiters((cond, sleeping, woken))

    # wait for them to all sleep
    for _ in range(6):
        sleeping.acquire()

    # check no process/thread has woken up
    time.sleep(DELTA)
    self.assertReturnsIfImplemented(0, get_value, woken)

    # wake them all up
    cond.acquire()
    cond.notify_all()
    cond.release()

    # check they have all woken
    # (poll up to ten times; stop early once all six are counted or when
    # the value cannot be read at all)
    for _ in range(10):
        try:
            all_woken = get_value(woken) == 6
        except NotImplementedError:
            break
        if all_woken:
            break
        time.sleep(DELTA)
    self.assertReturnsIfImplemented(6, get_value, woken)

    # check state is not mucked up
    self.check_invariant(cond)
910
def test_timeout(self):
    # Waiting on a condition nobody notifies returns False after the
    # timeout.
    cond = self.Condition()
    timed_wait = TimingWrapper(cond.wait)
    cond.acquire()
    notified = timed_wait(TIMEOUT1)
    cond.release()
    self.assertEqual(notified, False)
    self.assertTimingAlmostEqual(timed_wait.elapsed, TIMEOUT1)
919
@classmethod
def _test_waitfor_f(cls, cond, state):
    # Set state to 0 and notify, then wait until state reaches 4.  Exits
    # with status 1 if the wait fails or state is not 4 afterwards.
    with cond:
        state.value = 0
        cond.notify()
        reached = cond.wait_for(lambda: state.value == 4)
        if not (reached and state.value == 4):
            sys.exit(1)
928
@unittest.skipUnless(HAS_SHAREDCTYPES, 'needs sharedctypes')
def test_waitfor(self):
    # based on test in test/lock_tests.py
    cond = self.Condition()
    state = self.Value('i', -1)

    child = self.Process(target=self._test_waitfor_f, args=(cond, state))
    child.daemon = True
    child.start()

    # Wait for the child to reset the state to 0.
    with cond:
        became_zero = cond.wait_for(lambda: state.value == 0)
        self.assertTrue(became_zero)
        self.assertEqual(state.value, 0)

    # Increment four times, notifying after each step, so that the state
    # reaches the value 4 the child is waiting for.
    for _ in range(4):
        time.sleep(0.01)
        with cond:
            state.value += 1
            cond.notify()

    child.join(5)
    self.assertFalse(child.is_alive())
    self.assertEqual(child.exitcode, 0)
953
954 @classmethod
Richard Oudkerk6dbca362012-05-06 16:46:36 +0100955 def _test_waitfor_timeout_f(cls, cond, state, success, sem):
956 sem.release()
Charles-François Natalic8ce7152012-04-17 18:45:57 +0200957 with cond:
958 expected = 0.1
959 dt = time.time()
960 result = cond.wait_for(lambda : state.value==4, timeout=expected)
961 dt = time.time() - dt
962 # borrow logic in assertTimeout() from test/lock_tests.py
963 if not result and expected * 0.6 < dt < expected * 10.0:
964 success.value = True
965
966 @unittest.skipUnless(HAS_SHAREDCTYPES, 'needs sharedctypes')
967 def test_waitfor_timeout(self):
968 # based on test in test/lock_tests.py
969 cond = self.Condition()
970 state = self.Value('i', 0)
971 success = self.Value('i', False)
Richard Oudkerk6dbca362012-05-06 16:46:36 +0100972 sem = self.Semaphore(0)
Charles-François Natalic8ce7152012-04-17 18:45:57 +0200973
974 p = self.Process(target=self._test_waitfor_timeout_f,
Richard Oudkerk6dbca362012-05-06 16:46:36 +0100975 args=(cond, state, success, sem))
Charles-François Natalic8ce7152012-04-17 18:45:57 +0200976 p.daemon = True
977 p.start()
Richard Oudkerk6dbca362012-05-06 16:46:36 +0100978 self.assertTrue(sem.acquire(timeout=10))
Charles-François Natalic8ce7152012-04-17 18:45:57 +0200979
980 # Only increment 3 times, so state == 4 is never reached.
981 for i in range(3):
982 time.sleep(0.01)
983 with cond:
984 state.value += 1
985 cond.notify()
986
987 p.join(5)
988 self.assertTrue(success.value)
989
Richard Oudkerk98449932012-06-05 13:15:29 +0100990 @classmethod
991 def _test_wait_result(cls, c, pid):
992 with c:
993 c.notify()
994 time.sleep(1)
995 if pid is not None:
996 os.kill(pid, signal.SIGINT)
997
998 def test_wait_result(self):
999 if isinstance(self, ProcessesMixin) and sys.platform != 'win32':
1000 pid = os.getpid()
1001 else:
1002 pid = None
1003
1004 c = self.Condition()
1005 with c:
1006 self.assertFalse(c.wait(0))
1007 self.assertFalse(c.wait(0.1))
1008
1009 p = self.Process(target=self._test_wait_result, args=(c, pid))
1010 p.start()
1011
1012 self.assertTrue(c.wait(10))
1013 if pid is not None:
1014 self.assertRaises(KeyboardInterrupt, c.wait, 10)
1015
1016 p.join()
1017
Benjamin Petersone711caf2008-06-11 16:44:04 +00001018
class _TestEvent(BaseTestCase):
    """Tests for Event objects."""

    @classmethod
    def _test_event(cls, event):
        """Child task: sleep TIMEOUT2 seconds, then set *event*."""
        time.sleep(TIMEOUT2)
        event.set()

    def test_event(self):
        """Exercise is_set(), wait(), set() and clear() on an Event."""
        event = self.Event()
        wait = TimingWrapper(event.wait)

        # A freshly created event is expected to be cleared.
        self.assertEqual(event.is_set(), False)

        # wait() is expected to return the state of the flag (not None),
        # so waiting on a cleared event yields False after the timeout.
        self.assertEqual(wait(0.0), False)
        self.assertTimingAlmostEqual(wait.elapsed, 0.0)
        self.assertEqual(wait(TIMEOUT1), False)
        self.assertTimingAlmostEqual(wait.elapsed, TIMEOUT1)

        event.set()

        # Once the event is set, wait() returns True without blocking,
        # with or without a timeout.
        self.assertEqual(event.is_set(), True)
        self.assertEqual(wait(), True)
        self.assertTimingAlmostEqual(wait.elapsed, 0.0)
        self.assertEqual(wait(TIMEOUT1), True)
        self.assertTimingAlmostEqual(wait.elapsed, 0.0)

        event.clear()

        # After clear(), wait() blocks until the child sets the event.
        p = self.Process(target=self._test_event, args=(event,))
        p.daemon = True
        p.start()
        self.assertEqual(wait(), True)
        # Reap the child so it does not outlive the test.
        p.join()
Benjamin Petersone711caf2008-06-11 16:44:04 +00001059
1060#
Richard Oudkerk3730a172012-06-15 18:26:07 +01001061# Tests for Barrier - adapted from tests in test/lock_tests.py
1062#
1063
# Many of the tests for threading.Barrier use a list as an atomic
# counter: a value is appended to increment the counter, and the
# length of the list gives the value.  We use the class _DummyList
# for the same purpose.
1068
class _DummyList(object):
    """Minimal list stand-in that only counts appends.

    The count is stored as a C int in a multiprocessing.heap buffer and is
    guarded by a multiprocessing lock.  Only the buffer wrapper and the lock
    are part of the pickled state.
    """

    def __init__(self):
        int_size = struct.calcsize('i')
        state = (multiprocessing.heap.BufferWrapper(int_size),
                 multiprocessing.Lock())
        self.__setstate__(state)
        # Start counting from zero.
        self._lengthbuf[0] = 0

    def __setstate__(self, state):
        self._wrapper, self._lock = state
        # View the buffer as an array of C ints; slot 0 holds the count.
        view = self._wrapper.create_memoryview()
        self._lengthbuf = view.cast('i')

    def __getstate__(self):
        return (self._wrapper, self._lock)

    def append(self, _):
        """Increment the counter; the appended value itself is discarded."""
        with self._lock:
            self._lengthbuf[0] += 1

    def __len__(self):
        """Return the number of append() calls made so far."""
        with self._lock:
            count = self._lengthbuf[0]
            return count
1091
def _wait():
    """Crude wait/yield that relies on no synchronization primitives:
    just sleep for 10 milliseconds."""
    time.sleep(0.01)
1095
1096
class Bunch(object):
    """
    A bunch of workers all running the same function.

    Workers are created through ``namespace.Process``, so what they are
    (threads or processes) depends on the namespace that is passed in.
    """
    def __init__(self, namespace, f, args, n, wait_before_exit=False):
        """
        Start `n` daemon workers, each calling ``f(*args)``.
        If `wait_before_exit` is True, the workers won't terminate until
        do_finish() is called.
        """
        self.f = f
        self.args = args
        self.n = n
        # Counters for workers that have started / finished running f.
        self.started = namespace.DummyList()
        self.finished = namespace.DummyList()
        self._can_exit = namespace.Event()
        if not wait_before_exit:
            self._can_exit.set()
        for _ in range(n):
            worker = namespace.Process(target=self.task)
            worker.daemon = True
            worker.start()

    def task(self):
        """Worker body: record start, run f, record finish, await release."""
        ident = os.getpid()
        self.started.append(ident)
        try:
            self.f(*self.args)
        finally:
            self.finished.append(ident)
            # Give do_finish() up to 30 seconds to release this worker.
            self._can_exit.wait(30)
            assert self._can_exit.is_set()

    def wait_for_started(self):
        """Poll until all `n` workers have recorded that they started."""
        while not len(self.started) >= self.n:
            _wait()

    def wait_for_finished(self):
        """Poll until all `n` workers have recorded that they finished."""
        while not len(self.finished) >= self.n:
            _wait()

    def do_finish(self):
        """Release workers that are waiting before they exit."""
        self._can_exit.set()
Richard Oudkerk3730a172012-06-15 18:26:07 +01001140
1141
class AppendTrue(object):
    """Callable that appends True to the wrapped object on every call."""

    def __init__(self, obj):
        # obj only needs to provide an append() method.
        self.obj = obj

    def __call__(self):
        target = self.obj
        target.append(True)
1147
1148
class _TestBarrier(BaseTestCase):
    """
    Tests for Barrier objects.
    """
    N = 5
    defaultTimeout = 30.0  # XXX Slow Windows buildbots need generous timeout

    def setUp(self):
        self.barrier = self.Barrier(self.N, timeout=self.defaultTimeout)

    def tearDown(self):
        # Abort so that any worker still blocked in the barrier is released.
        self.barrier.abort()
        self.barrier = None

    def DummyList(self):
        """Return a list-like counter suitable for the current TYPE."""
        if self.TYPE == 'threads':
            return []
        elif self.TYPE == 'manager':
            return self.manager.list()
        else:
            return _DummyList()

    def run_threads(self, f, args):
        """Run ``f(*args)`` in N-1 workers plus the current one, then wait
        for all the workers to finish."""
        b = Bunch(self, f, args, self.N-1)
        f(*args)
        b.wait_for_finished()

    @classmethod
    def multipass(cls, barrier, results, n):
        """Pass the barrier 2*n times, checking that all parties proceed in
        lockstep by inspecting the two counters in *results*."""
        m = barrier.parties
        assert m == cls.N
        for i in range(n):
            results[0].append(True)
            assert len(results[1]) == i * m
            barrier.wait()
            results[1].append(True)
            assert len(results[0]) == (i + 1) * m
            barrier.wait()
        try:
            assert barrier.n_waiting == 0
        except NotImplementedError:
            # n_waiting may be unavailable for some barrier implementations.
            pass
        assert not barrier.broken

    def test_barrier(self, passes=1):
        """
        Test that a barrier is passed in lockstep
        """
        results = [self.DummyList(), self.DummyList()]
        self.run_threads(self.multipass, (self.barrier, results, passes))

    def test_barrier_10(self):
        """
        Test that a barrier works for 10 consecutive runs
        """
        return self.test_barrier(10)

    @classmethod
    def _test_wait_return_f(cls, barrier, queue):
        """Put the value returned by barrier.wait() on *queue*."""
        res = barrier.wait()
        queue.put(res)

    def test_wait_return(self):
        """
        test the return value from barrier.wait
        """
        queue = self.Queue()
        self.run_threads(self._test_wait_return_f, (self.barrier, queue))
        results = [queue.get() for i in range(self.N)]
        # Exactly one party must have been handed index 0.
        self.assertEqual(results.count(0), 1)

    @classmethod
    def _test_action_f(cls, barrier, results):
        """Pass the barrier and check the action ran exactly once."""
        barrier.wait()
        if len(results) != 1:
            raise RuntimeError

    def test_action(self):
        """
        Test the 'action' callback
        """
        results = self.DummyList()
        barrier = self.Barrier(self.N, action=AppendTrue(results))
        self.run_threads(self._test_action_f, (barrier, results))
        self.assertEqual(len(results), 1)

    @classmethod
    def _test_abort_f(cls, barrier, results1, results2):
        """One party aborts the barrier; the others must see it broken."""
        try:
            i = barrier.wait()
            if i == cls.N//2:
                raise RuntimeError
            barrier.wait()
            results1.append(True)
        except threading.BrokenBarrierError:
            results2.append(True)
        except RuntimeError:
            barrier.abort()

    def test_abort(self):
        """
        Test that an abort will put the barrier in a broken state
        """
        results1 = self.DummyList()
        results2 = self.DummyList()
        self.run_threads(self._test_abort_f,
                         (self.barrier, results1, results2))
        self.assertEqual(len(results1), 0)
        self.assertEqual(len(results2), self.N-1)
        self.assertTrue(self.barrier.broken)

    @classmethod
    def _test_reset_f(cls, barrier, results1, results2, results3):
        """One party resets the barrier while the others wait in it."""
        i = barrier.wait()
        if i == cls.N//2:
            # Wait until the other threads are all in the barrier.
            while barrier.n_waiting < cls.N-1:
                time.sleep(0.001)
            barrier.reset()
        else:
            try:
                barrier.wait()
                results1.append(True)
            except threading.BrokenBarrierError:
                results2.append(True)
        # Now, pass the barrier again
        barrier.wait()
        results3.append(True)

    def test_reset(self):
        """
        Test that a 'reset' on a barrier frees the waiting threads
        """
        results1 = self.DummyList()
        results2 = self.DummyList()
        results3 = self.DummyList()
        self.run_threads(self._test_reset_f,
                         (self.barrier, results1, results2, results3))
        self.assertEqual(len(results1), 0)
        self.assertEqual(len(results2), self.N-1)
        self.assertEqual(len(results3), self.N)

    @classmethod
    def _test_abort_and_reset_f(cls, barrier, barrier2,
                                results1, results2, results3):
        """Abort *barrier*, then reset it and pass it once more."""
        try:
            i = barrier.wait()
            if i == cls.N//2:
                raise RuntimeError
            barrier.wait()
            results1.append(True)
        except threading.BrokenBarrierError:
            results2.append(True)
        except RuntimeError:
            barrier.abort()
        # Synchronize and reset the barrier.  Must synchronize first so
        # that everyone has left it when we reset, and after so that no
        # one enters it before the reset.
        if barrier2.wait() == cls.N//2:
            barrier.reset()
        barrier2.wait()
        barrier.wait()
        results3.append(True)

    def test_abort_and_reset(self):
        """
        Test that a barrier can be reset after being broken.
        """
        results1 = self.DummyList()
        results2 = self.DummyList()
        results3 = self.DummyList()
        barrier2 = self.Barrier(self.N)

        self.run_threads(self._test_abort_and_reset_f,
                         (self.barrier, barrier2, results1, results2, results3))
        self.assertEqual(len(results1), 0)
        self.assertEqual(len(results2), self.N-1)
        self.assertEqual(len(results3), self.N)

    @classmethod
    def _test_timeout_f(cls, barrier, results):
        """One party arrives late, so the explicit wait timeout expires."""
        i = barrier.wait()
        if i == cls.N//2:
            # One thread is late!
            time.sleep(1.0)
        try:
            barrier.wait(0.5)
        except threading.BrokenBarrierError:
            results.append(True)

    def test_timeout(self):
        """
        Test wait(timeout)
        """
        results = self.DummyList()
        self.run_threads(self._test_timeout_f, (self.barrier, results))
        self.assertEqual(len(results), self.barrier.parties)

    @classmethod
    def _test_default_timeout_f(cls, barrier, results):
        """One party arrives late, so the barrier's own timeout expires."""
        i = barrier.wait(cls.defaultTimeout)
        if i == cls.N//2:
            # One thread is later than the default timeout
            time.sleep(1.0)
        try:
            barrier.wait()
        except threading.BrokenBarrierError:
            results.append(True)

    def test_default_timeout(self):
        """
        Test the barrier's default timeout
        """
        barrier = self.Barrier(self.N, timeout=0.5)
        results = self.DummyList()
        self.run_threads(self._test_default_timeout_f, (barrier, results))
        self.assertEqual(len(results), barrier.parties)

    def test_single_thread(self):
        """A barrier with a single party never blocks."""
        b = self.Barrier(1)
        b.wait()
        b.wait()

    @classmethod
    def _test_thousand_f(cls, barrier, passes, conn, lock):
        """Pass the barrier *passes* times, sending the pass index through
        *conn* (serialized by *lock*) after each pass."""
        for i in range(passes):
            barrier.wait()
            with lock:
                conn.send(i)

    def test_thousand(self):
        """
        Test that N workers stay in lockstep over 1000 barrier passes
        """
        if self.TYPE == 'manager':
            # Report the test as skipped instead of silently passing.
            self.skipTest('test not appropriate for {}'.format(self.TYPE))
        passes = 1000
        lock = self.Lock()
        conn, child_conn = self.Pipe(False)
        for j in range(self.N):
            p = self.Process(target=self._test_thousand_f,
                             args=(self.barrier, passes, child_conn, lock))
            p.start()
            # Make sure every worker is reaped once the test is over.
            self.addCleanup(p.join)

        for i in range(passes):
            for j in range(self.N):
                self.assertEqual(conn.recv(), i)
1393
1394#
Benjamin Petersone711caf2008-06-11 16:44:04 +00001395#
1396#
1397
class _TestValue(BaseTestCase):
    """Tests for Value and RawValue shared ctypes objects."""

    ALLOWED_TYPES = ('processes',)

    # (typecode, initial value, value written by the child process)
    codes_values = [
        ('i', 4343, 24234),
        ('d', 3.625, -4.25),
        ('h', -232, 234),
        ('c', latin('x'), latin('y'))
        ]

    def setUp(self):
        if not HAS_SHAREDCTYPES:
            self.skipTest("requires multiprocessing.sharedctypes")

    @classmethod
    def _test(cls, values):
        """Child task: store the third element of each codes_values entry
        into the matching shared value."""
        for shared, spec in zip(values, cls.codes_values):
            shared.value = spec[2]

    def test_value(self, raw=False):
        """Shared values keep their initial value and reflect changes made
        by a child process."""
        factory = self.RawValue if raw else self.Value
        values = [factory(code, initial)
                  for code, initial, _ in self.codes_values]

        for shared, spec in zip(values, self.codes_values):
            self.assertEqual(shared.value, spec[1])

        child = self.Process(target=self._test, args=(values,))
        child.daemon = True
        child.start()
        child.join()

        for shared, spec in zip(values, self.codes_values):
            self.assertEqual(shared.value, spec[2])

    def test_rawvalue(self):
        """Same checks as test_value, using RawValue."""
        self.test_value(raw=True)

    def test_getobj_getlock(self):
        """Check get_lock()/get_obj() for each accepted ``lock`` argument."""
        # Default lock.
        default_locked = self.Value('i', 5)
        default_locked.get_lock()
        default_locked.get_obj()

        # lock=None.
        none_locked = self.Value('i', 5, lock=None)
        none_locked.get_lock()
        none_locked.get_obj()

        # Caller-supplied lock is handed back by get_lock().
        lock = self.Lock()
        custom_locked = self.Value('i', 5, lock=lock)
        returned_lock = custom_locked.get_lock()
        custom_locked.get_obj()
        self.assertEqual(lock, returned_lock)

        # lock=False: no accessor methods.
        unlocked = self.Value('i', 5, lock=False)
        self.assertFalse(hasattr(unlocked, 'get_lock'))
        self.assertFalse(hasattr(unlocked, 'get_obj'))

        # Anything that is not a lock is rejected.
        self.assertRaises(AttributeError, self.Value, 'i', 5, lock='navalue')

        # RawValue: no accessor methods either.
        raw_value = self.RawValue('i', 5)
        self.assertFalse(hasattr(raw_value, 'get_lock'))
        self.assertFalse(hasattr(raw_value, 'get_obj'))
1465
Benjamin Petersone711caf2008-06-11 16:44:04 +00001466
class _TestArray(BaseTestCase):
    """Tests for Array and RawArray shared ctypes objects."""

    ALLOWED_TYPES = ('processes',)

    @classmethod
    def f(cls, seq):
        """Replace *seq* in place with its running (prefix) sums."""
        for idx in range(1, len(seq)):
            seq[idx] += seq[idx - 1]

    @unittest.skipIf(c_int is None, "requires _ctypes")
    def test_array(self, raw=False):
        """A shared array behaves like a list and reflects changes made by
        a child process."""
        seq = [680, 626, 934, 821, 150, 233, 548, 982, 714, 831]
        factory = self.RawArray if raw else self.Array
        arr = factory('i', seq)

        self.assertEqual(len(arr), len(seq))
        self.assertEqual(arr[3], seq[3])
        self.assertEqual(list(arr[2:7]), list(seq[2:7]))

        # Apply the same slice assignment to both sequences.
        patch = array.array('i', [1, 2, 3, 4])
        arr[4:8] = patch
        seq[4:8] = patch

        self.assertEqual(list(arr[:]), seq)

        # Transform the list locally and the shared array in a child.
        self.f(seq)

        child = self.Process(target=self.f, args=(arr,))
        child.daemon = True
        child.start()
        child.join()

        self.assertEqual(list(arr[:]), seq)

    @unittest.skipIf(c_int is None, "requires _ctypes")
    def test_array_from_size(self):
        """An array created from a size is zero-filled."""
        size = 10
        # Test for zeroing (see issue #11675).
        # The repetition below strengthens the test by increasing the chances
        # of previously allocated non-zero memory being used for the new array
        # on the 2nd and 3rd loops.
        for _ in range(3):
            arr = self.Array('i', size)
            self.assertEqual(len(arr), size)
            self.assertEqual(list(arr), [0] * size)
            arr[:] = range(10)
            self.assertEqual(list(arr), list(range(10)))
            del arr

    @unittest.skipIf(c_int is None, "requires _ctypes")
    def test_rawarray(self):
        """Same checks as test_array, using RawArray."""
        self.test_array(raw=True)

    @unittest.skipIf(c_int is None, "requires _ctypes")
    def test_getobj_getlock_obj(self):
        """Check get_lock()/get_obj() for each accepted ``lock`` argument."""
        # Default lock.
        default_locked = self.Array('i', list(range(10)))
        default_locked.get_lock()
        default_locked.get_obj()

        # lock=None.
        none_locked = self.Array('i', list(range(10)), lock=None)
        none_locked.get_lock()
        none_locked.get_obj()

        # Caller-supplied lock is handed back by get_lock().
        lock = self.Lock()
        custom_locked = self.Array('i', list(range(10)), lock=lock)
        returned_lock = custom_locked.get_lock()
        custom_locked.get_obj()
        self.assertEqual(lock, returned_lock)

        # lock=False: no accessor methods.
        unlocked = self.Array('i', range(10), lock=False)
        self.assertFalse(hasattr(unlocked, 'get_lock'))
        self.assertFalse(hasattr(unlocked, 'get_obj'))
        # Anything that is not a lock is rejected.
        self.assertRaises(AttributeError,
                          self.Array, 'i', range(10), lock='notalock')

        # RawArray: no accessor methods either.
        raw_array = self.RawArray('i', range(10))
        self.assertFalse(hasattr(raw_array, 'get_lock'))
        self.assertFalse(hasattr(raw_array, 'get_obj'))
Benjamin Petersone711caf2008-06-11 16:44:04 +00001545
1546#
1547#
1548#
1549
class _TestContainers(BaseTestCase):
    """Tests for the manager's list, dict and Namespace proxies."""

    ALLOWED_TYPES = ('manager',)

    def test_list(self):
        """Exercise slicing, extend, repetition, concatenation and nesting
        on list proxies."""
        digits = self.list(list(range(10)))
        self.assertEqual(digits[:], list(range(10)))

        growing = self.list()
        self.assertEqual(growing[:], [])

        growing.extend(list(range(5)))
        self.assertEqual(growing[:], list(range(5)))

        self.assertEqual(growing[2], 2)
        self.assertEqual(growing[2:10], [2,3,4])

        growing *= 2
        self.assertEqual(growing[:], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4])

        self.assertEqual(growing + [5, 6],
                         [0, 1, 2, 3, 4, 0, 1, 2, 3, 4, 5, 6])

        # None of the above may have touched the first list.
        self.assertEqual(digits[:], list(range(10)))

        # A list proxy built from a list of proxies.
        pair = [digits, growing]
        nested = self.list(pair)
        self.assertEqual(
            nested[:],
            [[0, 1, 2, 3, 4, 5, 6, 7, 8, 9], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4]]
            )

        # A later change to the inner list shows through the outer one.
        outer = self.list([digits])
        digits.append('hello')
        self.assertEqual(outer[:], [[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 'hello']])

    def test_dict(self):
        """Exercise item assignment, copy, keys, values and items on a dict
        proxy."""
        d = self.dict()
        indices = list(range(65, 70))
        for code in indices:
            d[code] = chr(code)
        expected_items = [(code, chr(code)) for code in indices]
        self.assertEqual(d.copy(), dict(expected_items))
        self.assertEqual(sorted(d.keys()), indices)
        self.assertEqual(sorted(d.values()), [chr(code) for code in indices])
        self.assertEqual(sorted(d.items()), expected_items)

    def test_namespace(self):
        """Exercise attribute set/delete and str() on a Namespace proxy."""
        ns = self.Namespace()
        ns.name = 'Bob'
        ns.job = 'Builder'
        ns._hidden = 'hidden'
        self.assertEqual((ns.name, ns.job), ('Bob', 'Builder'))
        del ns.job
        # The underscore-prefixed attribute must not appear in str().
        self.assertEqual(str(ns), "Namespace(name='Bob')")
        self.assertTrue(hasattr(ns, 'name'))
        self.assertTrue(not hasattr(ns, 'job'))
1605
1606#
1607#
1608#
1609
def sqr(x, wait=0.0):
    """Return ``x*x`` after sleeping for *wait* seconds."""
    time.sleep(wait)
    squared = x*x
    return squared
Ask Solem2afcbf22010-11-09 20:55:52 +00001613
def mul(x, y):
    """Return the product ``x*y``."""
    product = x*y
    return product
1616
class _TestPool(BaseTestCase):
    """Tests for the Pool API, mostly run against ``self.pool``."""

    def test_apply(self):
        """apply() accepts positional and keyword arguments."""
        apply_ = self.pool.apply
        self.assertEqual(apply_(sqr, (5,)), sqr(5))
        self.assertEqual(apply_(sqr, (), {'x':3}), sqr(x=3))

    def test_map(self):
        """map() matches the builtin map, with and without a chunksize."""
        run_map = self.pool.map
        self.assertEqual(run_map(sqr, list(range(10))),
                         [sqr(v) for v in range(10)])
        self.assertEqual(run_map(sqr, list(range(100)), chunksize=20),
                         [sqr(v) for v in range(100)])

    def test_starmap(self):
        """starmap() matches itertools.starmap."""
        run_starmap = self.pool.starmap
        pairs = list(zip(range(10), range(9,-1, -1)))
        self.assertEqual(run_starmap(mul, pairs),
                         list(itertools.starmap(mul, pairs)))
        pairs = list(zip(range(100), range(99,-1, -1)))
        self.assertEqual(run_starmap(mul, pairs, chunksize=20),
                         list(itertools.starmap(mul, pairs)))

    def test_starmap_async(self):
        """starmap_async().get() matches itertools.starmap."""
        pairs = list(zip(range(100), range(99,-1, -1)))
        async_result = self.pool.starmap_async(mul, pairs)
        self.assertEqual(async_result.get(),
                         list(itertools.starmap(mul, pairs)))

    def test_map_chunksize(self):
        """map_async() on an empty list with a chunksize must not stall."""
        try:
            self.pool.map_async(sqr, [], chunksize=1).get(timeout=TIMEOUT1)
        except multiprocessing.TimeoutError:
            self.fail("pool.map_async with chunksize stalled on null list")

    def test_async(self):
        """apply_async().get() blocks until the result is available."""
        pending = self.pool.apply_async(sqr, (7, TIMEOUT1,))
        timed_get = TimingWrapper(pending.get)
        self.assertEqual(timed_get(), 49)
        self.assertTimingAlmostEqual(timed_get.elapsed, TIMEOUT1)

    def test_async_timeout(self):
        """get(timeout) raises TimeoutError when the task is too slow."""
        pending = self.pool.apply_async(sqr, (6, TIMEOUT2 + 0.2))
        timed_get = TimingWrapper(pending.get)
        self.assertRaises(multiprocessing.TimeoutError,
                          timed_get, timeout=TIMEOUT2)
        self.assertTimingAlmostEqual(timed_get.elapsed, TIMEOUT2)

    def test_imap(self):
        """imap() yields results in order and then raises StopIteration."""
        squares = self.pool.imap(sqr, list(range(10)))
        self.assertEqual(list(squares), [sqr(v) for v in range(10)])

        squares = self.pool.imap(sqr, list(range(10)))
        for value in range(10):
            self.assertEqual(next(squares), value*value)
        self.assertRaises(StopIteration, squares.__next__)

        squares = self.pool.imap(sqr, list(range(1000)), chunksize=100)
        for value in range(1000):
            self.assertEqual(next(squares), value*value)
        self.assertRaises(StopIteration, squares.__next__)

    def test_imap_unordered(self):
        """imap_unordered() yields every result, in any order."""
        expected = [sqr(v) for v in range(1000)]

        squares = self.pool.imap_unordered(sqr, list(range(1000)))
        self.assertEqual(sorted(squares), expected)

        squares = self.pool.imap_unordered(sqr, list(range(1000)),
                                           chunksize=53)
        self.assertEqual(sorted(squares), expected)

    def test_make_pool(self):
        """Pool() rejects non-positive sizes and honours a valid one."""
        for bad_size in (-1, 0):
            self.assertRaises(ValueError, multiprocessing.Pool, bad_size)

        pool = multiprocessing.Pool(3)
        self.assertEqual(3, len(pool._pool))
        pool.close()
        pool.join()

    def test_terminate(self):
        """terminate() must not wait for outstanding tasks."""
        if self.TYPE == 'manager':
            # On Unix a forked process increfs each shared object to
            # which its parent process held a reference.  If the
            # forked process gets terminated then there is likely to
            # be a reference leak.  So to prevent
            # _TestZZZNumberOfObjects from failing we skip this test
            # when using a manager.
            return

        # Queue far more work than can complete before terminate().
        delays = [0.1 for _ in range(10000)]
        self.pool.map_async(time.sleep, delays, chunksize=1)
        self.pool.terminate()
        timed_join = TimingWrapper(self.pool.join)
        timed_join()
        self.assertLess(timed_join.elapsed, 0.5)

    def test_empty_iterable(self):
        """Every map variant returns an empty result for an empty input."""
        # See Issue 12157
        pool = self.Pool(1)

        self.assertEqual(pool.map(sqr, []), [])
        self.assertEqual(list(pool.imap(sqr, [])), [])
        self.assertEqual(list(pool.imap_unordered(sqr, [])), [])
        self.assertEqual(pool.map_async(sqr, []).get(), [])

        pool.close()
        pool.join()

    def test_context(self):
        """A Pool works as a context manager and is unusable afterwards."""
        if self.TYPE != 'processes':
            return
        inputs = list(range(10))
        expected = [sqr(v) for v in inputs]
        with multiprocessing.Pool(2) as pool:
            pending = pool.map_async(sqr, inputs)
            self.assertEqual(pending.get(), expected)
        # Submitting work after the with-block has exited must fail.
        self.assertRaises(AssertionError, pool.map_async, sqr, inputs)
1730
def raising():
    """Always raise ``KeyError("key")``."""
    error = KeyError("key")
    raise error
Jesse Noller1f0b6582010-01-27 03:36:01 +00001733
def unpickleable_result():
    """Return a lambda, i.e. a result that cannot be pickled."""
    result = lambda: 42
    return result
1736
class _TestPoolWorkerErrors(BaseTestCase):
    """Tests for how a Pool reports errors raised in its workers."""

    ALLOWED_TYPES = ('processes', )

    def test_async_error_callback(self):
        """error_callback receives the exception raised by the task."""
        pool = multiprocessing.Pool(2)

        # One-slot holder filled in by the callback.
        seen = [None]
        def errback(exc):
            seen[0] = exc

        pending = pool.apply_async(raising, error_callback=errback)
        self.assertRaises(KeyError, pending.get)
        self.assertTrue(seen[0])
        self.assertIsInstance(seen[0], KeyError)

        pool.close()
        pool.join()

    def test_unpickleable_result(self):
        """An unpicklable result is reported as MaybeEncodingError."""
        from multiprocessing.pool import MaybeEncodingError
        pool = multiprocessing.Pool(2)

        # Make sure we don't lose pool processes because of encoding errors.
        for _ in range(20):

            # One-slot holder filled in by the callback.
            seen = [None]
            def errback(exc):
                seen[0] = exc

            pending = pool.apply_async(unpickleable_result,
                                       error_callback=errback)
            self.assertRaises(MaybeEncodingError, pending.get)
            wrapped = seen[0]
            self.assertTrue(wrapped)
            self.assertIsInstance(seen[0], MaybeEncodingError)
            self.assertIsNotNone(wrapped.exc)
            self.assertIsNotNone(wrapped.value)

        pool.close()
        pool.join()
1776
class _TestPoolWorkerLifetime(BaseTestCase):
    """Tests for pools created with ``maxtasksperchild``."""

    ALLOWED_TYPES = ('processes', )

    def test_pool_worker_lifetime(self):
        """Workers are replaced once they have run maxtasksperchild tasks."""
        pool = multiprocessing.Pool(3, maxtasksperchild=10)
        self.assertEqual(3, len(pool._pool))
        origworkerpids = [worker.pid for worker in pool._pool]
        # Run many tasks so each worker gets replaced (hopefully)
        pending = [pool.apply_async(sqr, (i, )) for i in range(100)]
        # Fetch the results and verify we got the right answers,
        # also ensuring all the tasks have completed.
        for index, task in enumerate(pending):
            self.assertEqual(task.get(), sqr(index))
        # Refill the pool
        pool._repopulate_pool()
        # Wait until all workers are alive
        # (50 * DELTA = 5 seconds max startup process time)
        for _ in range(50):
            if all(worker.is_alive() for worker in pool._pool):
                break
            time.sleep(DELTA)
        finalworkerpids = [worker.pid for worker in pool._pool]
        # All pids should be assigned.  See issue #7805.
        self.assertNotIn(None, origworkerpids)
        self.assertNotIn(None, finalworkerpids)
        # Finally, check that the worker pids have changed
        self.assertNotEqual(sorted(origworkerpids), sorted(finalworkerpids))
        pool.close()
        pool.join()

    def test_pool_worker_lifetime_early_close(self):
        """close() before the tasks complete must not make join() hang."""
        # Issue #10332: closing a pool whose workers have limited lifetimes
        # before all the tasks completed would make join() hang.
        pool = multiprocessing.Pool(3, maxtasksperchild=1)
        pending = [pool.apply_async(sqr, (i, 0.3)) for i in range(6)]
        pool.close()
        pool.join()
        # check the results
        for index, task in enumerate(pending):
            self.assertEqual(task.get(), sqr(index))
1821
1822
Benjamin Petersone711caf2008-06-11 16:44:04 +00001823#
1824# Test that manager has expected number of shared objects left
1825#
1826
class _TestZZZNumberOfObjects(BaseTestCase):
    """Check that the manager has the expected number of shared objects."""
    # Because test cases are sorted alphabetically, this one will get
    # run after all the other tests for the manager.  It tests that
    # there have been no "reference leaks" for the manager's shared
    # objects.  Note the comment in _TestPool.test_terminate().

    # If some other test using ManagerMixin.manager fails, then the
    # raised exception may keep alive a frame which holds a reference
    # to a managed object.  This will cause test_number_of_objects to
    # also fail.
    ALLOWED_TYPES = ('manager',)

    def test_number_of_objects(self):
        """Fail, after printing the manager's debug info, if the number of
        shared objects differs from the expected count."""
        EXPECTED_NUMBER = 1                # the pool object is still alive
        multiprocessing.active_children()  # discard dead process objs
        gc.collect()                       # do garbage collection
        refs = self.manager._number_of_objects()
        # Snapshot taken right after the count, so that what gets printed
        # corresponds to the value being asserted on.
        debug_info = self.manager._debug_info()
        if refs != EXPECTED_NUMBER:
            # Print the snapshot once; querying the manager a second time
            # only duplicated the output.
            print(debug_info)

        self.assertEqual(refs, EXPECTED_NUMBER)
1850
1851#
1852# Test of creating a customized manager class
1853#
1854
1855from multiprocessing.managers import BaseManager, BaseProxy, RemoteError
1856
class FooBar:
    # Plain class shared through MyManager: one ordinary public method,
    # one public method that raises, and one underscore-prefixed method.

    def f(self):
        return 'f()'

    def g(self):
        raise ValueError

    def _h(self):
        return '_h()'
1864
def baz():
    # Generator yielding the squares of 0..9.
    for n in range(10):
        yield n ** 2
1868
class IteratorProxy(BaseProxy):
    # Proxy that behaves as an iterator: only __next__ is exposed, and
    # each step is forwarded to the referent via _callmethod().
    _exposed_ = ('__next__',)

    def __iter__(self):
        # The proxy is its own iterator.
        return self

    def __next__(self):
        return self._callmethod('__next__')
1875
class MyManager(BaseManager):
    # Customized manager; its shared types are registered right below.
    pass


# 'Foo' uses the default exposure, 'Bar' exposes exactly f and _h, and
# 'baz' hands out the generator through IteratorProxy.
MyManager.register('Foo', callable=FooBar)
MyManager.register('Bar', callable=FooBar, exposed=('f', '_h'))
MyManager.register('baz', callable=baz, proxytype=IteratorProxy)
1882
1883
class _TestMyManager(BaseTestCase):
    # Tests of the customized MyManager class: explicit start/shutdown,
    # use as a context manager, and use as a context manager after an
    # explicit start.

    ALLOWED_TYPES = ('manager',)

    def test_mymanager(self):
        manager = MyManager()
        manager.start()
        try:
            self.common(manager)
        finally:
            # Always stop the server process, even when an assertion in
            # common() fails, so that a failing test does not leak it.
            manager.shutdown()

        # If the manager process exited cleanly then the exitcode
        # will be zero.  Otherwise (after a short timeout)
        # terminate() is used, resulting in an exitcode of -SIGTERM.
        self.assertEqual(manager._process.exitcode, 0)

    def test_mymanager_context(self):
        with MyManager() as manager:
            self.common(manager)
        self.assertEqual(manager._process.exitcode, 0)

    def test_mymanager_context_prestarted(self):
        manager = MyManager()
        manager.start()
        with manager:
            self.common(manager)
        self.assertEqual(manager._process.exitcode, 0)

    def common(self, manager):
        # Shared checks run against a started manager: which methods the
        # proxies expose and what calling them returns or raises.
        foo = manager.Foo()
        bar = manager.Bar()
        baz = manager.baz()

        foo_methods = [name for name in ('f', 'g', '_h') if hasattr(foo, name)]
        bar_methods = [name for name in ('f', 'g', '_h') if hasattr(bar, name)]

        self.assertEqual(foo_methods, ['f', 'g'])
        self.assertEqual(bar_methods, ['f', '_h'])

        self.assertEqual(foo.f(), 'f()')
        self.assertRaises(ValueError, foo.g)
        self.assertEqual(foo._callmethod('f'), 'f()')
        # '_h' is not exposed on Foo, so calling it remotely must fail.
        self.assertRaises(RemoteError, foo._callmethod, '_h')

        self.assertEqual(bar.f(), 'f()')
        self.assertEqual(bar._h(), '_h()')
        self.assertEqual(bar._callmethod('f'), 'f()')
        self.assertEqual(bar._callmethod('_h'), '_h()')

        self.assertEqual(list(baz), [i*i for i in range(10)])
1933
Richard Oudkerk73d9a292012-06-14 15:30:10 +01001934
Benjamin Petersone711caf2008-06-11 16:44:04 +00001935#
1936# Test of connecting to a remote server and using xmlrpclib for serialization
1937#
1938
# Single module-level queue handed out by get_queue().
_queue = pyqueue.Queue()


def get_queue():
    # Return the shared module-level queue (always the same object).
    return _queue
1942
class QueueManager(BaseManager):
    """manager class used by server process"""


# The server side supplies the callable that produces the queue.
QueueManager.register('get_queue', callable=get_queue)
1946
class QueueManager2(BaseManager):
    """manager class which specifies the same interface as QueueManager"""


# Registered without a callable: only the typeid is declared here.
QueueManager2.register('get_queue')
1950
1951
# Serializer name passed to every manager created by the remote tests.
SERIALIZER = 'xmlrpclib'

class _TestRemoteManager(BaseTestCase):
    # Test of connecting to a manager server from other manager objects,
    # using xmlrpclib instead of pickle for serialization.

    ALLOWED_TYPES = ('manager',)

    @classmethod
    def _putter(cls, address, authkey):
        # Child side: connect to the server at `address` and put one
        # tuple on the shared queue.
        manager = QueueManager2(
            address=address, authkey=authkey, serializer=SERIALIZER
            )
        manager.connect()
        queue = manager.get_queue()
        queue.put(('hello world', None, True, 2.25))

    def test_remote(self):
        authkey = os.urandom(32)

        manager = QueueManager(
            address=('localhost', 0), authkey=authkey, serializer=SERIALIZER
            )
        manager.start()

        p = self.Process(target=self._putter, args=(manager.address, authkey))
        p.daemon = True
        p.start()

        manager2 = QueueManager2(
            address=manager.address, authkey=authkey, serializer=SERIALIZER
            )
        manager2.connect()
        queue = manager2.get_queue()

        # Note that xmlrpclib will deserialize object as a list not a tuple
        self.assertEqual(queue.get(), ['hello world', None, True, 2.25])

        # The item has arrived, so the putter has done its work; wait for
        # it here instead of leaving an unjoined child process behind.
        p.join()

        # Because we are using xmlrpclib for serialization instead of
        # pickle this will cause a serialization error.
        self.assertRaises(Exception, queue.put, time.sleep)

        # Make queue finalizer run before the server is stopped
        del queue
        manager.shutdown()
1995
class _TestManagerRestart(BaseTestCase):
    # Checks that a manager can be shut down and a new one started
    # straight away on the same address.

    @classmethod
    def _putter(cls, address, authkey):
        # Child side: connect to the server and put one string on the
        # shared queue.
        manager = QueueManager(
            address=address, authkey=authkey, serializer=SERIALIZER)
        manager.connect()
        queue = manager.get_queue()
        queue.put('hello world')

    def test_rapid_restart(self):
        authkey = os.urandom(32)
        manager = QueueManager(
            address=('localhost', 0), authkey=authkey, serializer=SERIALIZER)
        srvr = manager.get_server()
        addr = srvr.address
        # Close the connection.Listener socket which gets opened as a part
        # of manager.get_server(). It's not needed for the test.
        srvr.listener.close()
        manager.start()

        p = self.Process(target=self._putter, args=(manager.address, authkey))
        p.daemon = True
        p.start()
        queue = manager.get_queue()
        self.assertEqual(queue.get(), 'hello world')
        # The item has arrived, so the putter is finished; join it rather
        # than leaving an unjoined child behind.
        p.join()
        del queue
        manager.shutdown()
        manager = QueueManager(
            address=addr, authkey=authkey, serializer=SERIALIZER)
        try:
            manager.start()
        except IOError as e:
            if e.errno != errno.EADDRINUSE:
                raise
            # Retry after some time, in case the old socket was lingering
            # (sporadic failure on buildbots)
            time.sleep(1.0)
            manager = QueueManager(
                address=addr, authkey=authkey, serializer=SERIALIZER)
            # Actually start the replacement: without this the retry never
            # happened and shutdown() below ran on an unstarted manager.
            manager.start()
        manager.shutdown()
Jesse Nollerc5d28a02009-03-30 16:37:36 +00002037
Benjamin Petersone711caf2008-06-11 16:44:04 +00002038#
2039#
2040#
2041
# Empty message that tells the _echo() child to stop.
SENTINEL = latin('')

class _TestConnection(BaseTestCase):
    # Tests of the connection objects returned by Pipe(): object and
    # byte messages, polling, duplex flags, file descriptor transfer and
    # use as context managers.

    ALLOWED_TYPES = ('processes', 'threads')

    @classmethod
    def _echo(cls, conn):
        # Child side: send every received byte message straight back
        # until SENTINEL arrives, then close the connection.
        for msg in iter(conn.recv_bytes, SENTINEL):
            conn.send_bytes(msg)
        conn.close()

    def test_connection(self):
        conn, child_conn = self.Pipe()

        p = self.Process(target=self._echo, args=(child_conn,))
        p.daemon = True
        p.start()

        seq = [1, 2.25, None]
        msg = latin('hello world')
        longmsg = msg * 10
        arr = array.array('i', list(range(4)))

        if self.TYPE == 'processes':
            self.assertEqual(type(conn.fileno()), int)

        self.assertEqual(conn.send(seq), None)
        self.assertEqual(conn.recv(), seq)

        self.assertEqual(conn.send_bytes(msg), None)
        self.assertEqual(conn.recv_bytes(), msg)

        if self.TYPE == 'processes':
            # Receive into the start of a zeroed buffer.
            buffer = array.array('i', [0]*10)
            expected = list(arr) + [0] * (10 - len(arr))
            self.assertEqual(conn.send_bytes(arr), None)
            self.assertEqual(conn.recv_bytes_into(buffer),
                             len(arr) * buffer.itemsize)
            self.assertEqual(list(buffer), expected)

            # Receive at an offset of three items into the buffer.
            buffer = array.array('i', [0]*10)
            expected = [0] * 3 + list(arr) + [0] * (10 - 3 - len(arr))
            self.assertEqual(conn.send_bytes(arr), None)
            self.assertEqual(conn.recv_bytes_into(buffer, 3 * buffer.itemsize),
                             len(arr) * buffer.itemsize)
            self.assertEqual(list(buffer), expected)

            # A buffer shorter than the message must raise
            # BufferTooShort carrying the complete message.
            buffer = bytearray(latin(' ' * 40))
            self.assertEqual(conn.send_bytes(longmsg), None)
            try:
                res = conn.recv_bytes_into(buffer)
            except multiprocessing.BufferTooShort as e:
                self.assertEqual(e.args, (longmsg,))
            else:
                self.fail('expected BufferTooShort, got %s' % res)

        poll = TimingWrapper(conn.poll)

        self.assertEqual(poll(), False)
        self.assertTimingAlmostEqual(poll.elapsed, 0)

        self.assertEqual(poll(-1), False)
        self.assertTimingAlmostEqual(poll.elapsed, 0)

        self.assertEqual(poll(TIMEOUT1), False)
        self.assertTimingAlmostEqual(poll.elapsed, TIMEOUT1)

        conn.send(None)

        self.assertEqual(poll(TIMEOUT1), True)
        self.assertTimingAlmostEqual(poll.elapsed, 0)

        self.assertEqual(conn.recv(), None)

        really_big_msg = latin('X') * (1024 * 1024 * 16)   # 16Mb
        conn.send_bytes(really_big_msg)
        self.assertEqual(conn.recv_bytes(), really_big_msg)

        conn.send_bytes(SENTINEL)                          # tell child to quit
        child_conn.close()

        if self.TYPE == 'processes':
            self.assertEqual(conn.readable, True)
            self.assertEqual(conn.writable, True)
            self.assertRaises(EOFError, conn.recv)
            self.assertRaises(EOFError, conn.recv_bytes)

        p.join()

    def test_duplex_false(self):
        reader, writer = self.Pipe(duplex=False)
        self.assertEqual(writer.send(1), None)
        self.assertEqual(reader.recv(), 1)
        if self.TYPE == 'processes':
            self.assertEqual(reader.readable, True)
            self.assertEqual(reader.writable, False)
            self.assertEqual(writer.readable, False)
            self.assertEqual(writer.writable, True)
            self.assertRaises(IOError, reader.send, 2)
            self.assertRaises(IOError, writer.recv)
            self.assertRaises(IOError, writer.poll)

    def test_spawn_close(self):
        # We test that a pipe connection can be closed by parent
        # process immediately after child is spawned.  On Windows this
        # would have sometimes failed on old versions because
        # child_conn would be closed before the child got a chance to
        # duplicate it.
        conn, child_conn = self.Pipe()

        p = self.Process(target=self._echo, args=(child_conn,))
        p.daemon = True
        p.start()
        child_conn.close()    # this might complete before child initializes

        msg = latin('hello')
        conn.send_bytes(msg)
        self.assertEqual(conn.recv_bytes(), msg)

        conn.send_bytes(SENTINEL)
        conn.close()
        p.join()

    def test_sendbytes(self):
        # Check the offset/size arguments of send_bytes().
        if self.TYPE != 'processes':
            # Report a skip (as the fd transfer tests below do) instead
            # of silently passing.
            self.skipTest('test not appropriate for {}'.format(self.TYPE))

        msg = latin('abcdefghijklmnopqrstuvwxyz')
        a, b = self.Pipe()

        a.send_bytes(msg)
        self.assertEqual(b.recv_bytes(), msg)

        a.send_bytes(msg, 5)
        self.assertEqual(b.recv_bytes(), msg[5:])

        a.send_bytes(msg, 7, 8)
        self.assertEqual(b.recv_bytes(), msg[7:7+8])

        a.send_bytes(msg, 26)
        self.assertEqual(b.recv_bytes(), latin(''))

        a.send_bytes(msg, 26, 0)
        self.assertEqual(b.recv_bytes(), latin(''))

        self.assertRaises(ValueError, a.send_bytes, msg, 27)

        self.assertRaises(ValueError, a.send_bytes, msg, 22, 5)

        self.assertRaises(ValueError, a.send_bytes, msg, 26, 1)

        self.assertRaises(ValueError, a.send_bytes, msg, -1)

        self.assertRaises(ValueError, a.send_bytes, msg, 4, -1)

    @classmethod
    def _is_fd_assigned(cls, fd):
        # Return True if os.fstat() accepts `fd`, False if it fails with
        # EBADF; any other OSError is propagated.
        try:
            os.fstat(fd)
        except OSError as e:
            if e.errno == errno.EBADF:
                return False
            raise
        else:
            return True

    @classmethod
    def _writefd(cls, conn, data, create_dummy_fds=False):
        # Child side: receive a handle over `conn`, write `data` to it
        # and close it.
        if create_dummy_fds:
            # Occupy every unassigned descriptor below 256 with a
            # duplicate of the connection's descriptor.
            for i in range(0, 256):
                if not cls._is_fd_assigned(i):
                    os.dup2(conn.fileno(), i)
        fd = reduction.recv_handle(conn)
        if msvcrt:
            fd = msvcrt.open_osfhandle(fd, os.O_WRONLY)
        os.write(fd, data)
        os.close(fd)

    @unittest.skipUnless(HAS_REDUCTION, "test needs multiprocessing.reduction")
    def test_fd_transfer(self):
        if self.TYPE != 'processes':
            self.skipTest("only makes sense with processes")
        conn, child_conn = self.Pipe(duplex=True)

        p = self.Process(target=self._writefd, args=(child_conn, b"foo"))
        p.daemon = True
        p.start()
        self.addCleanup(test.support.unlink, test.support.TESTFN)
        with open(test.support.TESTFN, "wb") as f:
            fd = f.fileno()
            if msvcrt:
                fd = msvcrt.get_osfhandle(fd)
            reduction.send_handle(conn, fd, p.pid)
        p.join()
        with open(test.support.TESTFN, "rb") as f:
            self.assertEqual(f.read(), b"foo")

    @unittest.skipUnless(HAS_REDUCTION, "test needs multiprocessing.reduction")
    @unittest.skipIf(sys.platform == "win32",
                     "test semantics don't make sense on Windows")
    @unittest.skipIf(MAXFD <= 256,
                     "largest assignable fd number is too small")
    @unittest.skipUnless(hasattr(os, "dup2"),
                         "test needs os.dup2()")
    def test_large_fd_transfer(self):
        # With fd > 256 (issue #11657)
        if self.TYPE != 'processes':
            self.skipTest("only makes sense with processes")
        conn, child_conn = self.Pipe(duplex=True)

        p = self.Process(target=self._writefd, args=(child_conn, b"bar", True))
        p.daemon = True
        p.start()
        self.addCleanup(test.support.unlink, test.support.TESTFN)
        with open(test.support.TESTFN, "wb") as f:
            fd = f.fileno()
            # Find the first unassigned descriptor number >= 256.
            for newfd in range(256, MAXFD):
                if not self._is_fd_assigned(newfd):
                    break
            else:
                self.fail("could not find an unassigned large file descriptor")
            os.dup2(fd, newfd)
            try:
                reduction.send_handle(conn, newfd, p.pid)
            finally:
                os.close(newfd)
        p.join()
        with open(test.support.TESTFN, "rb") as f:
            self.assertEqual(f.read(), b"bar")

    @classmethod
    def _send_data_without_fd(cls, conn):
        # Child side: write a single raw byte to the connection's
        # descriptor, with no handle attached.
        os.write(conn.fileno(), b"\0")

    @unittest.skipUnless(HAS_REDUCTION, "test needs multiprocessing.reduction")
    @unittest.skipIf(sys.platform == "win32", "doesn't make sense on Windows")
    def test_missing_fd_transfer(self):
        # Check that exception is raised when received data is not
        # accompanied by a file descriptor in ancillary data.
        if self.TYPE != 'processes':
            self.skipTest("only makes sense with processes")
        conn, child_conn = self.Pipe(duplex=True)

        p = self.Process(target=self._send_data_without_fd, args=(child_conn,))
        p.daemon = True
        p.start()
        self.assertRaises(RuntimeError, reduction.recv_handle, conn)
        p.join()

    def test_context(self):
        # Connections used as context managers are open inside the
        # `with` block and closed after it.
        a, b = self.Pipe()

        with a, b:
            a.send(1729)
            self.assertEqual(b.recv(), 1729)
            if self.TYPE == 'processes':
                self.assertFalse(a.closed)
                self.assertFalse(b.closed)

        if self.TYPE == 'processes':
            self.assertTrue(a.closed)
            self.assertTrue(b.closed)
            self.assertRaises(IOError, a.recv)
            self.assertRaises(IOError, b.recv)
2307
class _TestListener(BaseTestCase):
    # Tests of connection.Listener on its own.

    ALLOWED_TYPES = ('processes',)

    def test_multiple_bind(self):
        # Binding a second Listener to an address already in use must
        # raise OSError, for every supported family.
        for family in self.connection.families:
            first = self.connection.Listener(family=family)
            self.addCleanup(first.close)
            with self.assertRaises(OSError):
                self.connection.Listener(first.address, family)

    def test_context(self):
        # Listener, client and accepted connection all work as context
        # managers; the listener cannot accept once its block has exited.
        with self.connection.Listener() as listener:
            with self.connection.Client(listener.address) as client, \
                    listener.accept() as server_side:
                client.send(1729)
                self.assertEqual(server_side.recv(), 1729)

        if self.TYPE == 'processes':
            self.assertRaises(IOError, listener.accept)
2328
class _TestListenerClient(BaseTestCase):
    # Tests of a Listener accepting a connection from a Client that
    # runs in a child process/thread.

    ALLOWED_TYPES = ('processes', 'threads')

    @classmethod
    def _test(cls, address):
        # Child side: connect to `address`, send one greeting, close.
        conn = cls.connection.Client(address)
        conn.send('hello')
        conn.close()

    def test_listener_client(self):
        for family in self.connection.families:
            l = self.connection.Listener(family=family)
            p = self.Process(target=self._test, args=(l.address,))
            p.daemon = True
            p.start()
            conn = l.accept()
            self.assertEqual(conn.recv(), 'hello')
            # Close the accepted connection (as test_issue14725 does) so
            # one is not left open for every family tested.
            conn.close()
            p.join()
            l.close()

    def test_issue14725(self):
        l = self.connection.Listener()
        p = self.Process(target=self._test, args=(l.address,))
        p.daemon = True
        p.start()
        time.sleep(1)
        # On Windows the client process should by now have connected,
        # written data and closed the pipe handle.  This causes
        # ConnectNamedPipe() to fail with ERROR_NO_DATA.  See Issue
        # 14725.
        conn = l.accept()
        self.assertEqual(conn.recv(), 'hello')
        conn.close()
        p.join()
        l.close()
2365
class _TestPoll(unittest.TestCase):
    # Tests of Connection.poll(): empty messages, message boundaries and
    # messages not being merged.

    ALLOWED_TYPES = ('processes', 'threads')

    def test_empty_string(self):
        # An empty byte message still counts as pending data, and
        # polling repeatedly does not consume it.
        rx, tx = self.Pipe()
        self.assertEqual(rx.poll(), False)
        tx.send_bytes(b'')
        self.assertEqual(rx.poll(), True)
        self.assertEqual(rx.poll(), True)

    @classmethod
    def _child_strings(cls, conn, strings):
        # Child side: send each string after a short pause, then close.
        for payload in strings:
            time.sleep(0.1)
            conn.send_bytes(payload)
        conn.close()

    def test_strings(self):
        strings = (b'hello', b'', b'a', b'b', b'', b'bye', b'', b'lop')
        rx, tx = self.Pipe()
        p = self.Process(target=self._child_strings, args=(tx, strings))
        p.start()

        for expected in strings:
            # Poll up to 200 times, stopping at the first success.
            attempts = 0
            while attempts < 200 and not rx.poll(0.01):
                attempts += 1
            received = rx.recv_bytes()
            self.assertEqual(expected, received)

        p.join()

    @classmethod
    def _child_boundaries(cls, r):
        # Polling may "pull" a message in to the child process, but we
        # don't want it to pull only part of a message, as that would
        # corrupt the pipe for any other processes which might later
        # read from it.
        r.poll(5)

    def test_boundaries(self):
        r, w = self.Pipe(False)
        p = self.Process(target=self._child_boundaries, args=(r,))
        p.start()
        time.sleep(2)
        messages = [b"first", b"second"]
        for payload in messages:
            w.send_bytes(payload)
        w.close()
        p.join()
        # Whatever is read next must be one complete message.
        self.assertIn(r.recv_bytes(), messages)

    @classmethod
    def _child_dont_merge(cls, b):
        # Child side: three separate messages sent back to back.
        b.send_bytes(b'a')
        b.send_bytes(b'b')
        b.send_bytes(b'cd')

    def test_dont_merge(self):
        rx, tx = self.Pipe()
        self.assertEqual(rx.poll(0.0), False)
        self.assertEqual(rx.poll(0.1), False)

        p = self.Process(target=self._child_dont_merge, args=(tx,))
        p.start()

        # Each recv_bytes() must return exactly one of the messages sent,
        # however often poll() is called in between.
        self.assertEqual(rx.recv_bytes(), b'a')
        self.assertEqual(rx.poll(1.0), True)
        self.assertEqual(rx.poll(1.0), True)
        self.assertEqual(rx.recv_bytes(), b'b')
        self.assertEqual(rx.poll(1.0), True)
        self.assertEqual(rx.poll(1.0), True)
        self.assertEqual(rx.poll(0.0), True)
        self.assertEqual(rx.recv_bytes(), b'cd')

        p.join()
2443
Benjamin Petersone711caf2008-06-11 16:44:04 +00002444#
2445# Test of sending connection and socket objects between processes
2446#
Antoine Pitrou5438ed12012-04-24 22:56:57 +02002447
# Intermittent fails on Mac OS X -- see Issue14669 and Issue12958
@unittest.skipIf(sys.platform == "darwin", "fd passing unreliable on Mac OS X")
@unittest.skipUnless(HAS_REDUCTION, "test needs multiprocessing.reduction")
class _TestPicklingConnections(BaseTestCase):
    # Tests of sending connection and socket objects between processes.

    ALLOWED_TYPES = ('processes',)

    @classmethod
    def tearDownClass(cls):
        # Stop the resource sharer once the tests of this class are done.
        from multiprocessing.reduction import resource_sharer
        resource_sharer.stop(timeout=5)

    @classmethod
    def _listener(cls, conn, families):
        # Child side: for each family, report a listener address over
        # `conn`, accept one connection and send the accepted connection
        # object back; then do the same with a plain socket.
        for fam in families:
            listener = cls.connection.Listener(family=fam)
            conn.send(listener.address)
            accepted = listener.accept()
            conn.send(accepted)
            accepted.close()
            listener.close()

        sock = socket.socket()
        sock.bind(('localhost', 0))
        sock.listen(1)
        conn.send(sock.getsockname())
        accepted, addr = sock.accept()
        conn.send(accepted)
        accepted.close()
        sock.close()

        # Block until the parent sends its final message.
        conn.recv()

    @classmethod
    def _remote(cls, conn):
        # Child side: for every (address, msg) pair received before a
        # None, connect to the address and send the upper-cased message;
        # then handle one more pair using a plain socket.
        for (address, msg) in iter(conn.recv, None):
            client = cls.connection.Client(address)
            client.send(msg.upper())
            client.close()

        address, msg = conn.recv()
        client = socket.socket()
        client.connect(address)
        client.sendall(msg.upper())
        client.close()

        conn.close()

    def test_pickling(self):
        families = self.connection.families

        lconn, lconn0 = self.Pipe()
        lp = self.Process(target=self._listener, args=(lconn0, families))
        lp.daemon = True
        lp.start()
        lconn0.close()

        rconn, rconn0 = self.Pipe()
        rp = self.Process(target=self._remote, args=(rconn0,))
        rp.daemon = True
        rp.start()
        rconn0.close()

        for fam in families:
            msg = ('This connection uses family %s' % fam).encode('ascii')
            address = lconn.recv()
            rconn.send((address, msg))
            new_conn = lconn.recv()
            self.assertEqual(new_conn.recv(), msg.upper())

        # End of the per-family round for the remote child.
        rconn.send(None)

        msg = latin('This connection uses a normal socket')
        address = lconn.recv()
        rconn.send((address, msg))
        new_conn = lconn.recv()
        # Read from the received socket until it returns an empty chunk.
        buf = b''.join(iter(lambda: new_conn.recv(100), b''))
        self.assertEqual(buf, msg.upper())
        new_conn.close()

        lconn.send(None)

        rconn.close()
        lconn.close()

        lp.join()
        rp.join()

    @classmethod
    def child_access(cls, conn):
        # Child side: receive a writable end and write to it, then
        # receive a readable end and echo its message back doubled.
        writer = conn.recv()
        writer.send('all is well')
        writer.close()

        reader = conn.recv()
        msg = reader.recv()
        conn.send(msg*2)

        conn.close()

    def test_access(self):
        # On Windows, if we do not specify a destination pid when
        # using DupHandle then we need to be careful to use the
        # correct access flags for DuplicateHandle(), or else
        # DupHandle.detach() will raise PermissionError.  For example,
        # for a read only pipe handle we should use
        # access=FILE_GENERIC_READ.  (Unfortunately
        # DUPLICATE_SAME_ACCESS does not work.)
        conn, child_conn = self.Pipe()
        p = self.Process(target=self.child_access, args=(child_conn,))
        p.daemon = True
        p.start()
        child_conn.close()

        # Hand the write end of a one-way pipe to the child.
        r, w = self.Pipe(duplex=False)
        conn.send(w)
        w.close()
        self.assertEqual(r.recv(), 'all is well')
        r.close()

        # Hand the read end of a second one-way pipe to the child.
        r, w = self.Pipe(duplex=False)
        conn.send(r)
        r.close()
        w.send('foobar')
        w.close()
        self.assertEqual(conn.recv(), 'foobar'*2)
2580
Benjamin Petersone711caf2008-06-11 16:44:04 +00002581#
2582#
2583#
2584
class _TestHeap(BaseTestCase):
    # Tests of the block allocator in multiprocessing.heap.

    ALLOWED_TYPES = ('processes',)

    def test_heap(self):
        # Allocate and free many blocks of random sizes, then check that
        # the free and occupied blocks recorded by the heap fit together
        # without gaps or overlaps.
        iterations = 5000
        maxblocks = 50
        blocks = []

        # create and destroy lots of blocks of different sizes
        for _ in range(iterations):
            size = int(random.lognormvariate(0, 1) * 1000)
            blocks.append(multiprocessing.heap.BufferWrapper(size))
            if len(blocks) > maxblocks:
                # Drop a random block so that it can be freed.
                del blocks[random.randrange(maxblocks)]

        # get the heap object
        heap = multiprocessing.heap.BufferWrapper._heap

        # verify the state of the heap; entries are
        # (arena index, start, stop, length, 'free' | 'occupied')
        segments = []
        with heap._lock:
            for free_blocks in list(heap._len_to_seq.values()):
                for arena, start, stop in free_blocks:
                    segments.append((heap._arenas.index(arena), start, stop,
                                     stop-start, 'free'))
            for arena, start, stop in heap._allocated_blocks:
                segments.append((heap._arenas.index(arena), start, stop,
                                 stop-start, 'occupied'))

        segments.sort()

        # Each block must either end where the next one starts, or the
        # next block must open a different arena at offset 0.
        for current, following in zip(segments, segments[1:]):
            (arena, start, stop) = current[:3]
            (narena, nstart) = following[:2]
            self.assertTrue((arena != narena and nstart == 0) or
                            (stop == nstart))

    def test_free_from_gc(self):
        # Check that freeing of blocks by the garbage collector doesn't deadlock
        # (issue #12352).
        # Make sure the GC is enabled, and set lower collection thresholds to
        # make collections more frequent (and increase the probability of
        # deadlock).
        if not gc.isenabled():
            gc.enable()
            self.addCleanup(gc.disable)
        thresholds = gc.get_threshold()
        self.addCleanup(gc.set_threshold, *thresholds)
        gc.set_threshold(10)

        # perform numerous block allocations, with cyclic references to make
        # sure objects are collected asynchronously by the gc
        for _ in range(5000):
            a = multiprocessing.heap.BufferWrapper(1)
            b = multiprocessing.heap.BufferWrapper(1)
            # circular references
            a.buddy = b
            b.buddy = a
2649
Benjamin Petersone711caf2008-06-11 16:44:04 +00002650#
2651#
2652#
2653
class _Foo(Structure):
    # ctypes structure with an int field `x` and a double field `y`.
    _fields_ = [('x', c_int), ('y', c_double)]
2659
class _TestSharedCTypes(BaseTestCase):
    # Tests of shared ctypes values and arrays modified by a child
    # process.

    ALLOWED_TYPES = ('processes',)

    def setUp(self):
        if not HAS_SHAREDCTYPES:
            self.skipTest("requires multiprocessing.sharedctypes")

    @classmethod
    def _double(cls, x, y, foo, arr, string):
        # Child side: double every shared object in place.
        x.value *= 2
        y.value *= 2
        foo.x *= 2
        foo.y *= 2
        string.value *= 2
        for index in range(len(arr)):
            arr[index] *= 2

    def test_sharedctypes(self, lock=False):
        x = Value('i', 7, lock=lock)
        y = Value(c_double, 1.0/3.0, lock=lock)
        foo = Value(_Foo, 3, 2, lock=lock)
        arr = self.Array('d', list(range(10)), lock=lock)
        string = self.Array('c', 20, lock=lock)
        string.value = latin('hello')

        child = self.Process(target=self._double,
                             args=(x, y, foo, arr, string))
        child.daemon = True
        child.start()
        child.join()

        # The child's modifications must be visible in the parent.
        self.assertEqual(x.value, 14)
        self.assertAlmostEqual(y.value, 2.0/3.0)
        self.assertEqual(foo.x, 6)
        self.assertAlmostEqual(foo.y, 4.0)
        for index in range(10):
            self.assertAlmostEqual(arr[index], index*2)
        self.assertEqual(string.value, latin('hellohello'))

    def test_synchronize(self):
        # Same checks with lock=True.
        self.test_sharedctypes(lock=True)

    def test_copy(self):
        # A copy must not follow later changes made to the original.
        original = _Foo(2, 5.0)
        duplicate = copy(original)
        original.x = 0
        original.y = 0
        self.assertEqual(duplicate.x, 2)
        self.assertAlmostEqual(duplicate.y, 5.0)
2709
2710#
2711#
2712#
2713
class _TestFinalize(BaseTestCase):
    # Tests of util.Finalize callbacks and the order in which the exit
    # function runs them.

    ALLOWED_TYPES = ('processes',)

    @classmethod
    def _test_finalize(cls, conn):
        # Child side: register finalizers that each send a tag over
        # `conn`, then run multiprocessing's exit function.
        class Foo(object):
            pass

        a = Foo()
        util.Finalize(a, conn.send, args=('a',))
        del a           # triggers callback for a

        b = Foo()
        close_b = util.Finalize(b, conn.send, args=('b',))
        close_b()       # triggers callback for b
        close_b()       # does nothing because callback has already been called
        del b           # does nothing because callback has already been called

        # No exitpriority is given for this one.
        c = Foo()
        util.Finalize(c, conn.send, args=('c',))

        d10 = Foo()
        util.Finalize(d10, conn.send, args=('d10',), exitpriority=1)

        # Three finalizers sharing exitpriority 0, registered in order.
        d01 = Foo()
        util.Finalize(d01, conn.send, args=('d01',), exitpriority=0)
        d02 = Foo()
        util.Finalize(d02, conn.send, args=('d02',), exitpriority=0)
        d03 = Foo()
        util.Finalize(d03, conn.send, args=('d03',), exitpriority=0)

        util.Finalize(None, conn.send, args=('e',), exitpriority=-10)

        util.Finalize(None, conn.send, args=('STOP',), exitpriority=-100)

        # call multiprocessing's cleanup function then exit process without
        # garbage collecting locals
        util._exit_function()
        conn.close()
        os._exit(0)

    def test_finalize(self):
        conn, child_conn = self.Pipe()

        child = self.Process(target=self._test_finalize, args=(child_conn,))
        child.daemon = True
        child.start()
        child.join()

        # Collect every tag sent before the 'STOP' marker.
        result = list(iter(conn.recv, 'STOP'))
        self.assertEqual(result, ['a', 'b', 'd10', 'd03', 'd02', 'd01', 'e'])
2766
2767#
2768# Test that from ... import * works for each module
2769#
2770
class _TestImportStar(BaseTestCase):
    """Check that every name a module lists in ``__all__`` really exists."""

    ALLOWED_TYPES = ('processes',)

    def test_import(self):
        # Modules that are checked unconditionally.
        module_names = [
            'multiprocessing', 'multiprocessing.connection',
            'multiprocessing.heap', 'multiprocessing.managers',
            'multiprocessing.pool', 'multiprocessing.process',
            'multiprocessing.synchronize', 'multiprocessing.util'
            ]

        if HAS_REDUCTION:
            module_names += ['multiprocessing.reduction']

        # This module requires _ctypes
        if c_int is not None:
            module_names += ['multiprocessing.sharedctypes']

        for module_name in module_names:
            __import__(module_name)
            module = sys.modules[module_name]
            exported = getattr(module, '__all__', ())

            for attr_name in exported:
                present = hasattr(module, attr_name)
                self.assertTrue(
                    present,
                    '%r does not have attribute %r' % (module, attr_name)
                    )
2799
2800#
2801# Quick test that logging works -- does not test logging output
2802#
2803
class _TestLogging(BaseTestCase):
    """Quick checks of the multiprocessing logger (levels only, not output)."""

    ALLOWED_TYPES = ('processes',)

    def test_enable_logging(self):
        logger = multiprocessing.get_logger()
        # Check the logger before using it: asserting only after setLevel()
        # has been called on it could never fail.
        self.assertIsNotNone(logger)
        logger.setLevel(util.SUBWARNING)
        try:
            logger.debug('this will not be printed')
            logger.info('nor will this')
        finally:
            # Restore the module-wide level even if logging raised.
            logger.setLevel(LOG_LEVEL)

    @classmethod
    def _test_level(cls, conn):
        """Child side: send the logger's effective level over *conn*."""
        logger = multiprocessing.get_logger()
        conn.send(logger.getEffectiveLevel())

    def _child_level(self, reader, writer):
        # Run _test_level in a child process, return the level it reports on
        # *reader*, and always join the child so it is not left behind.
        p = self.Process(target=self._test_level, args=(writer,))
        p.daemon = True
        p.start()
        try:
            return reader.recv()
        finally:
            p.join()

    def test_level(self):
        LEVEL1 = 32
        LEVEL2 = 37

        logger = multiprocessing.get_logger()
        root_logger = logging.getLogger()
        root_level = root_logger.level

        reader, writer = multiprocessing.Pipe(duplex=False)
        try:
            # A level set on the multiprocessing logger is seen by the child.
            logger.setLevel(LEVEL1)
            self.assertEqual(LEVEL1, self._child_level(reader, writer))

            # With the multiprocessing logger at NOTSET, the child reports
            # the root logger's level as the effective level.
            logger.setLevel(logging.NOTSET)
            root_logger.setLevel(LEVEL2)
            self.assertEqual(LEVEL2, self._child_level(reader, writer))
        finally:
            # Undo the level changes and release the pipe even when an
            # assertion above failed, so later tests are not affected.
            root_logger.setLevel(root_level)
            logger.setLevel(level=LOG_LEVEL)
            reader.close()
            writer.close()
2846
Jesse Nollerb9a49b72009-11-21 18:09:38 +00002847
Jesse Nollerf4ae35f2009-11-24 14:22:24 +00002848# class _TestLoggingProcessName(BaseTestCase):
2849#
2850# def handle(self, record):
2851# assert record.processName == multiprocessing.current_process().name
2852# self.__handled = True
2853#
2854# def test_logging(self):
2855# handler = logging.Handler()
2856# handler.handle = self.handle
2857# self.__handled = False
2858# # Bypass getLogger() and side-effects
2859# logger = logging.getLoggerClass()(
2860# 'multiprocessing.test.TestLoggingProcessName')
2861# logger.addHandler(handler)
2862# logger.propagate = False
2863#
2864# logger.warn('foo')
2865# assert self.__handled
Jesse Nollerb9a49b72009-11-21 18:09:38 +00002866
Benjamin Petersone711caf2008-06-11 16:44:04 +00002867#
Jesse Noller6214edd2009-01-19 16:23:53 +00002868# Test to verify handle verification, see issue 3321
2869#
2870
2871class TestInvalidHandle(unittest.TestCase):
2872
Florent Xiclunafd1b0932010-03-28 00:25:02 +00002873 @unittest.skipIf(WIN32, "skipped on Windows")
Jesse Noller6214edd2009-01-19 16:23:53 +00002874 def test_invalid_handles(self):
Antoine Pitrou87cf2202011-05-09 17:04:27 +02002875 conn = multiprocessing.connection.Connection(44977608)
2876 try:
2877 self.assertRaises((ValueError, IOError), conn.poll)
2878 finally:
2879 # Hack private attribute _handle to avoid printing an error
2880 # in conn.__del__
2881 conn._handle = None
2882 self.assertRaises((ValueError, IOError),
2883 multiprocessing.connection.Connection, -1)
Florent Xiclunafd1b0932010-03-28 00:25:02 +00002884
Jesse Noller6214edd2009-01-19 16:23:53 +00002885#
Benjamin Petersone711caf2008-06-11 16:44:04 +00002886# Functions used to create test cases from the base ones in this module
2887#
2888
def get_attributes(Source, names):
    """Return a dict mapping each name in *names* to ``getattr(Source, name)``.

    Plain Python functions are wrapped in ``staticmethod`` so that, when the
    resulting dict is merged into a class namespace, they are not bound to
    the instance on attribute access.  Every other object is stored as is.

    Raises AttributeError if *Source* lacks one of the requested names.
    """
    function_type = type(get_attributes)
    attributes = {}
    for name in names:
        obj = getattr(Source, name)
        if isinstance(obj, function_type):
            obj = staticmethod(obj)
        attributes[name] = obj
    return attributes
2897
def create_test_cases(Mixin, type):
    """Build concrete TestCase classes from the module's ``_Test*`` bases.

    For every module-level name starting with ``_Test`` whose class lists
    *type* in its ``ALLOWED_TYPES``, a new class deriving from that base,
    ``unittest.TestCase`` and *Mixin* is created.  It is named
    ``'With' + type.capitalize() + <base name without the leading '_'>``.

    Returns a dict mapping the new class names to the new classes.
    """
    result = {}
    glob = globals()
    Type = type.capitalize()
    ALL_TYPES = {'processes', 'threads', 'manager'}

    # Iterate over a snapshot of the global names.
    for name in list(glob):
        if not name.startswith('_Test'):
            continue
        base = glob[name]
        assert set(base.ALLOWED_TYPES) <= ALL_TYPES, set(base.ALLOWED_TYPES)
        if type not in base.ALLOWED_TYPES:
            continue
        newname = 'With' + Type + name[1:]
        class Temp(base, unittest.TestCase, Mixin):
            pass
        # Rename the class in full: without __qualname__ it would still be
        # reported as 'create_test_cases.<locals>.Temp'.
        Temp.__name__ = Temp.__qualname__ = newname
        Temp.__module__ = Mixin.__module__
        result[newname] = Temp
    return result
2916
2917#
2918# Create test cases
2919#
2920
class ProcessesMixin(object):
    # Mixin supplying the real multiprocessing primitives to the generated
    # 'processes' test cases.
    TYPE = 'processes'
    Process = multiprocessing.Process
    # Copy the listed multiprocessing attributes into this class body's
    # namespace; get_attributes() wraps plain functions in staticmethod.
    locals().update(get_attributes(multiprocessing, (
        'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
        'Condition', 'Event', 'Barrier', 'Value', 'Array', 'RawValue',
        'RawArray', 'current_process', 'active_children', 'Pipe',
        'connection', 'JoinableQueue', 'Pool'
        )))

# Generate the 'processes' test cases and expose them as module globals.
testcases_processes = create_test_cases(ProcessesMixin, type='processes')
globals().update(testcases_processes)
2933
2934
class ManagerMixin(object):
    # Mixin supplying manager-backed primitives to the generated 'manager'
    # test cases.
    TYPE = 'manager'
    Process = multiprocessing.Process
    # Allocated without running __init__; test_main() later calls
    # ManagerMixin.manager.__init__() and .start() before running the suite.
    manager = object.__new__(multiprocessing.managers.SyncManager)
    # Copy the listed attributes of the manager object into this class
    # body's namespace.
    locals().update(get_attributes(manager, (
        'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
        'Condition', 'Event', 'Barrier', 'Value', 'Array', 'list', 'dict',
        'Namespace', 'JoinableQueue', 'Pool'
        )))

# Generate the 'manager' test cases and expose them as module globals.
testcases_manager = create_test_cases(ManagerMixin, type='manager')
globals().update(testcases_manager)
2947
2948
class ThreadsMixin(object):
    # Mixin supplying the multiprocessing.dummy primitives to the generated
    # 'threads' test cases.
    TYPE = 'threads'
    Process = multiprocessing.dummy.Process
    # Copy the listed multiprocessing.dummy attributes into this class body's
    # namespace; get_attributes() wraps plain functions in staticmethod.
    locals().update(get_attributes(multiprocessing.dummy, (
        'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
        'Condition', 'Event', 'Barrier', 'Value', 'Array', 'current_process',
        'active_children', 'Pipe', 'connection', 'dict', 'list',
        'Namespace', 'JoinableQueue', 'Pool'
        )))

# Generate the 'threads' test cases and expose them as module globals.
testcases_threads = create_test_cases(ThreadsMixin, type='threads')
globals().update(testcases_threads)
2961
class OtherTest(unittest.TestCase):
    """Authentication failures in the connection challenge helpers."""

    # TODO: add more tests for deliver/answer challenge.
    def test_deliver_challenge_auth_failure(self):
        # The peer answers the challenge with bytes that cannot be valid.
        class _FakeConnection(object):
            def recv_bytes(self, size):
                return b'something bogus'

            def send_bytes(self, data):
                pass

        with self.assertRaises(multiprocessing.AuthenticationError):
            multiprocessing.connection.deliver_challenge(
                _FakeConnection(), b'abc')

    def test_answer_challenge_auth_failure(self):
        # First read returns the challenge marker, second read returns a
        # bogus reply, any further read returns empty bytes.
        class _FakeConnection(object):
            def __init__(self):
                self.count = 0

            def recv_bytes(self, size):
                self.count += 1
                if self.count == 1:
                    return multiprocessing.connection.CHALLENGE
                if self.count == 2:
                    return b'something bogus'
                return b''

            def send_bytes(self, data):
                pass

        with self.assertRaises(multiprocessing.AuthenticationError):
            multiprocessing.connection.answer_challenge(
                _FakeConnection(), b'abc')
2990
Benjamin Petersonf47ed4a2009-04-11 20:45:40 +00002991#
2992# Test Manager.start()/Pool.__init__() initializer feature - see issue 5585
2993#
2994
def initializer(ns):
    """Increase the ``test`` attribute of *ns* by one."""
    step = 1
    ns.test += step
2997
class TestInitializers(unittest.TestCase):
    """Initializer support of Manager.start() and Pool() (see issue 5585)."""

    def setUp(self):
        # A shared namespace whose 'test' counter the initializer bumps.
        self.mgr = multiprocessing.Manager()
        namespace = self.mgr.Namespace()
        namespace.test = 0
        self.ns = namespace

    def tearDown(self):
        manager = self.mgr
        manager.shutdown()
        manager.join()

    def test_manager_initializer(self):
        m = multiprocessing.managers.SyncManager()
        # A non-callable initializer must be rejected.
        with self.assertRaises(TypeError):
            m.start(1)
        m.start(initializer, (self.ns,))
        self.assertEqual(self.ns.test, 1)
        m.shutdown()
        m.join()

    def test_pool_initializer(self):
        # A non-callable initializer must be rejected.
        with self.assertRaises(TypeError):
            multiprocessing.Pool(initializer=1)
        pool = multiprocessing.Pool(1, initializer, (self.ns,))
        pool.close()
        pool.join()
        self.assertEqual(self.ns.test, 1)
3022
Alexandre Vassalottic57a84f2009-07-17 12:07:01 +00003023#
3024# Issue 5155, 5313, 5331: Test process in processes
3025# Verifies os.close(sys.stdin.fileno) vs. sys.stdin.close() behavior
3026#
3027
def _ThisSubProcess(q):
    """Attempt one non-blocking get on *q*, ignoring an empty queue."""
    try:
        q.get(block=False)
    except pyqueue.Empty:
        pass
3033
def _TestProcess(q):
    """Start a daemonic process running _ThisSubProcess and wait for it.

    The child gets a freshly created queue; the *q* argument is not used.
    """
    inner_queue = multiprocessing.Queue()
    child = multiprocessing.Process(target=_ThisSubProcess,
                                    args=(inner_queue,))
    child.daemon = True
    child.start()
    child.join()
3040
def _afunc(x):
    """Return *x* multiplied by itself."""
    product = x * x
    return product
3043
def pool_in_process():
    """Map _afunc over a short list with a 4-process pool, then shut it down."""
    workers = multiprocessing.Pool(processes=4)
    workers.map(_afunc, [1, 2, 3, 4, 5, 6, 7])
    workers.close()
    workers.join()
Alexandre Vassalottic57a84f2009-07-17 12:07:01 +00003049
class _file_like(object):
    """Buffer writes per process id and hand them to *delegate* on flush()."""

    def __init__(self, delegate):
        self._pid = None
        self._delegate = delegate

    @property
    def cache(self):
        """The pending writes; reset whenever the process id has changed."""
        current_pid = os.getpid()
        # There are no race conditions since fork keeps only the running thread
        if self._pid == current_pid:
            return self._cache
        self._pid = current_pid
        self._cache = []
        return self._cache

    def write(self, data):
        """Queue *data* until the next flush()."""
        pending = self.cache
        pending.append(data)

    def flush(self):
        """Write the joined pending data to the delegate and clear the buffer."""
        joined = ''.join(self.cache)
        self._delegate.write(joined)
        self._cache = []
3070
class TestStdinBadfiledescriptor(unittest.TestCase):
    """Processes started from within processes (issues 5155, 5313, 5331)."""

    def test_queue_in_process(self):
        # A child process that itself creates a queue and starts a process.
        queue = multiprocessing.Queue()
        proc = multiprocessing.Process(target=_TestProcess, args=(queue,))
        proc.start()
        proc.join()

    def test_pool_in_process(self):
        # A child process that itself creates and uses a pool.
        p = multiprocessing.Process(target=pool_in_process)
        p.start()
        p.join()

    def test_flushing(self):
        # Data written to the wrapper reaches the delegate on flush().
        sio = io.StringIO()
        flike = _file_like(sio)
        flike.write('foo')
        flike.flush()
        # A unittest assertion is used (not a bare assert) so the check
        # still runs when Python is started with -O.
        self.assertEqual(sio.getvalue(), 'foo')
3091
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01003092
class TestWait(unittest.TestCase):
    """Tests for multiprocessing.connection.wait().

    Covers waiting on pipe connections, sockets and process sentinels, as
    well as timeout handling.  Resources created by a test are released even
    when an assertion fails.
    """

    @classmethod
    def _child_test_wait(cls, w, slow):
        """Child side: send ten ``(i, pid)`` tuples over *w*, then close it.

        When *slow* is true, sleep a random time below 0.1s before each send.
        """
        for i in range(10):
            if slow:
                time.sleep(random.random()*0.1)
            w.send((i, os.getpid()))
        w.close()

    def test_wait(self, slow=False):
        # Four children each send ten messages over their own pipe; wait()
        # is used to multiplex the reads until every pipe reports EOF.
        from multiprocessing.connection import wait
        readers = []
        procs = []
        messages = []

        for i in range(4):
            r, w = multiprocessing.Pipe(duplex=False)
            p = multiprocessing.Process(target=self._child_test_wait, args=(w, slow))
            p.daemon = True
            p.start()
            # Close the parent's copy of the write end.
            w.close()
            readers.append(r)
            procs.append(p)
            self.addCleanup(p.join)

        while readers:
            for r in wait(readers):
                try:
                    msg = r.recv()
                except EOFError:
                    readers.remove(r)
                    r.close()
                else:
                    messages.append(msg)

        messages.sort()
        expected = sorted((i, p.pid) for i in range(10) for p in procs)
        self.assertEqual(messages, expected)

    @classmethod
    def _child_test_wait_socket(cls, address, slow):
        """Child side: connect to *address* and send the lines '0'..'9'.

        When *slow* is true, sleep a random time below 0.1s before each send.
        """
        s = socket.socket()
        s.connect(address)
        for i in range(10):
            if slow:
                time.sleep(random.random()*0.1)
            s.sendall(('%s\n' % i).encode('ascii'))
        s.close()

    def test_wait_socket(self, slow=False):
        # Same as test_wait(), but the children talk over TCP sockets.
        from multiprocessing.connection import wait
        l = socket.socket()
        # Make sure the listener is closed even if the test fails early.
        self.addCleanup(l.close)
        # Listen on localhost only (not on every interface) and give the
        # children the address the listener is actually bound to.
        l.bind(('localhost', 0))
        l.listen(4)
        addr = l.getsockname()
        readers = []
        procs = []
        dic = {}

        for i in range(4):
            p = multiprocessing.Process(target=self._child_test_wait_socket,
                                        args=(addr, slow))
            p.daemon = True
            p.start()
            procs.append(p)
            self.addCleanup(p.join)

        for i in range(4):
            r, _ = l.accept()
            readers.append(r)
            dic[r] = []
        l.close()

        while readers:
            for r in wait(readers):
                msg = r.recv(32)
                if not msg:
                    # Empty read: the peer closed its end.
                    readers.remove(r)
                    r.close()
                else:
                    dic[r].append(msg)

        expected = ''.join('%s\n' % i for i in range(10)).encode('ascii')
        for v in dic.values():
            self.assertEqual(b''.join(v), expected)

    def test_wait_slow(self):
        self.test_wait(True)

    def test_wait_socket_slow(self):
        self.test_wait_socket(True)

    def test_wait_timeout(self):
        from multiprocessing.connection import wait

        expected = 5
        a, b = multiprocessing.Pipe()
        self.addCleanup(a.close)
        self.addCleanup(b.close)

        # Nothing is readable: wait() must return [] after about *expected*
        # seconds.
        start = time.time()
        res = wait([a, b], expected)
        delta = time.time() - start

        self.assertEqual(res, [])
        self.assertLess(delta, expected * 2)
        self.assertGreater(delta, expected * 0.5)

        b.send(None)

        # Data is now pending on a: wait() must return quickly.
        start = time.time()
        res = wait([a, b], 20)
        delta = time.time() - start

        self.assertEqual(res, [a])
        self.assertLess(delta, 0.4)

    @classmethod
    def signal_and_sleep(cls, sem, period):
        """Child side: release *sem*, then sleep for *period* seconds."""
        sem.release()
        time.sleep(period)

    def test_wait_integer(self):
        # wait() must accept a process sentinel (an integer) next to
        # connection objects.
        from multiprocessing.connection import wait

        expected = 3
        sem = multiprocessing.Semaphore(0)
        a, b = multiprocessing.Pipe()
        self.addCleanup(a.close)
        self.addCleanup(b.close)
        p = multiprocessing.Process(target=self.signal_and_sleep,
                                    args=(sem, expected))

        p.start()
        try:
            self.assertIsInstance(p.sentinel, int)
            # The child releases the semaphore just before it starts
            # sleeping, so the timing below is measured from that point.
            self.assertTrue(sem.acquire(timeout=20))

            start = time.time()
            res = wait([a, p.sentinel, b], expected + 20)
            delta = time.time() - start

            self.assertEqual(res, [p.sentinel])
            self.assertLess(delta, expected + 2)
            self.assertGreater(delta, expected - 2)

            a.send(None)

            start = time.time()
            res = wait([a, p.sentinel, b], 20)
            delta = time.time() - start

            self.assertEqual(res, [p.sentinel, b])
            self.assertLess(delta, 0.4)

            b.send(None)

            start = time.time()
            res = wait([a, p.sentinel, b], 20)
            delta = time.time() - start

            self.assertEqual(res, [a, p.sentinel, b])
            self.assertLess(delta, 0.4)
        finally:
            # Reap the child even when an assertion above failed.
            p.terminate()
            p.join()

    def test_neg_timeout(self):
        # A negative timeout must make wait() return promptly.
        from multiprocessing.connection import wait
        a, b = multiprocessing.Pipe()
        self.addCleanup(a.close)
        self.addCleanup(b.close)
        t = time.time()
        res = wait([a], timeout=-1)
        t = time.time() - t
        self.assertEqual(res, [])
        self.assertLess(t, 1)
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01003266
Antoine Pitrou709176f2012-04-01 17:19:09 +02003267#
3268# Issue 14151: Test invalid family on invalid environment
3269#
3270
class TestInvalidFamily(unittest.TestCase):
    """Listener must reject addresses it cannot use (see issue 14151)."""

    @unittest.skipIf(WIN32, "skipped on Windows")
    def test_invalid_family(self):
        self.assertRaises(ValueError,
                          multiprocessing.connection.Listener, r'\\.\test')

    @unittest.skipUnless(WIN32, "skipped on non-Windows platforms")
    def test_invalid_family_win32(self):
        self.assertRaises(ValueError,
                          multiprocessing.connection.Listener, '/var/test.pipe')
Antoine Pitrou93bba8f2012-04-01 17:25:49 +02003282
Richard Oudkerk77c84f22012-05-18 14:28:02 +01003283#
3284# Issue 12098: check sys.flags of child matches that for parent
3285#
3286
class TestFlags(unittest.TestCase):
    """sys.flags of a child process must match its parent (see issue 12098)."""

    @classmethod
    def run_in_grandchild(cls, conn):
        """Send this process's ``sys.flags`` (as a tuple) over *conn*."""
        flags = tuple(sys.flags)
        conn.send(flags)

    @classmethod
    def run_in_child(cls):
        """Print, as JSON, this process's flags and those of a child of it."""
        import json
        reader, writer = multiprocessing.Pipe(duplex=False)
        grandchild = multiprocessing.Process(target=cls.run_in_grandchild,
                                             args=(writer,))
        grandchild.start()
        grandchild_flags = reader.recv()
        grandchild.join()
        reader.close()
        writer.close()
        both = (tuple(sys.flags), grandchild_flags)
        print(json.dumps(both))

    def test_flags(self):
        import json
        import subprocess
        # start child process using unusual flags
        prog = ('from test.test_multiprocessing import TestFlags; ' +
                'TestFlags.run_in_child()')
        command = [sys.executable, '-E', '-S', '-O', '-c', prog]
        data = subprocess.check_output(command)
        child_flags, grandchild_flags = json.loads(data.decode('ascii'))
        self.assertEqual(child_flags, grandchild_flags)
3314
# Stand-alone test cases (not generated by create_test_cases()); test_main()
# appends them after the generated processes/threads/manager cases.
testcases_other = [OtherTest, TestInvalidHandle, TestInitializers,
                   TestStdinBadfiledescriptor, TestWait, TestInvalidFamily,
                   TestFlags]
Neal Norwitz5d6415e2008-08-25 01:53:32 +00003318
Benjamin Petersone711caf2008-06-11 16:44:04 +00003319#
3320#
3321#
3322
def test_main(run=None):
    """Set up the shared pools and manager, run every test case, tear down.

    *run* is a callable taking a unittest.TestSuite; when it is None,
    test.support.run_unittest is used.

    Raises unittest.SkipTest on Linux when an RLock cannot be created.
    """
    if sys.platform.startswith("linux"):
        try:
            lock = multiprocessing.RLock()
        except OSError:
            raise unittest.SkipTest("OSError raises on RLock creation, see issue 3111!")

    check_enough_semaphores()

    if run is None:
        from test.support import run_unittest as run

    util.get_temp_dir()     # creates temp directory for use by all processes

    multiprocessing.get_logger().setLevel(LOG_LEVEL)

    # Shared fixtures stored on the mixin classes for the generated test
    # cases.  The manager object was allocated without __init__ in
    # ManagerMixin, so it is initialised here before being started.
    ProcessesMixin.pool = multiprocessing.Pool(4)
    ThreadsMixin.pool = multiprocessing.dummy.Pool(4)
    ManagerMixin.manager.__init__()
    ManagerMixin.manager.start()
    ManagerMixin.pool = ManagerMixin.manager.Pool(4)

    # Generated cases are sorted by class name within each flavour; the
    # stand-alone cases follow in their listed order.
    testcases = (
        sorted(testcases_processes.values(), key=lambda tc:tc.__name__) +
        sorted(testcases_threads.values(), key=lambda tc:tc.__name__) +
        sorted(testcases_manager.values(), key=lambda tc:tc.__name__) +
        testcases_other
        )

    loadTestsFromTestCase = unittest.defaultTestLoader.loadTestsFromTestCase
    suite = unittest.TestSuite(loadTestsFromTestCase(tc) for tc in testcases)
    try:
        run(suite)
    finally:
        # Tear down the shared fixtures whether or not the run raised: all
        # pools are terminated first, the manager's pool is joined before the
        # manager is shut down, and the remaining pools are joined last.
        ThreadsMixin.pool.terminate()
        ProcessesMixin.pool.terminate()
        ManagerMixin.pool.terminate()
        ManagerMixin.pool.join()
        ManagerMixin.manager.shutdown()
        ManagerMixin.manager.join()
        ThreadsMixin.pool.join()
        ProcessesMixin.pool.join()
        del ProcessesMixin.pool, ThreadsMixin.pool, ManagerMixin.pool
Benjamin Petersone711caf2008-06-11 16:44:04 +00003366
def main():
    """Run the whole suite through a verbose text test runner."""
    runner = unittest.TextTestRunner(verbosity=2)
    test_main(runner.run)

if __name__ == '__main__':
    main()