blob: 6da55749b693653e1339ab00166e0aac863aad1e [file] [log] [blame]
Benjamin Peterson90f5ba52010-03-11 22:53:45 +00001#!/usr/bin/env python3
Georg Brandl86b2fb92008-07-16 03:43:04 +00002
Benjamin Petersone711caf2008-06-11 16:44:04 +00003#
4# Unit tests for the multiprocessing package
5#
6
7import unittest
Benjamin Petersone711caf2008-06-11 16:44:04 +00008import queue as pyqueue
9import time
Alexandre Vassalottic57a84f2009-07-17 12:07:01 +000010import io
Antoine Pitroude911b22011-12-21 11:03:24 +010011import itertools
Benjamin Petersone711caf2008-06-11 16:44:04 +000012import sys
13import os
14import gc
Antoine Pitrouc824e9a2011-04-05 18:11:33 +020015import errno
Benjamin Petersone711caf2008-06-11 16:44:04 +000016import signal
17import array
Benjamin Petersone711caf2008-06-11 16:44:04 +000018import socket
19import random
20import logging
Richard Oudkerk3730a172012-06-15 18:26:07 +010021import struct
R. David Murraya21e4ca2009-03-31 23:16:50 +000022import test.support
Benjamin Petersone711caf2008-06-11 16:44:04 +000023
Benjamin Petersone5384b02008-10-04 22:00:42 +000024
R. David Murraya21e4ca2009-03-31 23:16:50 +000025# Skip tests if _multiprocessing wasn't built.
26_multiprocessing = test.support.import_module('_multiprocessing')
27# Skip tests if sem_open implementation is broken.
28test.support.import_module('multiprocessing.synchronize')
Victor Stinner45df8202010-04-28 22:31:17 +000029# import threading after _multiprocessing to raise a more revelant error
30# message: "No module named _multiprocessing". _multiprocessing is not compiled
31# without thread support.
32import threading
Benjamin Petersone5384b02008-10-04 22:00:42 +000033
Benjamin Petersone711caf2008-06-11 16:44:04 +000034import multiprocessing.dummy
35import multiprocessing.connection
36import multiprocessing.managers
37import multiprocessing.heap
Benjamin Petersone711caf2008-06-11 16:44:04 +000038import multiprocessing.pool
Benjamin Petersone711caf2008-06-11 16:44:04 +000039
Charles-François Natalibc8f0822011-09-20 20:36:51 +020040from multiprocessing import util
41
42try:
43 from multiprocessing import reduction
44 HAS_REDUCTION = True
45except ImportError:
46 HAS_REDUCTION = False
Benjamin Petersone711caf2008-06-11 16:44:04 +000047
Brian Curtinafa88b52010-10-07 01:12:19 +000048try:
49 from multiprocessing.sharedctypes import Value, copy
50 HAS_SHAREDCTYPES = True
51except ImportError:
52 HAS_SHAREDCTYPES = False
53
Antoine Pitroubcb39d42011-08-23 19:46:22 +020054try:
55 import msvcrt
56except ImportError:
57 msvcrt = None
58
Benjamin Petersone711caf2008-06-11 16:44:04 +000059#
60#
61#
62
Benjamin Peterson2bc91df2008-07-13 18:45:30 +000063def latin(s):
64 return s.encode('latin')
Benjamin Petersone711caf2008-06-11 16:44:04 +000065
Benjamin Petersone711caf2008-06-11 16:44:04 +000066#
67# Constants
68#
69
70LOG_LEVEL = util.SUBWARNING
Jesse Noller1f0b6582010-01-27 03:36:01 +000071#LOG_LEVEL = logging.DEBUG
Benjamin Petersone711caf2008-06-11 16:44:04 +000072
73DELTA = 0.1
74CHECK_TIMINGS = False # making true makes tests take a lot longer
75 # and can sometimes cause some non-serious
76 # failures because some calls block a bit
77 # longer than expected
78if CHECK_TIMINGS:
79 TIMEOUT1, TIMEOUT2, TIMEOUT3 = 0.82, 0.35, 1.4
80else:
81 TIMEOUT1, TIMEOUT2, TIMEOUT3 = 0.1, 0.1, 0.1
82
83HAVE_GETVALUE = not getattr(_multiprocessing,
84 'HAVE_BROKEN_SEM_GETVALUE', False)
85
Jesse Noller6214edd2009-01-19 16:23:53 +000086WIN32 = (sys.platform == "win32")
Antoine Pitrou176f07d2011-06-06 19:35:31 +020087
Richard Oudkerk59d54042012-05-10 16:11:12 +010088from multiprocessing.connection import wait
Antoine Pitrou176f07d2011-06-06 19:35:31 +020089
Richard Oudkerk59d54042012-05-10 16:11:12 +010090def wait_for_handle(handle, timeout):
91 if timeout is not None and timeout < 0.0:
92 timeout = None
93 return wait([handle], timeout)
Jesse Noller6214edd2009-01-19 16:23:53 +000094
Antoine Pitroubcb39d42011-08-23 19:46:22 +020095try:
96 MAXFD = os.sysconf("SC_OPEN_MAX")
97except:
98 MAXFD = 256
99
Benjamin Petersone711caf2008-06-11 16:44:04 +0000100#
Florent Xiclunafd1b0932010-03-28 00:25:02 +0000101# Some tests require ctypes
102#
103
104try:
Florent Xiclunaaa171062010-08-14 15:56:42 +0000105 from ctypes import Structure, c_int, c_double
Florent Xiclunafd1b0932010-03-28 00:25:02 +0000106except ImportError:
107 Structure = object
108 c_int = c_double = None
109
Charles-François Natali221ef672011-11-22 18:55:22 +0100110
111def check_enough_semaphores():
112 """Check that the system supports enough semaphores to run the test."""
113 # minimum number of semaphores available according to POSIX
114 nsems_min = 256
115 try:
116 nsems = os.sysconf("SC_SEM_NSEMS_MAX")
117 except (AttributeError, ValueError):
118 # sysconf not available or setting not available
119 return
120 if nsems == -1 or nsems >= nsems_min:
121 return
122 raise unittest.SkipTest("The OS doesn't support enough semaphores "
123 "to run the test (required: %d)." % nsems_min)
124
125
Florent Xiclunafd1b0932010-03-28 00:25:02 +0000126#
Benjamin Petersone711caf2008-06-11 16:44:04 +0000127# Creates a wrapper for a function which records the time it takes to finish
128#
129
130class TimingWrapper(object):
131
132 def __init__(self, func):
133 self.func = func
134 self.elapsed = None
135
136 def __call__(self, *args, **kwds):
137 t = time.time()
138 try:
139 return self.func(*args, **kwds)
140 finally:
141 self.elapsed = time.time() - t
142
143#
144# Base class for test cases
145#
146
147class BaseTestCase(object):
148
149 ALLOWED_TYPES = ('processes', 'manager', 'threads')
150
151 def assertTimingAlmostEqual(self, a, b):
152 if CHECK_TIMINGS:
153 self.assertAlmostEqual(a, b, 1)
154
155 def assertReturnsIfImplemented(self, value, func, *args):
156 try:
157 res = func(*args)
158 except NotImplementedError:
159 pass
160 else:
161 return self.assertEqual(value, res)
162
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +0000163 # For the sanity of Windows users, rather than crashing or freezing in
164 # multiple ways.
165 def __reduce__(self, *args):
166 raise NotImplementedError("shouldn't try to pickle a test case")
167
168 __reduce_ex__ = __reduce__
169
Benjamin Petersone711caf2008-06-11 16:44:04 +0000170#
171# Return the value of a semaphore
172#
173
174def get_value(self):
175 try:
176 return self.get_value()
177 except AttributeError:
178 try:
179 return self._Semaphore__value
180 except AttributeError:
181 try:
182 return self._value
183 except AttributeError:
184 raise NotImplementedError
185
186#
187# Testcases
188#
189
190class _TestProcess(BaseTestCase):
191
192 ALLOWED_TYPES = ('processes', 'threads')
193
194 def test_current(self):
195 if self.TYPE == 'threads':
196 return
197
198 current = self.current_process()
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000199 authkey = current.authkey
Benjamin Petersone711caf2008-06-11 16:44:04 +0000200
201 self.assertTrue(current.is_alive())
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000202 self.assertTrue(not current.daemon)
Ezio Melottie9615932010-01-24 19:26:24 +0000203 self.assertIsInstance(authkey, bytes)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000204 self.assertTrue(len(authkey) > 0)
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000205 self.assertEqual(current.ident, os.getpid())
206 self.assertEqual(current.exitcode, None)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000207
Antoine Pitrou0bd4deb2011-02-25 22:07:43 +0000208 def test_daemon_argument(self):
209 if self.TYPE == "threads":
210 return
211
212 # By default uses the current process's daemon flag.
213 proc0 = self.Process(target=self._test)
Antoine Pitrouec785222011-03-02 00:15:44 +0000214 self.assertEqual(proc0.daemon, self.current_process().daemon)
Antoine Pitrou0bd4deb2011-02-25 22:07:43 +0000215 proc1 = self.Process(target=self._test, daemon=True)
216 self.assertTrue(proc1.daemon)
217 proc2 = self.Process(target=self._test, daemon=False)
218 self.assertFalse(proc2.daemon)
219
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +0000220 @classmethod
221 def _test(cls, q, *args, **kwds):
222 current = cls.current_process()
Benjamin Petersone711caf2008-06-11 16:44:04 +0000223 q.put(args)
224 q.put(kwds)
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000225 q.put(current.name)
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +0000226 if cls.TYPE != 'threads':
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000227 q.put(bytes(current.authkey))
Benjamin Petersone711caf2008-06-11 16:44:04 +0000228 q.put(current.pid)
229
230 def test_process(self):
231 q = self.Queue(1)
232 e = self.Event()
233 args = (q, 1, 2)
234 kwargs = {'hello':23, 'bye':2.54}
235 name = 'SomeProcess'
236 p = self.Process(
237 target=self._test, args=args, kwargs=kwargs, name=name
238 )
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000239 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000240 current = self.current_process()
241
242 if self.TYPE != 'threads':
Ezio Melottib3aedd42010-11-20 19:04:17 +0000243 self.assertEqual(p.authkey, current.authkey)
244 self.assertEqual(p.is_alive(), False)
245 self.assertEqual(p.daemon, True)
Benjamin Peterson577473f2010-01-19 00:09:57 +0000246 self.assertNotIn(p, self.active_children())
Benjamin Petersone711caf2008-06-11 16:44:04 +0000247 self.assertTrue(type(self.active_children()) is list)
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000248 self.assertEqual(p.exitcode, None)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000249
250 p.start()
251
Ezio Melottib3aedd42010-11-20 19:04:17 +0000252 self.assertEqual(p.exitcode, None)
253 self.assertEqual(p.is_alive(), True)
Benjamin Peterson577473f2010-01-19 00:09:57 +0000254 self.assertIn(p, self.active_children())
Benjamin Petersone711caf2008-06-11 16:44:04 +0000255
Ezio Melottib3aedd42010-11-20 19:04:17 +0000256 self.assertEqual(q.get(), args[1:])
257 self.assertEqual(q.get(), kwargs)
258 self.assertEqual(q.get(), p.name)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000259 if self.TYPE != 'threads':
Ezio Melottib3aedd42010-11-20 19:04:17 +0000260 self.assertEqual(q.get(), current.authkey)
261 self.assertEqual(q.get(), p.pid)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000262
263 p.join()
264
Ezio Melottib3aedd42010-11-20 19:04:17 +0000265 self.assertEqual(p.exitcode, 0)
266 self.assertEqual(p.is_alive(), False)
Benjamin Peterson577473f2010-01-19 00:09:57 +0000267 self.assertNotIn(p, self.active_children())
Benjamin Petersone711caf2008-06-11 16:44:04 +0000268
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +0000269 @classmethod
270 def _test_terminate(cls):
Benjamin Petersone711caf2008-06-11 16:44:04 +0000271 time.sleep(1000)
272
273 def test_terminate(self):
274 if self.TYPE == 'threads':
275 return
276
277 p = self.Process(target=self._test_terminate)
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000278 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000279 p.start()
280
281 self.assertEqual(p.is_alive(), True)
Benjamin Peterson577473f2010-01-19 00:09:57 +0000282 self.assertIn(p, self.active_children())
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000283 self.assertEqual(p.exitcode, None)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000284
Richard Oudkerk59d54042012-05-10 16:11:12 +0100285 join = TimingWrapper(p.join)
286
287 self.assertEqual(join(0), None)
288 self.assertTimingAlmostEqual(join.elapsed, 0.0)
289 self.assertEqual(p.is_alive(), True)
290
291 self.assertEqual(join(-1), None)
292 self.assertTimingAlmostEqual(join.elapsed, 0.0)
293 self.assertEqual(p.is_alive(), True)
294
Benjamin Petersone711caf2008-06-11 16:44:04 +0000295 p.terminate()
296
Benjamin Petersone711caf2008-06-11 16:44:04 +0000297 self.assertEqual(join(), None)
298 self.assertTimingAlmostEqual(join.elapsed, 0.0)
299
300 self.assertEqual(p.is_alive(), False)
Benjamin Peterson577473f2010-01-19 00:09:57 +0000301 self.assertNotIn(p, self.active_children())
Benjamin Petersone711caf2008-06-11 16:44:04 +0000302
303 p.join()
304
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000305 # XXX sometimes get p.exitcode == 0 on Windows ...
306 #self.assertEqual(p.exitcode, -signal.SIGTERM)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000307
308 def test_cpu_count(self):
309 try:
310 cpus = multiprocessing.cpu_count()
311 except NotImplementedError:
312 cpus = 1
313 self.assertTrue(type(cpus) is int)
314 self.assertTrue(cpus >= 1)
315
316 def test_active_children(self):
317 self.assertEqual(type(self.active_children()), list)
318
319 p = self.Process(target=time.sleep, args=(DELTA,))
Benjamin Peterson577473f2010-01-19 00:09:57 +0000320 self.assertNotIn(p, self.active_children())
Benjamin Petersone711caf2008-06-11 16:44:04 +0000321
Jesus Cea94f964f2011-09-09 20:26:57 +0200322 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000323 p.start()
Benjamin Peterson577473f2010-01-19 00:09:57 +0000324 self.assertIn(p, self.active_children())
Benjamin Petersone711caf2008-06-11 16:44:04 +0000325
326 p.join()
Benjamin Peterson577473f2010-01-19 00:09:57 +0000327 self.assertNotIn(p, self.active_children())
Benjamin Petersone711caf2008-06-11 16:44:04 +0000328
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +0000329 @classmethod
330 def _test_recursion(cls, wconn, id):
Benjamin Petersone711caf2008-06-11 16:44:04 +0000331 from multiprocessing import forking
332 wconn.send(id)
333 if len(id) < 2:
334 for i in range(2):
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +0000335 p = cls.Process(
336 target=cls._test_recursion, args=(wconn, id+[i])
Benjamin Petersone711caf2008-06-11 16:44:04 +0000337 )
338 p.start()
339 p.join()
340
341 def test_recursion(self):
342 rconn, wconn = self.Pipe(duplex=False)
343 self._test_recursion(wconn, [])
344
345 time.sleep(DELTA)
346 result = []
347 while rconn.poll():
348 result.append(rconn.recv())
349
350 expected = [
351 [],
352 [0],
353 [0, 0],
354 [0, 1],
355 [1],
356 [1, 0],
357 [1, 1]
358 ]
359 self.assertEqual(result, expected)
360
Antoine Pitrou176f07d2011-06-06 19:35:31 +0200361 @classmethod
362 def _test_sentinel(cls, event):
363 event.wait(10.0)
364
365 def test_sentinel(self):
366 if self.TYPE == "threads":
367 return
368 event = self.Event()
369 p = self.Process(target=self._test_sentinel, args=(event,))
370 with self.assertRaises(ValueError):
371 p.sentinel
372 p.start()
373 self.addCleanup(p.join)
374 sentinel = p.sentinel
375 self.assertIsInstance(sentinel, int)
376 self.assertFalse(wait_for_handle(sentinel, timeout=0.0))
377 event.set()
378 p.join()
379 self.assertTrue(wait_for_handle(sentinel, timeout=DELTA))
380
Benjamin Petersone711caf2008-06-11 16:44:04 +0000381#
382#
383#
384
385class _UpperCaser(multiprocessing.Process):
386
387 def __init__(self):
388 multiprocessing.Process.__init__(self)
389 self.child_conn, self.parent_conn = multiprocessing.Pipe()
390
391 def run(self):
392 self.parent_conn.close()
393 for s in iter(self.child_conn.recv, None):
394 self.child_conn.send(s.upper())
395 self.child_conn.close()
396
397 def submit(self, s):
398 assert type(s) is str
399 self.parent_conn.send(s)
400 return self.parent_conn.recv()
401
402 def stop(self):
403 self.parent_conn.send(None)
404 self.parent_conn.close()
405 self.child_conn.close()
406
407class _TestSubclassingProcess(BaseTestCase):
408
409 ALLOWED_TYPES = ('processes',)
410
411 def test_subclassing(self):
412 uppercaser = _UpperCaser()
Jesus Cea94f964f2011-09-09 20:26:57 +0200413 uppercaser.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000414 uppercaser.start()
415 self.assertEqual(uppercaser.submit('hello'), 'HELLO')
416 self.assertEqual(uppercaser.submit('world'), 'WORLD')
417 uppercaser.stop()
418 uppercaser.join()
419
Antoine Pitrou84a0fbf2012-01-27 10:52:37 +0100420 def test_stderr_flush(self):
421 # sys.stderr is flushed at process shutdown (issue #13812)
422 if self.TYPE == "threads":
423 return
424
425 testfn = test.support.TESTFN
426 self.addCleanup(test.support.unlink, testfn)
427 proc = self.Process(target=self._test_stderr_flush, args=(testfn,))
428 proc.start()
429 proc.join()
430 with open(testfn, 'r') as f:
431 err = f.read()
432 # The whole traceback was printed
433 self.assertIn("ZeroDivisionError", err)
434 self.assertIn("test_multiprocessing.py", err)
435 self.assertIn("1/0 # MARKER", err)
436
437 @classmethod
438 def _test_stderr_flush(cls, testfn):
439 sys.stderr = open(testfn, 'w')
440 1/0 # MARKER
441
442
Richard Oudkerk29471de2012-06-06 19:04:57 +0100443 @classmethod
444 def _test_sys_exit(cls, reason, testfn):
445 sys.stderr = open(testfn, 'w')
446 sys.exit(reason)
447
448 def test_sys_exit(self):
449 # See Issue 13854
450 if self.TYPE == 'threads':
451 return
452
453 testfn = test.support.TESTFN
454 self.addCleanup(test.support.unlink, testfn)
455
456 for reason, code in (([1, 2, 3], 1), ('ignore this', 0)):
457 p = self.Process(target=self._test_sys_exit, args=(reason, testfn))
458 p.daemon = True
459 p.start()
460 p.join(5)
461 self.assertEqual(p.exitcode, code)
462
463 with open(testfn, 'r') as f:
464 self.assertEqual(f.read().rstrip(), str(reason))
465
466 for reason in (True, False, 8):
467 p = self.Process(target=sys.exit, args=(reason,))
468 p.daemon = True
469 p.start()
470 p.join(5)
471 self.assertEqual(p.exitcode, reason)
472
Benjamin Petersone711caf2008-06-11 16:44:04 +0000473#
474#
475#
476
477def queue_empty(q):
478 if hasattr(q, 'empty'):
479 return q.empty()
480 else:
481 return q.qsize() == 0
482
483def queue_full(q, maxsize):
484 if hasattr(q, 'full'):
485 return q.full()
486 else:
487 return q.qsize() == maxsize
488
489
490class _TestQueue(BaseTestCase):
491
492
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +0000493 @classmethod
494 def _test_put(cls, queue, child_can_start, parent_can_continue):
Benjamin Petersone711caf2008-06-11 16:44:04 +0000495 child_can_start.wait()
496 for i in range(6):
497 queue.get()
498 parent_can_continue.set()
499
500 def test_put(self):
501 MAXSIZE = 6
502 queue = self.Queue(maxsize=MAXSIZE)
503 child_can_start = self.Event()
504 parent_can_continue = self.Event()
505
506 proc = self.Process(
507 target=self._test_put,
508 args=(queue, child_can_start, parent_can_continue)
509 )
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000510 proc.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000511 proc.start()
512
513 self.assertEqual(queue_empty(queue), True)
514 self.assertEqual(queue_full(queue, MAXSIZE), False)
515
516 queue.put(1)
517 queue.put(2, True)
518 queue.put(3, True, None)
519 queue.put(4, False)
520 queue.put(5, False, None)
521 queue.put_nowait(6)
522
523 # the values may be in buffer but not yet in pipe so sleep a bit
524 time.sleep(DELTA)
525
526 self.assertEqual(queue_empty(queue), False)
527 self.assertEqual(queue_full(queue, MAXSIZE), True)
528
529 put = TimingWrapper(queue.put)
530 put_nowait = TimingWrapper(queue.put_nowait)
531
532 self.assertRaises(pyqueue.Full, put, 7, False)
533 self.assertTimingAlmostEqual(put.elapsed, 0)
534
535 self.assertRaises(pyqueue.Full, put, 7, False, None)
536 self.assertTimingAlmostEqual(put.elapsed, 0)
537
538 self.assertRaises(pyqueue.Full, put_nowait, 7)
539 self.assertTimingAlmostEqual(put_nowait.elapsed, 0)
540
541 self.assertRaises(pyqueue.Full, put, 7, True, TIMEOUT1)
542 self.assertTimingAlmostEqual(put.elapsed, TIMEOUT1)
543
544 self.assertRaises(pyqueue.Full, put, 7, False, TIMEOUT2)
545 self.assertTimingAlmostEqual(put.elapsed, 0)
546
547 self.assertRaises(pyqueue.Full, put, 7, True, timeout=TIMEOUT3)
548 self.assertTimingAlmostEqual(put.elapsed, TIMEOUT3)
549
550 child_can_start.set()
551 parent_can_continue.wait()
552
553 self.assertEqual(queue_empty(queue), True)
554 self.assertEqual(queue_full(queue, MAXSIZE), False)
555
556 proc.join()
557
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +0000558 @classmethod
559 def _test_get(cls, queue, child_can_start, parent_can_continue):
Benjamin Petersone711caf2008-06-11 16:44:04 +0000560 child_can_start.wait()
Benjamin Petersonb29cbbc2008-06-16 20:57:14 +0000561 #queue.put(1)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000562 queue.put(2)
563 queue.put(3)
564 queue.put(4)
565 queue.put(5)
566 parent_can_continue.set()
567
568 def test_get(self):
569 queue = self.Queue()
570 child_can_start = self.Event()
571 parent_can_continue = self.Event()
572
573 proc = self.Process(
574 target=self._test_get,
575 args=(queue, child_can_start, parent_can_continue)
576 )
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000577 proc.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000578 proc.start()
579
580 self.assertEqual(queue_empty(queue), True)
581
582 child_can_start.set()
583 parent_can_continue.wait()
584
585 time.sleep(DELTA)
586 self.assertEqual(queue_empty(queue), False)
587
Benjamin Petersonb29cbbc2008-06-16 20:57:14 +0000588 # Hangs unexpectedly, remove for now
589 #self.assertEqual(queue.get(), 1)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000590 self.assertEqual(queue.get(True, None), 2)
591 self.assertEqual(queue.get(True), 3)
592 self.assertEqual(queue.get(timeout=1), 4)
593 self.assertEqual(queue.get_nowait(), 5)
594
595 self.assertEqual(queue_empty(queue), True)
596
597 get = TimingWrapper(queue.get)
598 get_nowait = TimingWrapper(queue.get_nowait)
599
600 self.assertRaises(pyqueue.Empty, get, False)
601 self.assertTimingAlmostEqual(get.elapsed, 0)
602
603 self.assertRaises(pyqueue.Empty, get, False, None)
604 self.assertTimingAlmostEqual(get.elapsed, 0)
605
606 self.assertRaises(pyqueue.Empty, get_nowait)
607 self.assertTimingAlmostEqual(get_nowait.elapsed, 0)
608
609 self.assertRaises(pyqueue.Empty, get, True, TIMEOUT1)
610 self.assertTimingAlmostEqual(get.elapsed, TIMEOUT1)
611
612 self.assertRaises(pyqueue.Empty, get, False, TIMEOUT2)
613 self.assertTimingAlmostEqual(get.elapsed, 0)
614
615 self.assertRaises(pyqueue.Empty, get, timeout=TIMEOUT3)
616 self.assertTimingAlmostEqual(get.elapsed, TIMEOUT3)
617
618 proc.join()
619
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +0000620 @classmethod
621 def _test_fork(cls, queue):
Benjamin Petersone711caf2008-06-11 16:44:04 +0000622 for i in range(10, 20):
623 queue.put(i)
624 # note that at this point the items may only be buffered, so the
625 # process cannot shutdown until the feeder thread has finished
626 # pushing items onto the pipe.
627
628 def test_fork(self):
629 # Old versions of Queue would fail to create a new feeder
630 # thread for a forked process if the original process had its
631 # own feeder thread. This test checks that this no longer
632 # happens.
633
634 queue = self.Queue()
635
636 # put items on queue so that main process starts a feeder thread
637 for i in range(10):
638 queue.put(i)
639
640 # wait to make sure thread starts before we fork a new process
641 time.sleep(DELTA)
642
643 # fork process
644 p = self.Process(target=self._test_fork, args=(queue,))
Jesus Cea94f964f2011-09-09 20:26:57 +0200645 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000646 p.start()
647
648 # check that all expected items are in the queue
649 for i in range(20):
650 self.assertEqual(queue.get(), i)
651 self.assertRaises(pyqueue.Empty, queue.get, False)
652
653 p.join()
654
655 def test_qsize(self):
656 q = self.Queue()
657 try:
658 self.assertEqual(q.qsize(), 0)
659 except NotImplementedError:
660 return
661 q.put(1)
662 self.assertEqual(q.qsize(), 1)
663 q.put(5)
664 self.assertEqual(q.qsize(), 2)
665 q.get()
666 self.assertEqual(q.qsize(), 1)
667 q.get()
668 self.assertEqual(q.qsize(), 0)
669
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +0000670 @classmethod
671 def _test_task_done(cls, q):
Benjamin Petersone711caf2008-06-11 16:44:04 +0000672 for obj in iter(q.get, None):
673 time.sleep(DELTA)
674 q.task_done()
675
676 def test_task_done(self):
677 queue = self.JoinableQueue()
678
679 if sys.version_info < (2, 5) and not hasattr(queue, 'task_done'):
Florent Xiclunafd1b0932010-03-28 00:25:02 +0000680 self.skipTest("requires 'queue.task_done()' method")
Benjamin Petersone711caf2008-06-11 16:44:04 +0000681
682 workers = [self.Process(target=self._test_task_done, args=(queue,))
683 for i in range(4)]
684
685 for p in workers:
Jesus Cea94f964f2011-09-09 20:26:57 +0200686 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000687 p.start()
688
689 for i in range(10):
690 queue.put(i)
691
692 queue.join()
693
694 for p in workers:
695 queue.put(None)
696
697 for p in workers:
698 p.join()
699
700#
701#
702#
703
704class _TestLock(BaseTestCase):
705
706 def test_lock(self):
707 lock = self.Lock()
708 self.assertEqual(lock.acquire(), True)
709 self.assertEqual(lock.acquire(False), False)
710 self.assertEqual(lock.release(), None)
711 self.assertRaises((ValueError, threading.ThreadError), lock.release)
712
713 def test_rlock(self):
714 lock = self.RLock()
715 self.assertEqual(lock.acquire(), True)
716 self.assertEqual(lock.acquire(), True)
717 self.assertEqual(lock.acquire(), True)
718 self.assertEqual(lock.release(), None)
719 self.assertEqual(lock.release(), None)
720 self.assertEqual(lock.release(), None)
721 self.assertRaises((AssertionError, RuntimeError), lock.release)
722
Jesse Nollerf8d00852009-03-31 03:25:07 +0000723 def test_lock_context(self):
724 with self.Lock():
725 pass
726
Benjamin Petersone711caf2008-06-11 16:44:04 +0000727
728class _TestSemaphore(BaseTestCase):
729
730 def _test_semaphore(self, sem):
731 self.assertReturnsIfImplemented(2, get_value, sem)
732 self.assertEqual(sem.acquire(), True)
733 self.assertReturnsIfImplemented(1, get_value, sem)
734 self.assertEqual(sem.acquire(), True)
735 self.assertReturnsIfImplemented(0, get_value, sem)
736 self.assertEqual(sem.acquire(False), False)
737 self.assertReturnsIfImplemented(0, get_value, sem)
738 self.assertEqual(sem.release(), None)
739 self.assertReturnsIfImplemented(1, get_value, sem)
740 self.assertEqual(sem.release(), None)
741 self.assertReturnsIfImplemented(2, get_value, sem)
742
743 def test_semaphore(self):
744 sem = self.Semaphore(2)
745 self._test_semaphore(sem)
746 self.assertEqual(sem.release(), None)
747 self.assertReturnsIfImplemented(3, get_value, sem)
748 self.assertEqual(sem.release(), None)
749 self.assertReturnsIfImplemented(4, get_value, sem)
750
751 def test_bounded_semaphore(self):
752 sem = self.BoundedSemaphore(2)
753 self._test_semaphore(sem)
754 # Currently fails on OS/X
755 #if HAVE_GETVALUE:
756 # self.assertRaises(ValueError, sem.release)
757 # self.assertReturnsIfImplemented(2, get_value, sem)
758
759 def test_timeout(self):
760 if self.TYPE != 'processes':
761 return
762
763 sem = self.Semaphore(0)
764 acquire = TimingWrapper(sem.acquire)
765
766 self.assertEqual(acquire(False), False)
767 self.assertTimingAlmostEqual(acquire.elapsed, 0.0)
768
769 self.assertEqual(acquire(False, None), False)
770 self.assertTimingAlmostEqual(acquire.elapsed, 0.0)
771
772 self.assertEqual(acquire(False, TIMEOUT1), False)
773 self.assertTimingAlmostEqual(acquire.elapsed, 0)
774
775 self.assertEqual(acquire(True, TIMEOUT2), False)
776 self.assertTimingAlmostEqual(acquire.elapsed, TIMEOUT2)
777
778 self.assertEqual(acquire(timeout=TIMEOUT3), False)
779 self.assertTimingAlmostEqual(acquire.elapsed, TIMEOUT3)
780
781
782class _TestCondition(BaseTestCase):
783
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +0000784 @classmethod
785 def f(cls, cond, sleeping, woken, timeout=None):
Benjamin Petersone711caf2008-06-11 16:44:04 +0000786 cond.acquire()
787 sleeping.release()
788 cond.wait(timeout)
789 woken.release()
790 cond.release()
791
792 def check_invariant(self, cond):
793 # this is only supposed to succeed when there are no sleepers
794 if self.TYPE == 'processes':
795 try:
796 sleepers = (cond._sleeping_count.get_value() -
797 cond._woken_count.get_value())
798 self.assertEqual(sleepers, 0)
799 self.assertEqual(cond._wait_semaphore.get_value(), 0)
800 except NotImplementedError:
801 pass
802
803 def test_notify(self):
804 cond = self.Condition()
805 sleeping = self.Semaphore(0)
806 woken = self.Semaphore(0)
807
808 p = self.Process(target=self.f, args=(cond, sleeping, woken))
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000809 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000810 p.start()
811
812 p = threading.Thread(target=self.f, args=(cond, sleeping, woken))
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000813 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000814 p.start()
815
816 # wait for both children to start sleeping
817 sleeping.acquire()
818 sleeping.acquire()
819
820 # check no process/thread has woken up
821 time.sleep(DELTA)
822 self.assertReturnsIfImplemented(0, get_value, woken)
823
824 # wake up one process/thread
825 cond.acquire()
826 cond.notify()
827 cond.release()
828
829 # check one process/thread has woken up
830 time.sleep(DELTA)
831 self.assertReturnsIfImplemented(1, get_value, woken)
832
833 # wake up another
834 cond.acquire()
835 cond.notify()
836 cond.release()
837
838 # check other has woken up
839 time.sleep(DELTA)
840 self.assertReturnsIfImplemented(2, get_value, woken)
841
842 # check state is not mucked up
843 self.check_invariant(cond)
844 p.join()
845
846 def test_notify_all(self):
847 cond = self.Condition()
848 sleeping = self.Semaphore(0)
849 woken = self.Semaphore(0)
850
851 # start some threads/processes which will timeout
852 for i in range(3):
853 p = self.Process(target=self.f,
854 args=(cond, sleeping, woken, TIMEOUT1))
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000855 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000856 p.start()
857
858 t = threading.Thread(target=self.f,
859 args=(cond, sleeping, woken, TIMEOUT1))
Benjamin Peterson72753702008-08-18 18:09:21 +0000860 t.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000861 t.start()
862
863 # wait for them all to sleep
864 for i in range(6):
865 sleeping.acquire()
866
867 # check they have all timed out
868 for i in range(6):
869 woken.acquire()
870 self.assertReturnsIfImplemented(0, get_value, woken)
871
872 # check state is not mucked up
873 self.check_invariant(cond)
874
875 # start some more threads/processes
876 for i in range(3):
877 p = self.Process(target=self.f, args=(cond, sleeping, woken))
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000878 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000879 p.start()
880
881 t = threading.Thread(target=self.f, args=(cond, sleeping, woken))
Benjamin Peterson72753702008-08-18 18:09:21 +0000882 t.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000883 t.start()
884
885 # wait for them to all sleep
886 for i in range(6):
887 sleeping.acquire()
888
889 # check no process/thread has woken up
890 time.sleep(DELTA)
891 self.assertReturnsIfImplemented(0, get_value, woken)
892
893 # wake them all up
894 cond.acquire()
895 cond.notify_all()
896 cond.release()
897
898 # check they have all woken
Antoine Pitrouf25a8de2011-04-16 21:02:01 +0200899 for i in range(10):
900 try:
901 if get_value(woken) == 6:
902 break
903 except NotImplementedError:
904 break
905 time.sleep(DELTA)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000906 self.assertReturnsIfImplemented(6, get_value, woken)
907
908 # check state is not mucked up
909 self.check_invariant(cond)
910
911 def test_timeout(self):
912 cond = self.Condition()
913 wait = TimingWrapper(cond.wait)
914 cond.acquire()
915 res = wait(TIMEOUT1)
916 cond.release()
Georg Brandl65ffae02010-10-28 09:24:56 +0000917 self.assertEqual(res, False)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000918 self.assertTimingAlmostEqual(wait.elapsed, TIMEOUT1)
919
Charles-François Natalic8ce7152012-04-17 18:45:57 +0200920 @classmethod
921 def _test_waitfor_f(cls, cond, state):
922 with cond:
923 state.value = 0
924 cond.notify()
925 result = cond.wait_for(lambda : state.value==4)
926 if not result or state.value != 4:
927 sys.exit(1)
928
929 @unittest.skipUnless(HAS_SHAREDCTYPES, 'needs sharedctypes')
930 def test_waitfor(self):
931 # based on test in test/lock_tests.py
932 cond = self.Condition()
933 state = self.Value('i', -1)
934
935 p = self.Process(target=self._test_waitfor_f, args=(cond, state))
936 p.daemon = True
937 p.start()
938
939 with cond:
940 result = cond.wait_for(lambda : state.value==0)
941 self.assertTrue(result)
942 self.assertEqual(state.value, 0)
943
944 for i in range(4):
945 time.sleep(0.01)
946 with cond:
947 state.value += 1
948 cond.notify()
949
950 p.join(5)
951 self.assertFalse(p.is_alive())
952 self.assertEqual(p.exitcode, 0)
953
954 @classmethod
Richard Oudkerk6dbca362012-05-06 16:46:36 +0100955 def _test_waitfor_timeout_f(cls, cond, state, success, sem):
956 sem.release()
Charles-François Natalic8ce7152012-04-17 18:45:57 +0200957 with cond:
958 expected = 0.1
959 dt = time.time()
960 result = cond.wait_for(lambda : state.value==4, timeout=expected)
961 dt = time.time() - dt
962 # borrow logic in assertTimeout() from test/lock_tests.py
963 if not result and expected * 0.6 < dt < expected * 10.0:
964 success.value = True
965
966 @unittest.skipUnless(HAS_SHAREDCTYPES, 'needs sharedctypes')
967 def test_waitfor_timeout(self):
968 # based on test in test/lock_tests.py
969 cond = self.Condition()
970 state = self.Value('i', 0)
971 success = self.Value('i', False)
Richard Oudkerk6dbca362012-05-06 16:46:36 +0100972 sem = self.Semaphore(0)
Charles-François Natalic8ce7152012-04-17 18:45:57 +0200973
974 p = self.Process(target=self._test_waitfor_timeout_f,
Richard Oudkerk6dbca362012-05-06 16:46:36 +0100975 args=(cond, state, success, sem))
Charles-François Natalic8ce7152012-04-17 18:45:57 +0200976 p.daemon = True
977 p.start()
Richard Oudkerk6dbca362012-05-06 16:46:36 +0100978 self.assertTrue(sem.acquire(timeout=10))
Charles-François Natalic8ce7152012-04-17 18:45:57 +0200979
980 # Only increment 3 times, so state == 4 is never reached.
981 for i in range(3):
982 time.sleep(0.01)
983 with cond:
984 state.value += 1
985 cond.notify()
986
987 p.join(5)
988 self.assertTrue(success.value)
989
Richard Oudkerk98449932012-06-05 13:15:29 +0100990 @classmethod
991 def _test_wait_result(cls, c, pid):
992 with c:
993 c.notify()
994 time.sleep(1)
995 if pid is not None:
996 os.kill(pid, signal.SIGINT)
997
998 def test_wait_result(self):
999 if isinstance(self, ProcessesMixin) and sys.platform != 'win32':
1000 pid = os.getpid()
1001 else:
1002 pid = None
1003
1004 c = self.Condition()
1005 with c:
1006 self.assertFalse(c.wait(0))
1007 self.assertFalse(c.wait(0.1))
1008
1009 p = self.Process(target=self._test_wait_result, args=(c, pid))
1010 p.start()
1011
1012 self.assertTrue(c.wait(10))
1013 if pid is not None:
1014 self.assertRaises(KeyboardInterrupt, c.wait, 10)
1015
1016 p.join()
1017
Benjamin Petersone711caf2008-06-11 16:44:04 +00001018
1019class _TestEvent(BaseTestCase):
1020
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +00001021 @classmethod
1022 def _test_event(cls, event):
Benjamin Petersone711caf2008-06-11 16:44:04 +00001023 time.sleep(TIMEOUT2)
1024 event.set()
1025
1026 def test_event(self):
1027 event = self.Event()
1028 wait = TimingWrapper(event.wait)
1029
Ezio Melotti13925002011-03-16 11:05:33 +02001030 # Removed temporarily, due to API shear, this does not
Benjamin Petersone711caf2008-06-11 16:44:04 +00001031 # work with threading._Event objects. is_set == isSet
Benjamin Peterson965ce872009-04-05 21:24:58 +00001032 self.assertEqual(event.is_set(), False)
Benjamin Petersone711caf2008-06-11 16:44:04 +00001033
Benjamin Peterson965ce872009-04-05 21:24:58 +00001034 # Removed, threading.Event.wait() will return the value of the __flag
1035 # instead of None. API Shear with the semaphore backed mp.Event
1036 self.assertEqual(wait(0.0), False)
Benjamin Petersone711caf2008-06-11 16:44:04 +00001037 self.assertTimingAlmostEqual(wait.elapsed, 0.0)
Benjamin Peterson965ce872009-04-05 21:24:58 +00001038 self.assertEqual(wait(TIMEOUT1), False)
Benjamin Petersone711caf2008-06-11 16:44:04 +00001039 self.assertTimingAlmostEqual(wait.elapsed, TIMEOUT1)
1040
1041 event.set()
1042
1043 # See note above on the API differences
Benjamin Peterson965ce872009-04-05 21:24:58 +00001044 self.assertEqual(event.is_set(), True)
1045 self.assertEqual(wait(), True)
Benjamin Petersone711caf2008-06-11 16:44:04 +00001046 self.assertTimingAlmostEqual(wait.elapsed, 0.0)
Benjamin Peterson965ce872009-04-05 21:24:58 +00001047 self.assertEqual(wait(TIMEOUT1), True)
Benjamin Petersone711caf2008-06-11 16:44:04 +00001048 self.assertTimingAlmostEqual(wait.elapsed, 0.0)
1049 # self.assertEqual(event.is_set(), True)
1050
1051 event.clear()
1052
1053 #self.assertEqual(event.is_set(), False)
1054
Jesus Cea94f964f2011-09-09 20:26:57 +02001055 p = self.Process(target=self._test_event, args=(event,))
1056 p.daemon = True
1057 p.start()
Benjamin Peterson965ce872009-04-05 21:24:58 +00001058 self.assertEqual(wait(), True)
Benjamin Petersone711caf2008-06-11 16:44:04 +00001059
1060#
Richard Oudkerk3730a172012-06-15 18:26:07 +01001061# Tests for Barrier - adapted from tests in test/lock_tests.py
1062#
1063
1064# Many of the tests for threading.Barrier use a list as an atomic
1065# counter: a value is appended to increment the counter, and the
1066# length of the list gives the value. We use the class DummyList
1067# for the same purpose.
1068
1069class _DummyList(object):
1070
1071 def __init__(self):
1072 wrapper = multiprocessing.heap.BufferWrapper(struct.calcsize('i'))
1073 lock = multiprocessing.Lock()
1074 self.__setstate__((wrapper, lock))
1075 self._lengthbuf[0] = 0
1076
1077 def __setstate__(self, state):
1078 (self._wrapper, self._lock) = state
1079 self._lengthbuf = self._wrapper.create_memoryview().cast('i')
1080
1081 def __getstate__(self):
1082 return (self._wrapper, self._lock)
1083
1084 def append(self, _):
1085 with self._lock:
1086 self._lengthbuf[0] += 1
1087
1088 def __len__(self):
1089 with self._lock:
1090 return self._lengthbuf[0]
1091
1092def _wait():
1093 # A crude wait/yield function not relying on synchronization primitives.
1094 time.sleep(0.01)
1095
1096
1097class Bunch(object):
1098 """
1099 A bunch of threads.
1100 """
1101 def __init__(self, namespace, f, args, n, wait_before_exit=False):
1102 """
1103 Construct a bunch of `n` threads running the same function `f`.
1104 If `wait_before_exit` is True, the threads won't terminate until
1105 do_finish() is called.
1106 """
1107 self.f = f
1108 self.args = args
1109 self.n = n
1110 self.started = namespace.DummyList()
1111 self.finished = namespace.DummyList()
1112 self._can_exit = namespace.Value('i', not wait_before_exit)
1113 for i in range(n):
1114 namespace.Process(target=self.task).start()
1115
1116 def task(self):
1117 pid = os.getpid()
1118 self.started.append(pid)
1119 try:
1120 self.f(*self.args)
1121 finally:
1122 self.finished.append(pid)
1123 while not self._can_exit.value:
1124 _wait()
1125
1126 def wait_for_started(self):
1127 while len(self.started) < self.n:
1128 _wait()
1129
1130 def wait_for_finished(self):
1131 while len(self.finished) < self.n:
1132 _wait()
1133
1134 def do_finish(self):
1135 self._can_exit.value = True
1136
1137
1138class AppendTrue(object):
1139 def __init__(self, obj):
1140 self.obj = obj
1141 def __call__(self):
1142 self.obj.append(True)
1143
1144
1145class _TestBarrier(BaseTestCase):
1146 """
1147 Tests for Barrier objects.
1148 """
1149 N = 5
1150 defaultTimeout = 10.0 # XXX Slow Windows buildbots need generous timeout
1151
1152 def setUp(self):
1153 self.barrier = self.Barrier(self.N, timeout=self.defaultTimeout)
1154
1155 def tearDown(self):
1156 self.barrier.abort()
1157 self.barrier = None
1158
1159 def DummyList(self):
1160 if self.TYPE == 'threads':
1161 return []
1162 elif self.TYPE == 'manager':
1163 return self.manager.list()
1164 else:
1165 return _DummyList()
1166
1167 def run_threads(self, f, args):
1168 b = Bunch(self, f, args, self.N-1)
1169 f(*args)
1170 b.wait_for_finished()
1171
1172 @classmethod
1173 def multipass(cls, barrier, results, n):
1174 m = barrier.parties
1175 assert m == cls.N
1176 for i in range(n):
1177 results[0].append(True)
1178 assert len(results[1]) == i * m
1179 barrier.wait()
1180 results[1].append(True)
1181 assert len(results[0]) == (i + 1) * m
1182 barrier.wait()
1183 try:
1184 assert barrier.n_waiting == 0
1185 except NotImplementedError:
1186 pass
1187 assert not barrier.broken
1188
1189 def test_barrier(self, passes=1):
1190 """
1191 Test that a barrier is passed in lockstep
1192 """
1193 results = [self.DummyList(), self.DummyList()]
1194 self.run_threads(self.multipass, (self.barrier, results, passes))
1195
1196 def test_barrier_10(self):
1197 """
1198 Test that a barrier works for 10 consecutive runs
1199 """
1200 return self.test_barrier(10)
1201
1202 @classmethod
1203 def _test_wait_return_f(cls, barrier, queue):
1204 res = barrier.wait()
1205 queue.put(res)
1206
1207 def test_wait_return(self):
1208 """
1209 test the return value from barrier.wait
1210 """
1211 queue = self.Queue()
1212 self.run_threads(self._test_wait_return_f, (self.barrier, queue))
1213 results = [queue.get() for i in range(self.N)]
1214 self.assertEqual(results.count(0), 1)
1215
1216 @classmethod
1217 def _test_action_f(cls, barrier, results):
1218 barrier.wait()
1219 if len(results) != 1:
1220 raise RuntimeError
1221
1222 def test_action(self):
1223 """
1224 Test the 'action' callback
1225 """
1226 results = self.DummyList()
1227 barrier = self.Barrier(self.N, action=AppendTrue(results))
1228 self.run_threads(self._test_action_f, (barrier, results))
1229 self.assertEqual(len(results), 1)
1230
1231 @classmethod
1232 def _test_abort_f(cls, barrier, results1, results2):
1233 try:
1234 i = barrier.wait()
1235 if i == cls.N//2:
1236 raise RuntimeError
1237 barrier.wait()
1238 results1.append(True)
1239 except threading.BrokenBarrierError:
1240 results2.append(True)
1241 except RuntimeError:
1242 barrier.abort()
1243
1244 def test_abort(self):
1245 """
1246 Test that an abort will put the barrier in a broken state
1247 """
1248 results1 = self.DummyList()
1249 results2 = self.DummyList()
1250 self.run_threads(self._test_abort_f,
1251 (self.barrier, results1, results2))
1252 self.assertEqual(len(results1), 0)
1253 self.assertEqual(len(results2), self.N-1)
1254 self.assertTrue(self.barrier.broken)
1255
1256 @classmethod
1257 def _test_reset_f(cls, barrier, results1, results2, results3):
1258 i = barrier.wait()
1259 if i == cls.N//2:
1260 # Wait until the other threads are all in the barrier.
1261 while barrier.n_waiting < cls.N-1:
1262 time.sleep(0.001)
1263 barrier.reset()
1264 else:
1265 try:
1266 barrier.wait()
1267 results1.append(True)
1268 except threading.BrokenBarrierError:
1269 results2.append(True)
1270 # Now, pass the barrier again
1271 barrier.wait()
1272 results3.append(True)
1273
1274 def test_reset(self):
1275 """
1276 Test that a 'reset' on a barrier frees the waiting threads
1277 """
1278 results1 = self.DummyList()
1279 results2 = self.DummyList()
1280 results3 = self.DummyList()
1281 self.run_threads(self._test_reset_f,
1282 (self.barrier, results1, results2, results3))
1283 self.assertEqual(len(results1), 0)
1284 self.assertEqual(len(results2), self.N-1)
1285 self.assertEqual(len(results3), self.N)
1286
1287 @classmethod
1288 def _test_abort_and_reset_f(cls, barrier, barrier2,
1289 results1, results2, results3):
1290 try:
1291 i = barrier.wait()
1292 if i == cls.N//2:
1293 raise RuntimeError
1294 barrier.wait()
1295 results1.append(True)
1296 except threading.BrokenBarrierError:
1297 results2.append(True)
1298 except RuntimeError:
1299 barrier.abort()
1300 # Synchronize and reset the barrier. Must synchronize first so
1301 # that everyone has left it when we reset, and after so that no
1302 # one enters it before the reset.
1303 if barrier2.wait() == cls.N//2:
1304 barrier.reset()
1305 barrier2.wait()
1306 barrier.wait()
1307 results3.append(True)
1308
1309 def test_abort_and_reset(self):
1310 """
1311 Test that a barrier can be reset after being broken.
1312 """
1313 results1 = self.DummyList()
1314 results2 = self.DummyList()
1315 results3 = self.DummyList()
1316 barrier2 = self.Barrier(self.N)
1317
1318 self.run_threads(self._test_abort_and_reset_f,
1319 (self.barrier, barrier2, results1, results2, results3))
1320 self.assertEqual(len(results1), 0)
1321 self.assertEqual(len(results2), self.N-1)
1322 self.assertEqual(len(results3), self.N)
1323
1324 @classmethod
1325 def _test_timeout_f(cls, barrier, results):
1326 i = barrier.wait(20)
1327 if i == cls.N//2:
1328 # One thread is late!
1329 time.sleep(4.0)
1330 try:
1331 barrier.wait(0.5)
1332 except threading.BrokenBarrierError:
1333 results.append(True)
1334
1335 def test_timeout(self):
1336 """
1337 Test wait(timeout)
1338 """
1339 results = self.DummyList()
1340 self.run_threads(self._test_timeout_f, (self.barrier, results))
1341 self.assertEqual(len(results), self.barrier.parties)
1342
1343 @classmethod
1344 def _test_default_timeout_f(cls, barrier, results):
1345 i = barrier.wait(20)
1346 if i == cls.N//2:
1347 # One thread is later than the default timeout
1348 time.sleep(4.0)
1349 try:
1350 barrier.wait()
1351 except threading.BrokenBarrierError:
1352 results.append(True)
1353
1354 def test_default_timeout(self):
1355 """
1356 Test the barrier's default timeout
1357 """
1358 barrier = self.Barrier(self.N, timeout=1.0)
1359 results = self.DummyList()
1360 self.run_threads(self._test_default_timeout_f, (barrier, results))
1361 self.assertEqual(len(results), barrier.parties)
1362
1363 def test_single_thread(self):
1364 b = self.Barrier(1)
1365 b.wait()
1366 b.wait()
1367
1368 @classmethod
1369 def _test_thousand_f(cls, barrier, passes, conn, lock):
1370 for i in range(passes):
1371 barrier.wait()
1372 with lock:
1373 conn.send(i)
1374
1375 def test_thousand(self):
1376 if self.TYPE == 'manager':
1377 return
1378 passes = 1000
1379 lock = self.Lock()
1380 conn, child_conn = self.Pipe(False)
1381 for j in range(self.N):
1382 p = self.Process(target=self._test_thousand_f,
1383 args=(self.barrier, passes, child_conn, lock))
1384 p.start()
1385
1386 for i in range(passes):
1387 for j in range(self.N):
1388 self.assertEqual(conn.recv(), i)
1389
1390#
Benjamin Petersone711caf2008-06-11 16:44:04 +00001391#
1392#
1393
1394class _TestValue(BaseTestCase):
1395
Florent Xiclunafd1b0932010-03-28 00:25:02 +00001396 ALLOWED_TYPES = ('processes',)
1397
Benjamin Petersone711caf2008-06-11 16:44:04 +00001398 codes_values = [
1399 ('i', 4343, 24234),
1400 ('d', 3.625, -4.25),
1401 ('h', -232, 234),
1402 ('c', latin('x'), latin('y'))
1403 ]
1404
Antoine Pitrou7744e2a2010-11-22 16:26:21 +00001405 def setUp(self):
1406 if not HAS_SHAREDCTYPES:
1407 self.skipTest("requires multiprocessing.sharedctypes")
1408
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +00001409 @classmethod
1410 def _test(cls, values):
1411 for sv, cv in zip(values, cls.codes_values):
Benjamin Petersone711caf2008-06-11 16:44:04 +00001412 sv.value = cv[2]
1413
1414
1415 def test_value(self, raw=False):
Benjamin Petersone711caf2008-06-11 16:44:04 +00001416 if raw:
1417 values = [self.RawValue(code, value)
1418 for code, value, _ in self.codes_values]
1419 else:
1420 values = [self.Value(code, value)
1421 for code, value, _ in self.codes_values]
1422
1423 for sv, cv in zip(values, self.codes_values):
1424 self.assertEqual(sv.value, cv[1])
1425
1426 proc = self.Process(target=self._test, args=(values,))
Jesus Cea94f964f2011-09-09 20:26:57 +02001427 proc.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +00001428 proc.start()
1429 proc.join()
1430
1431 for sv, cv in zip(values, self.codes_values):
1432 self.assertEqual(sv.value, cv[2])
1433
1434 def test_rawvalue(self):
1435 self.test_value(raw=True)
1436
1437 def test_getobj_getlock(self):
Benjamin Petersone711caf2008-06-11 16:44:04 +00001438 val1 = self.Value('i', 5)
1439 lock1 = val1.get_lock()
1440 obj1 = val1.get_obj()
1441
1442 val2 = self.Value('i', 5, lock=None)
1443 lock2 = val2.get_lock()
1444 obj2 = val2.get_obj()
1445
1446 lock = self.Lock()
1447 val3 = self.Value('i', 5, lock=lock)
1448 lock3 = val3.get_lock()
1449 obj3 = val3.get_obj()
1450 self.assertEqual(lock, lock3)
1451
Jesse Nollerb0516a62009-01-18 03:11:38 +00001452 arr4 = self.Value('i', 5, lock=False)
Benjamin Petersone711caf2008-06-11 16:44:04 +00001453 self.assertFalse(hasattr(arr4, 'get_lock'))
1454 self.assertFalse(hasattr(arr4, 'get_obj'))
1455
Jesse Nollerb0516a62009-01-18 03:11:38 +00001456 self.assertRaises(AttributeError, self.Value, 'i', 5, lock='navalue')
1457
1458 arr5 = self.RawValue('i', 5)
1459 self.assertFalse(hasattr(arr5, 'get_lock'))
1460 self.assertFalse(hasattr(arr5, 'get_obj'))
1461
Benjamin Petersone711caf2008-06-11 16:44:04 +00001462
1463class _TestArray(BaseTestCase):
1464
Florent Xiclunafd1b0932010-03-28 00:25:02 +00001465 ALLOWED_TYPES = ('processes',)
1466
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +00001467 @classmethod
1468 def f(cls, seq):
Benjamin Petersone711caf2008-06-11 16:44:04 +00001469 for i in range(1, len(seq)):
1470 seq[i] += seq[i-1]
1471
Florent Xiclunafd1b0932010-03-28 00:25:02 +00001472 @unittest.skipIf(c_int is None, "requires _ctypes")
Benjamin Petersone711caf2008-06-11 16:44:04 +00001473 def test_array(self, raw=False):
Benjamin Petersone711caf2008-06-11 16:44:04 +00001474 seq = [680, 626, 934, 821, 150, 233, 548, 982, 714, 831]
1475 if raw:
1476 arr = self.RawArray('i', seq)
1477 else:
1478 arr = self.Array('i', seq)
1479
1480 self.assertEqual(len(arr), len(seq))
1481 self.assertEqual(arr[3], seq[3])
1482 self.assertEqual(list(arr[2:7]), list(seq[2:7]))
1483
1484 arr[4:8] = seq[4:8] = array.array('i', [1, 2, 3, 4])
1485
1486 self.assertEqual(list(arr[:]), seq)
1487
1488 self.f(seq)
1489
1490 p = self.Process(target=self.f, args=(arr,))
Jesus Cea94f964f2011-09-09 20:26:57 +02001491 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +00001492 p.start()
1493 p.join()
1494
1495 self.assertEqual(list(arr[:]), seq)
1496
Florent Xiclunafd1b0932010-03-28 00:25:02 +00001497 @unittest.skipIf(c_int is None, "requires _ctypes")
Mark Dickinson89461ef2011-03-26 10:19:03 +00001498 def test_array_from_size(self):
1499 size = 10
1500 # Test for zeroing (see issue #11675).
1501 # The repetition below strengthens the test by increasing the chances
1502 # of previously allocated non-zero memory being used for the new array
1503 # on the 2nd and 3rd loops.
1504 for _ in range(3):
1505 arr = self.Array('i', size)
1506 self.assertEqual(len(arr), size)
1507 self.assertEqual(list(arr), [0] * size)
1508 arr[:] = range(10)
1509 self.assertEqual(list(arr), list(range(10)))
1510 del arr
1511
1512 @unittest.skipIf(c_int is None, "requires _ctypes")
Benjamin Petersone711caf2008-06-11 16:44:04 +00001513 def test_rawarray(self):
1514 self.test_array(raw=True)
1515
Florent Xiclunafd1b0932010-03-28 00:25:02 +00001516 @unittest.skipIf(c_int is None, "requires _ctypes")
Benjamin Petersone711caf2008-06-11 16:44:04 +00001517 def test_getobj_getlock_obj(self):
Benjamin Petersone711caf2008-06-11 16:44:04 +00001518 arr1 = self.Array('i', list(range(10)))
1519 lock1 = arr1.get_lock()
1520 obj1 = arr1.get_obj()
1521
1522 arr2 = self.Array('i', list(range(10)), lock=None)
1523 lock2 = arr2.get_lock()
1524 obj2 = arr2.get_obj()
1525
1526 lock = self.Lock()
1527 arr3 = self.Array('i', list(range(10)), lock=lock)
1528 lock3 = arr3.get_lock()
1529 obj3 = arr3.get_obj()
1530 self.assertEqual(lock, lock3)
1531
Jesse Nollerb0516a62009-01-18 03:11:38 +00001532 arr4 = self.Array('i', range(10), lock=False)
Benjamin Petersone711caf2008-06-11 16:44:04 +00001533 self.assertFalse(hasattr(arr4, 'get_lock'))
1534 self.assertFalse(hasattr(arr4, 'get_obj'))
Jesse Nollerb0516a62009-01-18 03:11:38 +00001535 self.assertRaises(AttributeError,
1536 self.Array, 'i', range(10), lock='notalock')
1537
1538 arr5 = self.RawArray('i', range(10))
1539 self.assertFalse(hasattr(arr5, 'get_lock'))
1540 self.assertFalse(hasattr(arr5, 'get_obj'))
Benjamin Petersone711caf2008-06-11 16:44:04 +00001541
1542#
1543#
1544#
1545
1546class _TestContainers(BaseTestCase):
1547
1548 ALLOWED_TYPES = ('manager',)
1549
1550 def test_list(self):
1551 a = self.list(list(range(10)))
1552 self.assertEqual(a[:], list(range(10)))
1553
1554 b = self.list()
1555 self.assertEqual(b[:], [])
1556
1557 b.extend(list(range(5)))
1558 self.assertEqual(b[:], list(range(5)))
1559
1560 self.assertEqual(b[2], 2)
1561 self.assertEqual(b[2:10], [2,3,4])
1562
1563 b *= 2
1564 self.assertEqual(b[:], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4])
1565
1566 self.assertEqual(b + [5, 6], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4, 5, 6])
1567
1568 self.assertEqual(a[:], list(range(10)))
1569
1570 d = [a, b]
1571 e = self.list(d)
1572 self.assertEqual(
1573 e[:],
1574 [[0, 1, 2, 3, 4, 5, 6, 7, 8, 9], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4]]
1575 )
1576
1577 f = self.list([a])
1578 a.append('hello')
1579 self.assertEqual(f[:], [[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 'hello']])
1580
1581 def test_dict(self):
1582 d = self.dict()
1583 indices = list(range(65, 70))
1584 for i in indices:
1585 d[i] = chr(i)
1586 self.assertEqual(d.copy(), dict((i, chr(i)) for i in indices))
1587 self.assertEqual(sorted(d.keys()), indices)
1588 self.assertEqual(sorted(d.values()), [chr(i) for i in indices])
1589 self.assertEqual(sorted(d.items()), [(i, chr(i)) for i in indices])
1590
1591 def test_namespace(self):
1592 n = self.Namespace()
1593 n.name = 'Bob'
1594 n.job = 'Builder'
1595 n._hidden = 'hidden'
1596 self.assertEqual((n.name, n.job), ('Bob', 'Builder'))
1597 del n.job
1598 self.assertEqual(str(n), "Namespace(name='Bob')")
1599 self.assertTrue(hasattr(n, 'name'))
1600 self.assertTrue(not hasattr(n, 'job'))
1601
1602#
1603#
1604#
1605
1606def sqr(x, wait=0.0):
1607 time.sleep(wait)
1608 return x*x
Ask Solem2afcbf22010-11-09 20:55:52 +00001609
Antoine Pitroude911b22011-12-21 11:03:24 +01001610def mul(x, y):
1611 return x*y
1612
Benjamin Petersone711caf2008-06-11 16:44:04 +00001613class _TestPool(BaseTestCase):
1614
1615 def test_apply(self):
1616 papply = self.pool.apply
1617 self.assertEqual(papply(sqr, (5,)), sqr(5))
1618 self.assertEqual(papply(sqr, (), {'x':3}), sqr(x=3))
1619
1620 def test_map(self):
1621 pmap = self.pool.map
1622 self.assertEqual(pmap(sqr, list(range(10))), list(map(sqr, list(range(10)))))
1623 self.assertEqual(pmap(sqr, list(range(100)), chunksize=20),
1624 list(map(sqr, list(range(100)))))
1625
Antoine Pitroude911b22011-12-21 11:03:24 +01001626 def test_starmap(self):
1627 psmap = self.pool.starmap
1628 tuples = list(zip(range(10), range(9,-1, -1)))
1629 self.assertEqual(psmap(mul, tuples),
1630 list(itertools.starmap(mul, tuples)))
1631 tuples = list(zip(range(100), range(99,-1, -1)))
1632 self.assertEqual(psmap(mul, tuples, chunksize=20),
1633 list(itertools.starmap(mul, tuples)))
1634
1635 def test_starmap_async(self):
1636 tuples = list(zip(range(100), range(99,-1, -1)))
1637 self.assertEqual(self.pool.starmap_async(mul, tuples).get(),
1638 list(itertools.starmap(mul, tuples)))
1639
Alexandre Vassalottie52e3782009-07-17 09:18:18 +00001640 def test_map_chunksize(self):
1641 try:
1642 self.pool.map_async(sqr, [], chunksize=1).get(timeout=TIMEOUT1)
1643 except multiprocessing.TimeoutError:
1644 self.fail("pool.map_async with chunksize stalled on null list")
1645
Benjamin Petersone711caf2008-06-11 16:44:04 +00001646 def test_async(self):
1647 res = self.pool.apply_async(sqr, (7, TIMEOUT1,))
1648 get = TimingWrapper(res.get)
1649 self.assertEqual(get(), 49)
1650 self.assertTimingAlmostEqual(get.elapsed, TIMEOUT1)
1651
1652 def test_async_timeout(self):
1653 res = self.pool.apply_async(sqr, (6, TIMEOUT2 + 0.2))
1654 get = TimingWrapper(res.get)
1655 self.assertRaises(multiprocessing.TimeoutError, get, timeout=TIMEOUT2)
1656 self.assertTimingAlmostEqual(get.elapsed, TIMEOUT2)
1657
1658 def test_imap(self):
1659 it = self.pool.imap(sqr, list(range(10)))
1660 self.assertEqual(list(it), list(map(sqr, list(range(10)))))
1661
1662 it = self.pool.imap(sqr, list(range(10)))
1663 for i in range(10):
1664 self.assertEqual(next(it), i*i)
1665 self.assertRaises(StopIteration, it.__next__)
1666
1667 it = self.pool.imap(sqr, list(range(1000)), chunksize=100)
1668 for i in range(1000):
1669 self.assertEqual(next(it), i*i)
1670 self.assertRaises(StopIteration, it.__next__)
1671
1672 def test_imap_unordered(self):
1673 it = self.pool.imap_unordered(sqr, list(range(1000)))
1674 self.assertEqual(sorted(it), list(map(sqr, list(range(1000)))))
1675
1676 it = self.pool.imap_unordered(sqr, list(range(1000)), chunksize=53)
1677 self.assertEqual(sorted(it), list(map(sqr, list(range(1000)))))
1678
1679 def test_make_pool(self):
Victor Stinner2fae27b2011-06-20 17:53:35 +02001680 self.assertRaises(ValueError, multiprocessing.Pool, -1)
1681 self.assertRaises(ValueError, multiprocessing.Pool, 0)
1682
Benjamin Petersone711caf2008-06-11 16:44:04 +00001683 p = multiprocessing.Pool(3)
1684 self.assertEqual(3, len(p._pool))
1685 p.close()
1686 p.join()
1687
1688 def test_terminate(self):
1689 if self.TYPE == 'manager':
1690 # On Unix a forked process increfs each shared object to
1691 # which its parent process held a reference. If the
1692 # forked process gets terminated then there is likely to
1693 # be a reference leak. So to prevent
1694 # _TestZZZNumberOfObjects from failing we skip this test
1695 # when using a manager.
1696 return
1697
1698 result = self.pool.map_async(
1699 time.sleep, [0.1 for i in range(10000)], chunksize=1
1700 )
1701 self.pool.terminate()
1702 join = TimingWrapper(self.pool.join)
1703 join()
Victor Stinner900189b2011-03-24 16:39:07 +01001704 self.assertLess(join.elapsed, 0.5)
Jesse Noller1f0b6582010-01-27 03:36:01 +00001705
Richard Oudkerke41682b2012-06-06 19:04:57 +01001706 def test_empty_iterable(self):
1707 # See Issue 12157
1708 p = self.Pool(1)
1709
1710 self.assertEqual(p.map(sqr, []), [])
1711 self.assertEqual(list(p.imap(sqr, [])), [])
1712 self.assertEqual(list(p.imap_unordered(sqr, [])), [])
1713 self.assertEqual(p.map_async(sqr, []).get(), [])
1714
1715 p.close()
1716 p.join()
1717
Ask Solem2afcbf22010-11-09 20:55:52 +00001718def raising():
1719 raise KeyError("key")
Jesse Noller1f0b6582010-01-27 03:36:01 +00001720
Ask Solem2afcbf22010-11-09 20:55:52 +00001721def unpickleable_result():
1722 return lambda: 42
1723
1724class _TestPoolWorkerErrors(BaseTestCase):
Jesse Noller1f0b6582010-01-27 03:36:01 +00001725 ALLOWED_TYPES = ('processes', )
Ask Solem2afcbf22010-11-09 20:55:52 +00001726
1727 def test_async_error_callback(self):
1728 p = multiprocessing.Pool(2)
1729
1730 scratchpad = [None]
1731 def errback(exc):
1732 scratchpad[0] = exc
1733
1734 res = p.apply_async(raising, error_callback=errback)
1735 self.assertRaises(KeyError, res.get)
1736 self.assertTrue(scratchpad[0])
1737 self.assertIsInstance(scratchpad[0], KeyError)
1738
1739 p.close()
1740 p.join()
1741
1742 def test_unpickleable_result(self):
1743 from multiprocessing.pool import MaybeEncodingError
1744 p = multiprocessing.Pool(2)
1745
1746 # Make sure we don't lose pool processes because of encoding errors.
1747 for iteration in range(20):
1748
1749 scratchpad = [None]
1750 def errback(exc):
1751 scratchpad[0] = exc
1752
1753 res = p.apply_async(unpickleable_result, error_callback=errback)
1754 self.assertRaises(MaybeEncodingError, res.get)
1755 wrapped = scratchpad[0]
1756 self.assertTrue(wrapped)
1757 self.assertIsInstance(scratchpad[0], MaybeEncodingError)
1758 self.assertIsNotNone(wrapped.exc)
1759 self.assertIsNotNone(wrapped.value)
1760
1761 p.close()
1762 p.join()
1763
1764class _TestPoolWorkerLifetime(BaseTestCase):
1765 ALLOWED_TYPES = ('processes', )
1766
Jesse Noller1f0b6582010-01-27 03:36:01 +00001767 def test_pool_worker_lifetime(self):
1768 p = multiprocessing.Pool(3, maxtasksperchild=10)
1769 self.assertEqual(3, len(p._pool))
1770 origworkerpids = [w.pid for w in p._pool]
1771 # Run many tasks so each worker gets replaced (hopefully)
1772 results = []
1773 for i in range(100):
1774 results.append(p.apply_async(sqr, (i, )))
1775 # Fetch the results and verify we got the right answers,
1776 # also ensuring all the tasks have completed.
1777 for (j, res) in enumerate(results):
1778 self.assertEqual(res.get(), sqr(j))
1779 # Refill the pool
1780 p._repopulate_pool()
Florent Xiclunafb190f62010-03-04 16:10:10 +00001781 # Wait until all workers are alive
Antoine Pitrou540ab062011-04-06 22:51:17 +02001782 # (countdown * DELTA = 5 seconds max startup process time)
1783 countdown = 50
Florent Xiclunafb190f62010-03-04 16:10:10 +00001784 while countdown and not all(w.is_alive() for w in p._pool):
1785 countdown -= 1
1786 time.sleep(DELTA)
Jesse Noller1f0b6582010-01-27 03:36:01 +00001787 finalworkerpids = [w.pid for w in p._pool]
Florent Xiclunafb190f62010-03-04 16:10:10 +00001788 # All pids should be assigned. See issue #7805.
1789 self.assertNotIn(None, origworkerpids)
1790 self.assertNotIn(None, finalworkerpids)
1791 # Finally, check that the worker pids have changed
Jesse Noller1f0b6582010-01-27 03:36:01 +00001792 self.assertNotEqual(sorted(origworkerpids), sorted(finalworkerpids))
1793 p.close()
1794 p.join()
1795
Charles-François Natalif8859e12011-10-24 18:45:29 +02001796 def test_pool_worker_lifetime_early_close(self):
1797 # Issue #10332: closing a pool whose workers have limited lifetimes
1798 # before all the tasks completed would make join() hang.
1799 p = multiprocessing.Pool(3, maxtasksperchild=1)
1800 results = []
1801 for i in range(6):
1802 results.append(p.apply_async(sqr, (i, 0.3)))
1803 p.close()
1804 p.join()
1805 # check the results
1806 for (j, res) in enumerate(results):
1807 self.assertEqual(res.get(), sqr(j))
1808
1809
Benjamin Petersone711caf2008-06-11 16:44:04 +00001810#
1811# Test that manager has expected number of shared objects left
1812#
1813
1814class _TestZZZNumberOfObjects(BaseTestCase):
1815 # Because test cases are sorted alphabetically, this one will get
1816 # run after all the other tests for the manager. It tests that
1817 # there have been no "reference leaks" for the manager's shared
1818 # objects. Note the comment in _TestPool.test_terminate().
1819 ALLOWED_TYPES = ('manager',)
1820
1821 def test_number_of_objects(self):
1822 EXPECTED_NUMBER = 1 # the pool object is still alive
1823 multiprocessing.active_children() # discard dead process objs
1824 gc.collect() # do garbage collection
1825 refs = self.manager._number_of_objects()
Jesse Noller63b3a972009-01-21 02:15:48 +00001826 debug_info = self.manager._debug_info()
Benjamin Petersone711caf2008-06-11 16:44:04 +00001827 if refs != EXPECTED_NUMBER:
Georg Brandl3dbca812008-07-23 16:10:53 +00001828 print(self.manager._debug_info())
Jesse Noller63b3a972009-01-21 02:15:48 +00001829 print(debug_info)
Benjamin Petersone711caf2008-06-11 16:44:04 +00001830
1831 self.assertEqual(refs, EXPECTED_NUMBER)
1832
1833#
1834# Test of creating a customized manager class
1835#
1836
1837from multiprocessing.managers import BaseManager, BaseProxy, RemoteError
1838
1839class FooBar(object):
1840 def f(self):
1841 return 'f()'
1842 def g(self):
1843 raise ValueError
1844 def _h(self):
1845 return '_h()'
1846
1847def baz():
1848 for i in range(10):
1849 yield i*i
1850
1851class IteratorProxy(BaseProxy):
Florent Xiclunaaa171062010-08-14 15:56:42 +00001852 _exposed_ = ('__next__',)
Benjamin Petersone711caf2008-06-11 16:44:04 +00001853 def __iter__(self):
1854 return self
1855 def __next__(self):
Benjamin Petersone711caf2008-06-11 16:44:04 +00001856 return self._callmethod('__next__')
1857
1858class MyManager(BaseManager):
1859 pass
1860
1861MyManager.register('Foo', callable=FooBar)
1862MyManager.register('Bar', callable=FooBar, exposed=('f', '_h'))
1863MyManager.register('baz', callable=baz, proxytype=IteratorProxy)
1864
1865
1866class _TestMyManager(BaseTestCase):
1867
1868 ALLOWED_TYPES = ('manager',)
1869
1870 def test_mymanager(self):
1871 manager = MyManager()
1872 manager.start()
1873
1874 foo = manager.Foo()
1875 bar = manager.Bar()
1876 baz = manager.baz()
1877
1878 foo_methods = [name for name in ('f', 'g', '_h') if hasattr(foo, name)]
1879 bar_methods = [name for name in ('f', 'g', '_h') if hasattr(bar, name)]
1880
1881 self.assertEqual(foo_methods, ['f', 'g'])
1882 self.assertEqual(bar_methods, ['f', '_h'])
1883
1884 self.assertEqual(foo.f(), 'f()')
1885 self.assertRaises(ValueError, foo.g)
1886 self.assertEqual(foo._callmethod('f'), 'f()')
1887 self.assertRaises(RemoteError, foo._callmethod, '_h')
1888
1889 self.assertEqual(bar.f(), 'f()')
1890 self.assertEqual(bar._h(), '_h()')
1891 self.assertEqual(bar._callmethod('f'), 'f()')
1892 self.assertEqual(bar._callmethod('_h'), '_h()')
1893
1894 self.assertEqual(list(baz), [i*i for i in range(10)])
1895
1896 manager.shutdown()
1897
Richard Oudkerk73d9a292012-06-14 15:30:10 +01001898 # If the manager process exited cleanly then the exitcode
1899 # will be zero. Otherwise (after a short timeout)
1900 # terminate() is used, resulting in an exitcode of -SIGTERM.
1901 self.assertEqual(manager._process.exitcode, 0)
1902
Benjamin Petersone711caf2008-06-11 16:44:04 +00001903#
1904# Test of connecting to a remote server and using xmlrpclib for serialization
1905#
1906
1907_queue = pyqueue.Queue()
1908def get_queue():
1909 return _queue
1910
1911class QueueManager(BaseManager):
1912 '''manager class used by server process'''
1913QueueManager.register('get_queue', callable=get_queue)
1914
1915class QueueManager2(BaseManager):
1916 '''manager class which specifies the same interface as QueueManager'''
1917QueueManager2.register('get_queue')
1918
1919
1920SERIALIZER = 'xmlrpclib'
1921
1922class _TestRemoteManager(BaseTestCase):
1923
1924 ALLOWED_TYPES = ('manager',)
1925
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +00001926 @classmethod
1927 def _putter(cls, address, authkey):
Benjamin Petersone711caf2008-06-11 16:44:04 +00001928 manager = QueueManager2(
1929 address=address, authkey=authkey, serializer=SERIALIZER
1930 )
1931 manager.connect()
1932 queue = manager.get_queue()
1933 queue.put(('hello world', None, True, 2.25))
1934
1935 def test_remote(self):
1936 authkey = os.urandom(32)
1937
1938 manager = QueueManager(
1939 address=('localhost', 0), authkey=authkey, serializer=SERIALIZER
1940 )
1941 manager.start()
1942
1943 p = self.Process(target=self._putter, args=(manager.address, authkey))
Jesus Cea94f964f2011-09-09 20:26:57 +02001944 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +00001945 p.start()
1946
1947 manager2 = QueueManager2(
1948 address=manager.address, authkey=authkey, serializer=SERIALIZER
1949 )
1950 manager2.connect()
1951 queue = manager2.get_queue()
1952
1953 # Note that xmlrpclib will deserialize object as a list not a tuple
1954 self.assertEqual(queue.get(), ['hello world', None, True, 2.25])
1955
1956 # Because we are using xmlrpclib for serialization instead of
1957 # pickle this will cause a serialization error.
1958 self.assertRaises(Exception, queue.put, time.sleep)
1959
1960 # Make queue finalizer run before the server is stopped
1961 del queue
1962 manager.shutdown()
1963
Jesse Nollerc5d28a02009-03-30 16:37:36 +00001964class _TestManagerRestart(BaseTestCase):
1965
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +00001966 @classmethod
1967 def _putter(cls, address, authkey):
Jesse Nollerc5d28a02009-03-30 16:37:36 +00001968 manager = QueueManager(
1969 address=address, authkey=authkey, serializer=SERIALIZER)
1970 manager.connect()
1971 queue = manager.get_queue()
1972 queue.put('hello world')
1973
1974 def test_rapid_restart(self):
1975 authkey = os.urandom(32)
1976 manager = QueueManager(
Antoine Pitrou043bad02010-04-30 23:20:15 +00001977 address=('localhost', 0), authkey=authkey, serializer=SERIALIZER)
Brian Curtin50be1ca2010-11-01 05:10:44 +00001978 srvr = manager.get_server()
1979 addr = srvr.address
1980 # Close the connection.Listener socket which gets opened as a part
1981 # of manager.get_server(). It's not needed for the test.
1982 srvr.listener.close()
Jesse Nollerc5d28a02009-03-30 16:37:36 +00001983 manager.start()
1984
1985 p = self.Process(target=self._putter, args=(manager.address, authkey))
Jesus Cea94f964f2011-09-09 20:26:57 +02001986 p.daemon = True
Jesse Nollerc5d28a02009-03-30 16:37:36 +00001987 p.start()
1988 queue = manager.get_queue()
1989 self.assertEqual(queue.get(), 'hello world')
Jesse Noller35d1f002009-03-30 22:59:27 +00001990 del queue
Jesse Nollerc5d28a02009-03-30 16:37:36 +00001991 manager.shutdown()
1992 manager = QueueManager(
Antoine Pitrou043bad02010-04-30 23:20:15 +00001993 address=addr, authkey=authkey, serializer=SERIALIZER)
Antoine Pitrouc824e9a2011-04-05 18:11:33 +02001994 try:
1995 manager.start()
1996 except IOError as e:
1997 if e.errno != errno.EADDRINUSE:
1998 raise
1999 # Retry after some time, in case the old socket was lingering
2000 # (sporadic failure on buildbots)
2001 time.sleep(1.0)
2002 manager = QueueManager(
2003 address=addr, authkey=authkey, serializer=SERIALIZER)
Jesse Noller35d1f002009-03-30 22:59:27 +00002004 manager.shutdown()
Jesse Nollerc5d28a02009-03-30 16:37:36 +00002005
Benjamin Petersone711caf2008-06-11 16:44:04 +00002006#
2007#
2008#
2009
2010SENTINEL = latin('')
2011
2012class _TestConnection(BaseTestCase):
2013
2014 ALLOWED_TYPES = ('processes', 'threads')
2015
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +00002016 @classmethod
2017 def _echo(cls, conn):
Benjamin Petersone711caf2008-06-11 16:44:04 +00002018 for msg in iter(conn.recv_bytes, SENTINEL):
2019 conn.send_bytes(msg)
2020 conn.close()
2021
2022 def test_connection(self):
2023 conn, child_conn = self.Pipe()
2024
2025 p = self.Process(target=self._echo, args=(child_conn,))
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +00002026 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +00002027 p.start()
2028
2029 seq = [1, 2.25, None]
2030 msg = latin('hello world')
2031 longmsg = msg * 10
2032 arr = array.array('i', list(range(4)))
2033
2034 if self.TYPE == 'processes':
2035 self.assertEqual(type(conn.fileno()), int)
2036
2037 self.assertEqual(conn.send(seq), None)
2038 self.assertEqual(conn.recv(), seq)
2039
2040 self.assertEqual(conn.send_bytes(msg), None)
2041 self.assertEqual(conn.recv_bytes(), msg)
2042
2043 if self.TYPE == 'processes':
2044 buffer = array.array('i', [0]*10)
2045 expected = list(arr) + [0] * (10 - len(arr))
2046 self.assertEqual(conn.send_bytes(arr), None)
2047 self.assertEqual(conn.recv_bytes_into(buffer),
2048 len(arr) * buffer.itemsize)
2049 self.assertEqual(list(buffer), expected)
2050
2051 buffer = array.array('i', [0]*10)
2052 expected = [0] * 3 + list(arr) + [0] * (10 - 3 - len(arr))
2053 self.assertEqual(conn.send_bytes(arr), None)
2054 self.assertEqual(conn.recv_bytes_into(buffer, 3 * buffer.itemsize),
2055 len(arr) * buffer.itemsize)
2056 self.assertEqual(list(buffer), expected)
2057
2058 buffer = bytearray(latin(' ' * 40))
2059 self.assertEqual(conn.send_bytes(longmsg), None)
2060 try:
2061 res = conn.recv_bytes_into(buffer)
2062 except multiprocessing.BufferTooShort as e:
2063 self.assertEqual(e.args, (longmsg,))
2064 else:
2065 self.fail('expected BufferTooShort, got %s' % res)
2066
2067 poll = TimingWrapper(conn.poll)
2068
2069 self.assertEqual(poll(), False)
2070 self.assertTimingAlmostEqual(poll.elapsed, 0)
2071
Richard Oudkerk59d54042012-05-10 16:11:12 +01002072 self.assertEqual(poll(-1), False)
2073 self.assertTimingAlmostEqual(poll.elapsed, 0)
2074
Benjamin Petersone711caf2008-06-11 16:44:04 +00002075 self.assertEqual(poll(TIMEOUT1), False)
2076 self.assertTimingAlmostEqual(poll.elapsed, TIMEOUT1)
2077
2078 conn.send(None)
2079
2080 self.assertEqual(poll(TIMEOUT1), True)
2081 self.assertTimingAlmostEqual(poll.elapsed, 0)
2082
2083 self.assertEqual(conn.recv(), None)
2084
2085 really_big_msg = latin('X') * (1024 * 1024 * 16) # 16Mb
2086 conn.send_bytes(really_big_msg)
2087 self.assertEqual(conn.recv_bytes(), really_big_msg)
2088
2089 conn.send_bytes(SENTINEL) # tell child to quit
2090 child_conn.close()
2091
2092 if self.TYPE == 'processes':
2093 self.assertEqual(conn.readable, True)
2094 self.assertEqual(conn.writable, True)
2095 self.assertRaises(EOFError, conn.recv)
2096 self.assertRaises(EOFError, conn.recv_bytes)
2097
2098 p.join()
2099
2100 def test_duplex_false(self):
2101 reader, writer = self.Pipe(duplex=False)
2102 self.assertEqual(writer.send(1), None)
2103 self.assertEqual(reader.recv(), 1)
2104 if self.TYPE == 'processes':
2105 self.assertEqual(reader.readable, True)
2106 self.assertEqual(reader.writable, False)
2107 self.assertEqual(writer.readable, False)
2108 self.assertEqual(writer.writable, True)
2109 self.assertRaises(IOError, reader.send, 2)
2110 self.assertRaises(IOError, writer.recv)
2111 self.assertRaises(IOError, writer.poll)
2112
2113 def test_spawn_close(self):
2114 # We test that a pipe connection can be closed by parent
2115 # process immediately after child is spawned. On Windows this
2116 # would have sometimes failed on old versions because
2117 # child_conn would be closed before the child got a chance to
2118 # duplicate it.
2119 conn, child_conn = self.Pipe()
2120
2121 p = self.Process(target=self._echo, args=(child_conn,))
Jesus Cea94f964f2011-09-09 20:26:57 +02002122 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +00002123 p.start()
2124 child_conn.close() # this might complete before child initializes
2125
2126 msg = latin('hello')
2127 conn.send_bytes(msg)
2128 self.assertEqual(conn.recv_bytes(), msg)
2129
2130 conn.send_bytes(SENTINEL)
2131 conn.close()
2132 p.join()
2133
2134 def test_sendbytes(self):
2135 if self.TYPE != 'processes':
2136 return
2137
2138 msg = latin('abcdefghijklmnopqrstuvwxyz')
2139 a, b = self.Pipe()
2140
2141 a.send_bytes(msg)
2142 self.assertEqual(b.recv_bytes(), msg)
2143
2144 a.send_bytes(msg, 5)
2145 self.assertEqual(b.recv_bytes(), msg[5:])
2146
2147 a.send_bytes(msg, 7, 8)
2148 self.assertEqual(b.recv_bytes(), msg[7:7+8])
2149
2150 a.send_bytes(msg, 26)
2151 self.assertEqual(b.recv_bytes(), latin(''))
2152
2153 a.send_bytes(msg, 26, 0)
2154 self.assertEqual(b.recv_bytes(), latin(''))
2155
2156 self.assertRaises(ValueError, a.send_bytes, msg, 27)
2157
2158 self.assertRaises(ValueError, a.send_bytes, msg, 22, 5)
2159
2160 self.assertRaises(ValueError, a.send_bytes, msg, 26, 1)
2161
2162 self.assertRaises(ValueError, a.send_bytes, msg, -1)
2163
2164 self.assertRaises(ValueError, a.send_bytes, msg, 4, -1)
2165
Antoine Pitroubcb39d42011-08-23 19:46:22 +02002166 @classmethod
2167 def _is_fd_assigned(cls, fd):
2168 try:
2169 os.fstat(fd)
2170 except OSError as e:
2171 if e.errno == errno.EBADF:
2172 return False
2173 raise
2174 else:
2175 return True
2176
2177 @classmethod
2178 def _writefd(cls, conn, data, create_dummy_fds=False):
2179 if create_dummy_fds:
2180 for i in range(0, 256):
2181 if not cls._is_fd_assigned(i):
2182 os.dup2(conn.fileno(), i)
2183 fd = reduction.recv_handle(conn)
2184 if msvcrt:
2185 fd = msvcrt.open_osfhandle(fd, os.O_WRONLY)
2186 os.write(fd, data)
2187 os.close(fd)
2188
Charles-François Natalibc8f0822011-09-20 20:36:51 +02002189 @unittest.skipUnless(HAS_REDUCTION, "test needs multiprocessing.reduction")
Antoine Pitroubcb39d42011-08-23 19:46:22 +02002190 def test_fd_transfer(self):
2191 if self.TYPE != 'processes':
2192 self.skipTest("only makes sense with processes")
2193 conn, child_conn = self.Pipe(duplex=True)
2194
2195 p = self.Process(target=self._writefd, args=(child_conn, b"foo"))
Jesus Cea94f964f2011-09-09 20:26:57 +02002196 p.daemon = True
Antoine Pitroubcb39d42011-08-23 19:46:22 +02002197 p.start()
Victor Stinnerd0b10a62011-09-21 01:10:29 +02002198 self.addCleanup(test.support.unlink, test.support.TESTFN)
Antoine Pitroubcb39d42011-08-23 19:46:22 +02002199 with open(test.support.TESTFN, "wb") as f:
2200 fd = f.fileno()
2201 if msvcrt:
2202 fd = msvcrt.get_osfhandle(fd)
2203 reduction.send_handle(conn, fd, p.pid)
2204 p.join()
2205 with open(test.support.TESTFN, "rb") as f:
2206 self.assertEqual(f.read(), b"foo")
2207
Charles-François Natalibc8f0822011-09-20 20:36:51 +02002208 @unittest.skipUnless(HAS_REDUCTION, "test needs multiprocessing.reduction")
Antoine Pitroubcb39d42011-08-23 19:46:22 +02002209 @unittest.skipIf(sys.platform == "win32",
2210 "test semantics don't make sense on Windows")
2211 @unittest.skipIf(MAXFD <= 256,
2212 "largest assignable fd number is too small")
2213 @unittest.skipUnless(hasattr(os, "dup2"),
2214 "test needs os.dup2()")
2215 def test_large_fd_transfer(self):
2216 # With fd > 256 (issue #11657)
2217 if self.TYPE != 'processes':
2218 self.skipTest("only makes sense with processes")
2219 conn, child_conn = self.Pipe(duplex=True)
2220
2221 p = self.Process(target=self._writefd, args=(child_conn, b"bar", True))
Jesus Cea94f964f2011-09-09 20:26:57 +02002222 p.daemon = True
Antoine Pitroubcb39d42011-08-23 19:46:22 +02002223 p.start()
Victor Stinnerd0b10a62011-09-21 01:10:29 +02002224 self.addCleanup(test.support.unlink, test.support.TESTFN)
Antoine Pitroubcb39d42011-08-23 19:46:22 +02002225 with open(test.support.TESTFN, "wb") as f:
2226 fd = f.fileno()
2227 for newfd in range(256, MAXFD):
2228 if not self._is_fd_assigned(newfd):
2229 break
2230 else:
2231 self.fail("could not find an unassigned large file descriptor")
2232 os.dup2(fd, newfd)
2233 try:
2234 reduction.send_handle(conn, newfd, p.pid)
2235 finally:
2236 os.close(newfd)
2237 p.join()
2238 with open(test.support.TESTFN, "rb") as f:
2239 self.assertEqual(f.read(), b"bar")
2240
Jesus Cea4507e642011-09-21 03:53:25 +02002241 @classmethod
2242 def _send_data_without_fd(self, conn):
2243 os.write(conn.fileno(), b"\0")
2244
Charles-François Natalie51c8da2011-09-21 18:48:21 +02002245 @unittest.skipUnless(HAS_REDUCTION, "test needs multiprocessing.reduction")
Jesus Cea4507e642011-09-21 03:53:25 +02002246 @unittest.skipIf(sys.platform == "win32", "doesn't make sense on Windows")
2247 def test_missing_fd_transfer(self):
2248 # Check that exception is raised when received data is not
2249 # accompanied by a file descriptor in ancillary data.
2250 if self.TYPE != 'processes':
2251 self.skipTest("only makes sense with processes")
2252 conn, child_conn = self.Pipe(duplex=True)
2253
2254 p = self.Process(target=self._send_data_without_fd, args=(child_conn,))
2255 p.daemon = True
2256 p.start()
2257 self.assertRaises(RuntimeError, reduction.recv_handle, conn)
2258 p.join()
Antoine Pitroubcb39d42011-08-23 19:46:22 +02002259
Charles-François Natalied4a8fc2012-02-08 21:15:58 +01002260class _TestListener(BaseTestCase):
2261
2262 ALLOWED_TYPES = ('processes')
2263
2264 def test_multiple_bind(self):
2265 for family in self.connection.families:
2266 l = self.connection.Listener(family=family)
2267 self.addCleanup(l.close)
2268 self.assertRaises(OSError, self.connection.Listener,
2269 l.address, family)
2270
Benjamin Petersone711caf2008-06-11 16:44:04 +00002271class _TestListenerClient(BaseTestCase):
2272
2273 ALLOWED_TYPES = ('processes', 'threads')
2274
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +00002275 @classmethod
2276 def _test(cls, address):
2277 conn = cls.connection.Client(address)
Benjamin Petersone711caf2008-06-11 16:44:04 +00002278 conn.send('hello')
2279 conn.close()
2280
2281 def test_listener_client(self):
2282 for family in self.connection.families:
2283 l = self.connection.Listener(family=family)
2284 p = self.Process(target=self._test, args=(l.address,))
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +00002285 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +00002286 p.start()
2287 conn = l.accept()
2288 self.assertEqual(conn.recv(), 'hello')
2289 p.join()
2290 l.close()
Charles-François Natalied4a8fc2012-02-08 21:15:58 +01002291
Richard Oudkerkfdb8dcf2012-05-05 19:45:37 +01002292 def test_issue14725(self):
2293 l = self.connection.Listener()
2294 p = self.Process(target=self._test, args=(l.address,))
2295 p.daemon = True
2296 p.start()
2297 time.sleep(1)
2298 # On Windows the client process should by now have connected,
2299 # written data and closed the pipe handle by now. This causes
2300 # ConnectNamdedPipe() to fail with ERROR_NO_DATA. See Issue
2301 # 14725.
2302 conn = l.accept()
2303 self.assertEqual(conn.recv(), 'hello')
2304 conn.close()
2305 p.join()
2306 l.close()
2307
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01002308class _TestPoll(unittest.TestCase):
2309
2310 ALLOWED_TYPES = ('processes', 'threads')
2311
2312 def test_empty_string(self):
2313 a, b = self.Pipe()
2314 self.assertEqual(a.poll(), False)
2315 b.send_bytes(b'')
2316 self.assertEqual(a.poll(), True)
2317 self.assertEqual(a.poll(), True)
2318
2319 @classmethod
2320 def _child_strings(cls, conn, strings):
2321 for s in strings:
2322 time.sleep(0.1)
2323 conn.send_bytes(s)
2324 conn.close()
2325
2326 def test_strings(self):
2327 strings = (b'hello', b'', b'a', b'b', b'', b'bye', b'', b'lop')
2328 a, b = self.Pipe()
2329 p = self.Process(target=self._child_strings, args=(b, strings))
2330 p.start()
2331
2332 for s in strings:
2333 for i in range(200):
2334 if a.poll(0.01):
2335 break
2336 x = a.recv_bytes()
2337 self.assertEqual(s, x)
2338
2339 p.join()
2340
2341 @classmethod
2342 def _child_boundaries(cls, r):
2343 # Polling may "pull" a message in to the child process, but we
2344 # don't want it to pull only part of a message, as that would
2345 # corrupt the pipe for any other processes which might later
2346 # read from it.
2347 r.poll(5)
2348
2349 def test_boundaries(self):
2350 r, w = self.Pipe(False)
2351 p = self.Process(target=self._child_boundaries, args=(r,))
2352 p.start()
2353 time.sleep(2)
2354 L = [b"first", b"second"]
2355 for obj in L:
2356 w.send_bytes(obj)
2357 w.close()
2358 p.join()
2359 self.assertIn(r.recv_bytes(), L)
2360
2361 @classmethod
2362 def _child_dont_merge(cls, b):
2363 b.send_bytes(b'a')
2364 b.send_bytes(b'b')
2365 b.send_bytes(b'cd')
2366
2367 def test_dont_merge(self):
2368 a, b = self.Pipe()
2369 self.assertEqual(a.poll(0.0), False)
2370 self.assertEqual(a.poll(0.1), False)
2371
2372 p = self.Process(target=self._child_dont_merge, args=(b,))
2373 p.start()
2374
2375 self.assertEqual(a.recv_bytes(), b'a')
2376 self.assertEqual(a.poll(1.0), True)
2377 self.assertEqual(a.poll(1.0), True)
2378 self.assertEqual(a.recv_bytes(), b'b')
2379 self.assertEqual(a.poll(1.0), True)
2380 self.assertEqual(a.poll(1.0), True)
2381 self.assertEqual(a.poll(0.0), True)
2382 self.assertEqual(a.recv_bytes(), b'cd')
2383
2384 p.join()
2385
Benjamin Petersone711caf2008-06-11 16:44:04 +00002386#
2387# Test of sending connection and socket objects between processes
2388#
Antoine Pitrou5438ed12012-04-24 22:56:57 +02002389
Richard Oudkerk24524192012-04-30 14:48:51 +01002390# Intermittent fails on Mac OS X -- see Issue14669 and Issue12958
2391@unittest.skipIf(sys.platform == "darwin", "fd passing unreliable on Mac OS X")
Antoine Pitrou5438ed12012-04-24 22:56:57 +02002392@unittest.skipUnless(HAS_REDUCTION, "test needs multiprocessing.reduction")
Benjamin Petersone711caf2008-06-11 16:44:04 +00002393class _TestPicklingConnections(BaseTestCase):
2394
2395 ALLOWED_TYPES = ('processes',)
2396
Antoine Pitrou5438ed12012-04-24 22:56:57 +02002397 @classmethod
Antoine Pitrou92ff4e12012-04-27 23:51:03 +02002398 def tearDownClass(cls):
2399 from multiprocessing.reduction import resource_sharer
2400 resource_sharer.stop(timeout=5)
2401
2402 @classmethod
Antoine Pitrou5438ed12012-04-24 22:56:57 +02002403 def _listener(cls, conn, families):
Benjamin Petersone711caf2008-06-11 16:44:04 +00002404 for fam in families:
Antoine Pitrou5438ed12012-04-24 22:56:57 +02002405 l = cls.connection.Listener(family=fam)
Benjamin Petersone711caf2008-06-11 16:44:04 +00002406 conn.send(l.address)
2407 new_conn = l.accept()
2408 conn.send(new_conn)
Antoine Pitrou5438ed12012-04-24 22:56:57 +02002409 new_conn.close()
2410 l.close()
Benjamin Petersone711caf2008-06-11 16:44:04 +00002411
Antoine Pitrou5438ed12012-04-24 22:56:57 +02002412 l = socket.socket()
2413 l.bind(('localhost', 0))
Antoine Pitrou5438ed12012-04-24 22:56:57 +02002414 l.listen(1)
Richard Oudkerk5d73c172012-05-08 22:24:47 +01002415 conn.send(l.getsockname())
Antoine Pitrou5438ed12012-04-24 22:56:57 +02002416 new_conn, addr = l.accept()
2417 conn.send(new_conn)
2418 new_conn.close()
2419 l.close()
Benjamin Petersone711caf2008-06-11 16:44:04 +00002420
2421 conn.recv()
2422
Antoine Pitrou5438ed12012-04-24 22:56:57 +02002423 @classmethod
2424 def _remote(cls, conn):
Benjamin Petersone711caf2008-06-11 16:44:04 +00002425 for (address, msg) in iter(conn.recv, None):
Antoine Pitrou5438ed12012-04-24 22:56:57 +02002426 client = cls.connection.Client(address)
Benjamin Petersone711caf2008-06-11 16:44:04 +00002427 client.send(msg.upper())
2428 client.close()
2429
Antoine Pitrou5438ed12012-04-24 22:56:57 +02002430 address, msg = conn.recv()
2431 client = socket.socket()
2432 client.connect(address)
2433 client.sendall(msg.upper())
2434 client.close()
Benjamin Petersone711caf2008-06-11 16:44:04 +00002435
2436 conn.close()
2437
2438 def test_pickling(self):
Benjamin Petersone711caf2008-06-11 16:44:04 +00002439 families = self.connection.families
2440
2441 lconn, lconn0 = self.Pipe()
2442 lp = self.Process(target=self._listener, args=(lconn0, families))
Jesus Cea94f964f2011-09-09 20:26:57 +02002443 lp.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +00002444 lp.start()
2445 lconn0.close()
2446
2447 rconn, rconn0 = self.Pipe()
2448 rp = self.Process(target=self._remote, args=(rconn0,))
Jesus Cea94f964f2011-09-09 20:26:57 +02002449 rp.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +00002450 rp.start()
2451 rconn0.close()
2452
2453 for fam in families:
2454 msg = ('This connection uses family %s' % fam).encode('ascii')
2455 address = lconn.recv()
2456 rconn.send((address, msg))
2457 new_conn = lconn.recv()
2458 self.assertEqual(new_conn.recv(), msg.upper())
2459
2460 rconn.send(None)
2461
Antoine Pitrou5438ed12012-04-24 22:56:57 +02002462 msg = latin('This connection uses a normal socket')
2463 address = lconn.recv()
2464 rconn.send((address, msg))
2465 new_conn = lconn.recv()
Richard Oudkerk4460c342012-04-30 14:48:50 +01002466 buf = []
2467 while True:
2468 s = new_conn.recv(100)
2469 if not s:
2470 break
2471 buf.append(s)
2472 buf = b''.join(buf)
2473 self.assertEqual(buf, msg.upper())
Antoine Pitrou5438ed12012-04-24 22:56:57 +02002474 new_conn.close()
Benjamin Petersone711caf2008-06-11 16:44:04 +00002475
2476 lconn.send(None)
2477
2478 rconn.close()
2479 lconn.close()
2480
2481 lp.join()
2482 rp.join()
Antoine Pitrou5438ed12012-04-24 22:56:57 +02002483
2484 @classmethod
2485 def child_access(cls, conn):
2486 w = conn.recv()
2487 w.send('all is well')
2488 w.close()
2489
2490 r = conn.recv()
2491 msg = r.recv()
2492 conn.send(msg*2)
2493
2494 conn.close()
2495
2496 def test_access(self):
2497 # On Windows, if we do not specify a destination pid when
2498 # using DupHandle then we need to be careful to use the
2499 # correct access flags for DuplicateHandle(), or else
2500 # DupHandle.detach() will raise PermissionError. For example,
2501 # for a read only pipe handle we should use
2502 # access=FILE_GENERIC_READ. (Unfortunately
2503 # DUPLICATE_SAME_ACCESS does not work.)
2504 conn, child_conn = self.Pipe()
2505 p = self.Process(target=self.child_access, args=(child_conn,))
2506 p.daemon = True
2507 p.start()
2508 child_conn.close()
2509
2510 r, w = self.Pipe(duplex=False)
2511 conn.send(w)
2512 w.close()
2513 self.assertEqual(r.recv(), 'all is well')
2514 r.close()
2515
2516 r, w = self.Pipe(duplex=False)
2517 conn.send(r)
2518 r.close()
2519 w.send('foobar')
2520 w.close()
2521 self.assertEqual(conn.recv(), 'foobar'*2)
2522
Benjamin Petersone711caf2008-06-11 16:44:04 +00002523#
2524#
2525#
2526
2527class _TestHeap(BaseTestCase):
2528
2529 ALLOWED_TYPES = ('processes',)
2530
2531 def test_heap(self):
2532 iterations = 5000
2533 maxblocks = 50
2534 blocks = []
2535
2536 # create and destroy lots of blocks of different sizes
2537 for i in range(iterations):
2538 size = int(random.lognormvariate(0, 1) * 1000)
2539 b = multiprocessing.heap.BufferWrapper(size)
2540 blocks.append(b)
2541 if len(blocks) > maxblocks:
2542 i = random.randrange(maxblocks)
2543 del blocks[i]
2544
2545 # get the heap object
2546 heap = multiprocessing.heap.BufferWrapper._heap
2547
2548 # verify the state of the heap
2549 all = []
2550 occupied = 0
Charles-François Natali778db492011-07-02 14:35:49 +02002551 heap._lock.acquire()
2552 self.addCleanup(heap._lock.release)
Benjamin Petersone711caf2008-06-11 16:44:04 +00002553 for L in list(heap._len_to_seq.values()):
2554 for arena, start, stop in L:
2555 all.append((heap._arenas.index(arena), start, stop,
2556 stop-start, 'free'))
2557 for arena, start, stop in heap._allocated_blocks:
2558 all.append((heap._arenas.index(arena), start, stop,
2559 stop-start, 'occupied'))
2560 occupied += (stop-start)
2561
2562 all.sort()
2563
2564 for i in range(len(all)-1):
2565 (arena, start, stop) = all[i][:3]
2566 (narena, nstart, nstop) = all[i+1][:3]
2567 self.assertTrue((arena != narena and nstart == 0) or
2568 (stop == nstart))
2569
Charles-François Natali778db492011-07-02 14:35:49 +02002570 def test_free_from_gc(self):
2571 # Check that freeing of blocks by the garbage collector doesn't deadlock
2572 # (issue #12352).
2573 # Make sure the GC is enabled, and set lower collection thresholds to
2574 # make collections more frequent (and increase the probability of
2575 # deadlock).
2576 if not gc.isenabled():
2577 gc.enable()
2578 self.addCleanup(gc.disable)
2579 thresholds = gc.get_threshold()
2580 self.addCleanup(gc.set_threshold, *thresholds)
2581 gc.set_threshold(10)
2582
2583 # perform numerous block allocations, with cyclic references to make
2584 # sure objects are collected asynchronously by the gc
2585 for i in range(5000):
2586 a = multiprocessing.heap.BufferWrapper(1)
2587 b = multiprocessing.heap.BufferWrapper(1)
2588 # circular references
2589 a.buddy = b
2590 b.buddy = a
2591
Benjamin Petersone711caf2008-06-11 16:44:04 +00002592#
2593#
2594#
2595
Benjamin Petersone711caf2008-06-11 16:44:04 +00002596class _Foo(Structure):
2597 _fields_ = [
2598 ('x', c_int),
2599 ('y', c_double)
2600 ]
2601
2602class _TestSharedCTypes(BaseTestCase):
2603
2604 ALLOWED_TYPES = ('processes',)
2605
Antoine Pitrou7744e2a2010-11-22 16:26:21 +00002606 def setUp(self):
2607 if not HAS_SHAREDCTYPES:
2608 self.skipTest("requires multiprocessing.sharedctypes")
2609
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +00002610 @classmethod
2611 def _double(cls, x, y, foo, arr, string):
Benjamin Petersone711caf2008-06-11 16:44:04 +00002612 x.value *= 2
2613 y.value *= 2
2614 foo.x *= 2
2615 foo.y *= 2
2616 string.value *= 2
2617 for i in range(len(arr)):
2618 arr[i] *= 2
2619
2620 def test_sharedctypes(self, lock=False):
Benjamin Petersone711caf2008-06-11 16:44:04 +00002621 x = Value('i', 7, lock=lock)
Georg Brandl89fad142010-03-14 10:23:39 +00002622 y = Value(c_double, 1.0/3.0, lock=lock)
Benjamin Petersone711caf2008-06-11 16:44:04 +00002623 foo = Value(_Foo, 3, 2, lock=lock)
Georg Brandl89fad142010-03-14 10:23:39 +00002624 arr = self.Array('d', list(range(10)), lock=lock)
2625 string = self.Array('c', 20, lock=lock)
Brian Curtinafa88b52010-10-07 01:12:19 +00002626 string.value = latin('hello')
Benjamin Petersone711caf2008-06-11 16:44:04 +00002627
2628 p = self.Process(target=self._double, args=(x, y, foo, arr, string))
Jesus Cea94f964f2011-09-09 20:26:57 +02002629 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +00002630 p.start()
2631 p.join()
2632
2633 self.assertEqual(x.value, 14)
2634 self.assertAlmostEqual(y.value, 2.0/3.0)
2635 self.assertEqual(foo.x, 6)
2636 self.assertAlmostEqual(foo.y, 4.0)
2637 for i in range(10):
2638 self.assertAlmostEqual(arr[i], i*2)
2639 self.assertEqual(string.value, latin('hellohello'))
2640
2641 def test_synchronize(self):
2642 self.test_sharedctypes(lock=True)
2643
2644 def test_copy(self):
Benjamin Petersone711caf2008-06-11 16:44:04 +00002645 foo = _Foo(2, 5.0)
Brian Curtinafa88b52010-10-07 01:12:19 +00002646 bar = copy(foo)
Benjamin Petersone711caf2008-06-11 16:44:04 +00002647 foo.x = 0
2648 foo.y = 0
2649 self.assertEqual(bar.x, 2)
2650 self.assertAlmostEqual(bar.y, 5.0)
2651
2652#
2653#
2654#
2655
2656class _TestFinalize(BaseTestCase):
2657
2658 ALLOWED_TYPES = ('processes',)
2659
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +00002660 @classmethod
2661 def _test_finalize(cls, conn):
Benjamin Petersone711caf2008-06-11 16:44:04 +00002662 class Foo(object):
2663 pass
2664
2665 a = Foo()
2666 util.Finalize(a, conn.send, args=('a',))
2667 del a # triggers callback for a
2668
2669 b = Foo()
2670 close_b = util.Finalize(b, conn.send, args=('b',))
2671 close_b() # triggers callback for b
2672 close_b() # does nothing because callback has already been called
2673 del b # does nothing because callback has already been called
2674
2675 c = Foo()
2676 util.Finalize(c, conn.send, args=('c',))
2677
2678 d10 = Foo()
2679 util.Finalize(d10, conn.send, args=('d10',), exitpriority=1)
2680
2681 d01 = Foo()
2682 util.Finalize(d01, conn.send, args=('d01',), exitpriority=0)
2683 d02 = Foo()
2684 util.Finalize(d02, conn.send, args=('d02',), exitpriority=0)
2685 d03 = Foo()
2686 util.Finalize(d03, conn.send, args=('d03',), exitpriority=0)
2687
2688 util.Finalize(None, conn.send, args=('e',), exitpriority=-10)
2689
2690 util.Finalize(None, conn.send, args=('STOP',), exitpriority=-100)
2691
Ezio Melotti13925002011-03-16 11:05:33 +02002692 # call multiprocessing's cleanup function then exit process without
Benjamin Petersone711caf2008-06-11 16:44:04 +00002693 # garbage collecting locals
2694 util._exit_function()
2695 conn.close()
2696 os._exit(0)
2697
2698 def test_finalize(self):
2699 conn, child_conn = self.Pipe()
2700
2701 p = self.Process(target=self._test_finalize, args=(child_conn,))
Jesus Cea94f964f2011-09-09 20:26:57 +02002702 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +00002703 p.start()
2704 p.join()
2705
2706 result = [obj for obj in iter(conn.recv, 'STOP')]
2707 self.assertEqual(result, ['a', 'b', 'd10', 'd03', 'd02', 'd01', 'e'])
2708
2709#
2710# Test that from ... import * works for each module
2711#
2712
2713class _TestImportStar(BaseTestCase):
2714
2715 ALLOWED_TYPES = ('processes',)
2716
2717 def test_import(self):
Florent Xiclunafd1b0932010-03-28 00:25:02 +00002718 modules = [
Benjamin Petersone711caf2008-06-11 16:44:04 +00002719 'multiprocessing', 'multiprocessing.connection',
2720 'multiprocessing.heap', 'multiprocessing.managers',
2721 'multiprocessing.pool', 'multiprocessing.process',
Benjamin Petersone711caf2008-06-11 16:44:04 +00002722 'multiprocessing.synchronize', 'multiprocessing.util'
Florent Xiclunafd1b0932010-03-28 00:25:02 +00002723 ]
2724
Charles-François Natalibc8f0822011-09-20 20:36:51 +02002725 if HAS_REDUCTION:
2726 modules.append('multiprocessing.reduction')
2727
Florent Xiclunafd1b0932010-03-28 00:25:02 +00002728 if c_int is not None:
2729 # This module requires _ctypes
2730 modules.append('multiprocessing.sharedctypes')
Benjamin Petersone711caf2008-06-11 16:44:04 +00002731
2732 for name in modules:
2733 __import__(name)
2734 mod = sys.modules[name]
2735
2736 for attr in getattr(mod, '__all__', ()):
2737 self.assertTrue(
2738 hasattr(mod, attr),
2739 '%r does not have attribute %r' % (mod, attr)
2740 )
2741
2742#
2743# Quick test that logging works -- does not test logging output
2744#
2745
2746class _TestLogging(BaseTestCase):
2747
2748 ALLOWED_TYPES = ('processes',)
2749
2750 def test_enable_logging(self):
2751 logger = multiprocessing.get_logger()
2752 logger.setLevel(util.SUBWARNING)
2753 self.assertTrue(logger is not None)
2754 logger.debug('this will not be printed')
2755 logger.info('nor will this')
2756 logger.setLevel(LOG_LEVEL)
2757
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +00002758 @classmethod
2759 def _test_level(cls, conn):
Benjamin Petersone711caf2008-06-11 16:44:04 +00002760 logger = multiprocessing.get_logger()
2761 conn.send(logger.getEffectiveLevel())
2762
2763 def test_level(self):
2764 LEVEL1 = 32
2765 LEVEL2 = 37
2766
2767 logger = multiprocessing.get_logger()
2768 root_logger = logging.getLogger()
2769 root_level = root_logger.level
2770
2771 reader, writer = multiprocessing.Pipe(duplex=False)
2772
2773 logger.setLevel(LEVEL1)
Jesus Cea94f964f2011-09-09 20:26:57 +02002774 p = self.Process(target=self._test_level, args=(writer,))
2775 p.daemon = True
2776 p.start()
Benjamin Petersone711caf2008-06-11 16:44:04 +00002777 self.assertEqual(LEVEL1, reader.recv())
2778
2779 logger.setLevel(logging.NOTSET)
2780 root_logger.setLevel(LEVEL2)
Jesus Cea94f964f2011-09-09 20:26:57 +02002781 p = self.Process(target=self._test_level, args=(writer,))
2782 p.daemon = True
2783 p.start()
Benjamin Petersone711caf2008-06-11 16:44:04 +00002784 self.assertEqual(LEVEL2, reader.recv())
2785
2786 root_logger.setLevel(root_level)
2787 logger.setLevel(level=LOG_LEVEL)
2788
Jesse Nollerb9a49b72009-11-21 18:09:38 +00002789
Jesse Nollerf4ae35f2009-11-24 14:22:24 +00002790# class _TestLoggingProcessName(BaseTestCase):
2791#
2792# def handle(self, record):
2793# assert record.processName == multiprocessing.current_process().name
2794# self.__handled = True
2795#
2796# def test_logging(self):
2797# handler = logging.Handler()
2798# handler.handle = self.handle
2799# self.__handled = False
2800# # Bypass getLogger() and side-effects
2801# logger = logging.getLoggerClass()(
2802# 'multiprocessing.test.TestLoggingProcessName')
2803# logger.addHandler(handler)
2804# logger.propagate = False
2805#
2806# logger.warn('foo')
2807# assert self.__handled
Jesse Nollerb9a49b72009-11-21 18:09:38 +00002808
Benjamin Petersone711caf2008-06-11 16:44:04 +00002809#
Jesse Noller6214edd2009-01-19 16:23:53 +00002810# Test to verify handle verification, see issue 3321
2811#
2812
2813class TestInvalidHandle(unittest.TestCase):
2814
Florent Xiclunafd1b0932010-03-28 00:25:02 +00002815 @unittest.skipIf(WIN32, "skipped on Windows")
Jesse Noller6214edd2009-01-19 16:23:53 +00002816 def test_invalid_handles(self):
Antoine Pitrou87cf2202011-05-09 17:04:27 +02002817 conn = multiprocessing.connection.Connection(44977608)
2818 try:
2819 self.assertRaises((ValueError, IOError), conn.poll)
2820 finally:
2821 # Hack private attribute _handle to avoid printing an error
2822 # in conn.__del__
2823 conn._handle = None
2824 self.assertRaises((ValueError, IOError),
2825 multiprocessing.connection.Connection, -1)
Florent Xiclunafd1b0932010-03-28 00:25:02 +00002826
Jesse Noller6214edd2009-01-19 16:23:53 +00002827#
Benjamin Petersone711caf2008-06-11 16:44:04 +00002828# Functions used to create test cases from the base ones in this module
2829#
2830
2831def get_attributes(Source, names):
2832 d = {}
2833 for name in names:
2834 obj = getattr(Source, name)
2835 if type(obj) == type(get_attributes):
2836 obj = staticmethod(obj)
2837 d[name] = obj
2838 return d
2839
2840def create_test_cases(Mixin, type):
2841 result = {}
2842 glob = globals()
Florent Xiclunafd1b0932010-03-28 00:25:02 +00002843 Type = type.capitalize()
Benjamin Petersone711caf2008-06-11 16:44:04 +00002844
2845 for name in list(glob.keys()):
2846 if name.startswith('_Test'):
2847 base = glob[name]
2848 if type in base.ALLOWED_TYPES:
2849 newname = 'With' + Type + name[1:]
2850 class Temp(base, unittest.TestCase, Mixin):
2851 pass
2852 result[newname] = Temp
2853 Temp.__name__ = newname
2854 Temp.__module__ = Mixin.__module__
2855 return result
2856
2857#
2858# Create test cases
2859#
2860
2861class ProcessesMixin(object):
2862 TYPE = 'processes'
2863 Process = multiprocessing.Process
2864 locals().update(get_attributes(multiprocessing, (
2865 'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
Richard Oudkerk3730a172012-06-15 18:26:07 +01002866 'Condition', 'Event', 'Barrier', 'Value', 'Array', 'RawValue',
Benjamin Petersone711caf2008-06-11 16:44:04 +00002867 'RawArray', 'current_process', 'active_children', 'Pipe',
Richard Oudkerke41682b2012-06-06 19:04:57 +01002868 'connection', 'JoinableQueue', 'Pool'
Benjamin Petersone711caf2008-06-11 16:44:04 +00002869 )))
2870
2871testcases_processes = create_test_cases(ProcessesMixin, type='processes')
2872globals().update(testcases_processes)
2873
2874
2875class ManagerMixin(object):
2876 TYPE = 'manager'
2877 Process = multiprocessing.Process
2878 manager = object.__new__(multiprocessing.managers.SyncManager)
2879 locals().update(get_attributes(manager, (
2880 'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
Richard Oudkerk3730a172012-06-15 18:26:07 +01002881 'Condition', 'Event', 'Barrier', 'Value', 'Array', 'list', 'dict',
Richard Oudkerke41682b2012-06-06 19:04:57 +01002882 'Namespace', 'JoinableQueue', 'Pool'
Benjamin Petersone711caf2008-06-11 16:44:04 +00002883 )))
2884
2885testcases_manager = create_test_cases(ManagerMixin, type='manager')
2886globals().update(testcases_manager)
2887
2888
2889class ThreadsMixin(object):
2890 TYPE = 'threads'
2891 Process = multiprocessing.dummy.Process
2892 locals().update(get_attributes(multiprocessing.dummy, (
2893 'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
Richard Oudkerk3730a172012-06-15 18:26:07 +01002894 'Condition', 'Event', 'Barrier', 'Value', 'Array', 'current_process',
Benjamin Petersone711caf2008-06-11 16:44:04 +00002895 'active_children', 'Pipe', 'connection', 'dict', 'list',
Richard Oudkerke41682b2012-06-06 19:04:57 +01002896 'Namespace', 'JoinableQueue', 'Pool'
Benjamin Petersone711caf2008-06-11 16:44:04 +00002897 )))
2898
2899testcases_threads = create_test_cases(ThreadsMixin, type='threads')
2900globals().update(testcases_threads)
2901
Neal Norwitz5d6415e2008-08-25 01:53:32 +00002902class OtherTest(unittest.TestCase):
2903 # TODO: add more tests for deliver/answer challenge.
2904 def test_deliver_challenge_auth_failure(self):
2905 class _FakeConnection(object):
2906 def recv_bytes(self, size):
Neal Norwitzec105ad2008-08-25 03:05:54 +00002907 return b'something bogus'
Neal Norwitz5d6415e2008-08-25 01:53:32 +00002908 def send_bytes(self, data):
2909 pass
2910 self.assertRaises(multiprocessing.AuthenticationError,
2911 multiprocessing.connection.deliver_challenge,
2912 _FakeConnection(), b'abc')
2913
2914 def test_answer_challenge_auth_failure(self):
2915 class _FakeConnection(object):
2916 def __init__(self):
2917 self.count = 0
2918 def recv_bytes(self, size):
2919 self.count += 1
2920 if self.count == 1:
2921 return multiprocessing.connection.CHALLENGE
2922 elif self.count == 2:
Neal Norwitzec105ad2008-08-25 03:05:54 +00002923 return b'something bogus'
2924 return b''
Neal Norwitz5d6415e2008-08-25 01:53:32 +00002925 def send_bytes(self, data):
2926 pass
2927 self.assertRaises(multiprocessing.AuthenticationError,
2928 multiprocessing.connection.answer_challenge,
2929 _FakeConnection(), b'abc')
2930
Benjamin Petersonf47ed4a2009-04-11 20:45:40 +00002931#
2932# Test Manager.start()/Pool.__init__() initializer feature - see issue 5585
2933#
2934
2935def initializer(ns):
2936 ns.test += 1
2937
2938class TestInitializers(unittest.TestCase):
2939 def setUp(self):
2940 self.mgr = multiprocessing.Manager()
2941 self.ns = self.mgr.Namespace()
2942 self.ns.test = 0
2943
2944 def tearDown(self):
2945 self.mgr.shutdown()
Richard Oudkerka6becaa2012-05-03 18:29:02 +01002946 self.mgr.join()
Benjamin Petersonf47ed4a2009-04-11 20:45:40 +00002947
2948 def test_manager_initializer(self):
2949 m = multiprocessing.managers.SyncManager()
2950 self.assertRaises(TypeError, m.start, 1)
2951 m.start(initializer, (self.ns,))
2952 self.assertEqual(self.ns.test, 1)
2953 m.shutdown()
Richard Oudkerka6becaa2012-05-03 18:29:02 +01002954 m.join()
Benjamin Petersonf47ed4a2009-04-11 20:45:40 +00002955
2956 def test_pool_initializer(self):
2957 self.assertRaises(TypeError, multiprocessing.Pool, initializer=1)
2958 p = multiprocessing.Pool(1, initializer, (self.ns,))
2959 p.close()
2960 p.join()
2961 self.assertEqual(self.ns.test, 1)
2962
Alexandre Vassalottic57a84f2009-07-17 12:07:01 +00002963#
2964# Issue 5155, 5313, 5331: Test process in processes
2965# Verifies os.close(sys.stdin.fileno) vs. sys.stdin.close() behavior
2966#
2967
2968def _ThisSubProcess(q):
2969 try:
2970 item = q.get(block=False)
2971 except pyqueue.Empty:
2972 pass
2973
2974def _TestProcess(q):
2975 queue = multiprocessing.Queue()
2976 subProc = multiprocessing.Process(target=_ThisSubProcess, args=(queue,))
Jesus Cea94f964f2011-09-09 20:26:57 +02002977 subProc.daemon = True
Alexandre Vassalottic57a84f2009-07-17 12:07:01 +00002978 subProc.start()
2979 subProc.join()
2980
2981def _afunc(x):
2982 return x*x
2983
2984def pool_in_process():
2985 pool = multiprocessing.Pool(processes=4)
2986 x = pool.map(_afunc, [1, 2, 3, 4, 5, 6, 7])
Richard Oudkerk225cb8d2012-05-02 19:36:11 +01002987 pool.close()
2988 pool.join()
Alexandre Vassalottic57a84f2009-07-17 12:07:01 +00002989
2990class _file_like(object):
2991 def __init__(self, delegate):
2992 self._delegate = delegate
2993 self._pid = None
2994
2995 @property
2996 def cache(self):
2997 pid = os.getpid()
2998 # There are no race conditions since fork keeps only the running thread
2999 if pid != self._pid:
3000 self._pid = pid
3001 self._cache = []
3002 return self._cache
3003
3004 def write(self, data):
3005 self.cache.append(data)
3006
3007 def flush(self):
3008 self._delegate.write(''.join(self.cache))
3009 self._cache = []
3010
3011class TestStdinBadfiledescriptor(unittest.TestCase):
3012
3013 def test_queue_in_process(self):
3014 queue = multiprocessing.Queue()
3015 proc = multiprocessing.Process(target=_TestProcess, args=(queue,))
3016 proc.start()
3017 proc.join()
3018
3019 def test_pool_in_process(self):
3020 p = multiprocessing.Process(target=pool_in_process)
3021 p.start()
3022 p.join()
3023
3024 def test_flushing(self):
3025 sio = io.StringIO()
3026 flike = _file_like(sio)
3027 flike.write('foo')
3028 proc = multiprocessing.Process(target=lambda: flike.flush())
3029 flike.flush()
3030 assert sio.getvalue() == 'foo'
3031
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01003032
3033class TestWait(unittest.TestCase):
3034
3035 @classmethod
3036 def _child_test_wait(cls, w, slow):
3037 for i in range(10):
3038 if slow:
3039 time.sleep(random.random()*0.1)
3040 w.send((i, os.getpid()))
3041 w.close()
3042
3043 def test_wait(self, slow=False):
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01003044 from multiprocessing.connection import wait
3045 readers = []
3046 procs = []
3047 messages = []
3048
3049 for i in range(4):
Antoine Pitrou5bb9a8f2012-03-06 13:43:24 +01003050 r, w = multiprocessing.Pipe(duplex=False)
3051 p = multiprocessing.Process(target=self._child_test_wait, args=(w, slow))
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01003052 p.daemon = True
3053 p.start()
3054 w.close()
3055 readers.append(r)
3056 procs.append(p)
Antoine Pitrou6c64cc12012-03-06 13:42:35 +01003057 self.addCleanup(p.join)
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01003058
3059 while readers:
3060 for r in wait(readers):
3061 try:
3062 msg = r.recv()
3063 except EOFError:
3064 readers.remove(r)
3065 r.close()
3066 else:
3067 messages.append(msg)
3068
3069 messages.sort()
3070 expected = sorted((i, p.pid) for i in range(10) for p in procs)
3071 self.assertEqual(messages, expected)
3072
3073 @classmethod
3074 def _child_test_wait_socket(cls, address, slow):
3075 s = socket.socket()
3076 s.connect(address)
3077 for i in range(10):
3078 if slow:
3079 time.sleep(random.random()*0.1)
3080 s.sendall(('%s\n' % i).encode('ascii'))
3081 s.close()
3082
3083 def test_wait_socket(self, slow=False):
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01003084 from multiprocessing.connection import wait
3085 l = socket.socket()
3086 l.bind(('', 0))
3087 l.listen(4)
3088 addr = ('localhost', l.getsockname()[1])
3089 readers = []
3090 procs = []
3091 dic = {}
3092
3093 for i in range(4):
Antoine Pitrou5bb9a8f2012-03-06 13:43:24 +01003094 p = multiprocessing.Process(target=self._child_test_wait_socket,
3095 args=(addr, slow))
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01003096 p.daemon = True
3097 p.start()
3098 procs.append(p)
Antoine Pitrou6c64cc12012-03-06 13:42:35 +01003099 self.addCleanup(p.join)
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01003100
3101 for i in range(4):
3102 r, _ = l.accept()
3103 readers.append(r)
3104 dic[r] = []
3105 l.close()
3106
3107 while readers:
3108 for r in wait(readers):
3109 msg = r.recv(32)
3110 if not msg:
3111 readers.remove(r)
3112 r.close()
3113 else:
3114 dic[r].append(msg)
3115
3116 expected = ''.join('%s\n' % i for i in range(10)).encode('ascii')
3117 for v in dic.values():
3118 self.assertEqual(b''.join(v), expected)
3119
3120 def test_wait_slow(self):
3121 self.test_wait(True)
3122
3123 def test_wait_socket_slow(self):
Richard Oudkerk104b3f42012-05-08 16:08:07 +01003124 self.test_wait_socket(True)
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01003125
3126 def test_wait_timeout(self):
3127 from multiprocessing.connection import wait
3128
Richard Oudkerk009b15e2012-05-04 09:44:39 +01003129 expected = 5
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01003130 a, b = multiprocessing.Pipe()
3131
3132 start = time.time()
Richard Oudkerk009b15e2012-05-04 09:44:39 +01003133 res = wait([a, b], expected)
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01003134 delta = time.time() - start
3135
3136 self.assertEqual(res, [])
Richard Oudkerk6dbca362012-05-06 16:46:36 +01003137 self.assertLess(delta, expected * 2)
3138 self.assertGreater(delta, expected * 0.5)
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01003139
3140 b.send(None)
3141
3142 start = time.time()
Richard Oudkerk009b15e2012-05-04 09:44:39 +01003143 res = wait([a, b], 20)
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01003144 delta = time.time() - start
3145
3146 self.assertEqual(res, [a])
Antoine Pitrou37749772012-03-09 18:40:15 +01003147 self.assertLess(delta, 0.4)
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01003148
Richard Oudkerk009b15e2012-05-04 09:44:39 +01003149 @classmethod
3150 def signal_and_sleep(cls, sem, period):
3151 sem.release()
3152 time.sleep(period)
3153
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01003154 def test_wait_integer(self):
3155 from multiprocessing.connection import wait
3156
Richard Oudkerk009b15e2012-05-04 09:44:39 +01003157 expected = 3
3158 sem = multiprocessing.Semaphore(0)
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01003159 a, b = multiprocessing.Pipe()
Richard Oudkerk009b15e2012-05-04 09:44:39 +01003160 p = multiprocessing.Process(target=self.signal_and_sleep,
3161 args=(sem, expected))
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01003162
3163 p.start()
3164 self.assertIsInstance(p.sentinel, int)
Richard Oudkerk009b15e2012-05-04 09:44:39 +01003165 self.assertTrue(sem.acquire(timeout=20))
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01003166
3167 start = time.time()
3168 res = wait([a, p.sentinel, b], expected + 20)
3169 delta = time.time() - start
3170
3171 self.assertEqual(res, [p.sentinel])
Antoine Pitrou37749772012-03-09 18:40:15 +01003172 self.assertLess(delta, expected + 2)
3173 self.assertGreater(delta, expected - 2)
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01003174
3175 a.send(None)
3176
3177 start = time.time()
3178 res = wait([a, p.sentinel, b], 20)
3179 delta = time.time() - start
3180
3181 self.assertEqual(res, [p.sentinel, b])
Antoine Pitrou37749772012-03-09 18:40:15 +01003182 self.assertLess(delta, 0.4)
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01003183
3184 b.send(None)
3185
3186 start = time.time()
3187 res = wait([a, p.sentinel, b], 20)
3188 delta = time.time() - start
3189
3190 self.assertEqual(res, [a, p.sentinel, b])
Antoine Pitrou37749772012-03-09 18:40:15 +01003191 self.assertLess(delta, 0.4)
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01003192
Richard Oudkerk009b15e2012-05-04 09:44:39 +01003193 p.terminate()
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01003194 p.join()
3195
Richard Oudkerk59d54042012-05-10 16:11:12 +01003196 def test_neg_timeout(self):
3197 from multiprocessing.connection import wait
3198 a, b = multiprocessing.Pipe()
3199 t = time.time()
3200 res = wait([a], timeout=-1)
3201 t = time.time() - t
3202 self.assertEqual(res, [])
3203 self.assertLess(t, 1)
3204 a.close()
3205 b.close()
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01003206
Antoine Pitrou709176f2012-04-01 17:19:09 +02003207#
3208# Issue 14151: Test invalid family on invalid environment
3209#
3210
3211class TestInvalidFamily(unittest.TestCase):
3212
3213 @unittest.skipIf(WIN32, "skipped on Windows")
3214 def test_invalid_family(self):
3215 with self.assertRaises(ValueError):
3216 multiprocessing.connection.Listener(r'\\.\test')
3217
Antoine Pitrou6d20cba2012-04-03 20:12:23 +02003218 @unittest.skipUnless(WIN32, "skipped on non-Windows platforms")
3219 def test_invalid_family_win32(self):
3220 with self.assertRaises(ValueError):
3221 multiprocessing.connection.Listener('/var/test.pipe')
Antoine Pitrou93bba8f2012-04-01 17:25:49 +02003222
Richard Oudkerk77c84f22012-05-18 14:28:02 +01003223#
3224# Issue 12098: check sys.flags of child matches that for parent
3225#
3226
3227class TestFlags(unittest.TestCase):
3228 @classmethod
3229 def run_in_grandchild(cls, conn):
3230 conn.send(tuple(sys.flags))
3231
3232 @classmethod
3233 def run_in_child(cls):
3234 import json
3235 r, w = multiprocessing.Pipe(duplex=False)
3236 p = multiprocessing.Process(target=cls.run_in_grandchild, args=(w,))
3237 p.start()
3238 grandchild_flags = r.recv()
3239 p.join()
3240 r.close()
3241 w.close()
3242 flags = (tuple(sys.flags), grandchild_flags)
3243 print(json.dumps(flags))
3244
3245 def test_flags(self):
3246 import json, subprocess
3247 # start child process using unusual flags
3248 prog = ('from test.test_multiprocessing import TestFlags; ' +
3249 'TestFlags.run_in_child()')
3250 data = subprocess.check_output(
3251 [sys.executable, '-E', '-S', '-O', '-c', prog])
3252 child_flags, grandchild_flags = json.loads(data.decode('ascii'))
3253 self.assertEqual(child_flags, grandchild_flags)
3254
Alexandre Vassalottic57a84f2009-07-17 12:07:01 +00003255testcases_other = [OtherTest, TestInvalidHandle, TestInitializers,
Richard Oudkerk77c84f22012-05-18 14:28:02 +01003256 TestStdinBadfiledescriptor, TestWait, TestInvalidFamily,
3257 TestFlags]
Neal Norwitz5d6415e2008-08-25 01:53:32 +00003258
Benjamin Petersone711caf2008-06-11 16:44:04 +00003259#
3260#
3261#
3262
3263def test_main(run=None):
Jesse Nollerd00df3c2008-06-18 14:22:48 +00003264 if sys.platform.startswith("linux"):
3265 try:
3266 lock = multiprocessing.RLock()
3267 except OSError:
Benjamin Petersone549ead2009-03-28 21:42:05 +00003268 raise unittest.SkipTest("OSError raises on RLock creation, see issue 3111!")
Benjamin Peterson3c0dd062008-06-17 22:43:48 +00003269
Charles-François Natali221ef672011-11-22 18:55:22 +01003270 check_enough_semaphores()
3271
Benjamin Petersone711caf2008-06-11 16:44:04 +00003272 if run is None:
3273 from test.support import run_unittest as run
3274
3275 util.get_temp_dir() # creates temp directory for use by all processes
3276
3277 multiprocessing.get_logger().setLevel(LOG_LEVEL)
3278
Benjamin Peterson41181742008-07-02 20:22:54 +00003279 ProcessesMixin.pool = multiprocessing.Pool(4)
3280 ThreadsMixin.pool = multiprocessing.dummy.Pool(4)
3281 ManagerMixin.manager.__init__()
3282 ManagerMixin.manager.start()
3283 ManagerMixin.pool = ManagerMixin.manager.Pool(4)
Benjamin Petersone711caf2008-06-11 16:44:04 +00003284
3285 testcases = (
Benjamin Peterson41181742008-07-02 20:22:54 +00003286 sorted(testcases_processes.values(), key=lambda tc:tc.__name__) +
3287 sorted(testcases_threads.values(), key=lambda tc:tc.__name__) +
Neal Norwitz5d6415e2008-08-25 01:53:32 +00003288 sorted(testcases_manager.values(), key=lambda tc:tc.__name__) +
3289 testcases_other
Benjamin Petersone711caf2008-06-11 16:44:04 +00003290 )
3291
3292 loadTestsFromTestCase = unittest.defaultTestLoader.loadTestsFromTestCase
3293 suite = unittest.TestSuite(loadTestsFromTestCase(tc) for tc in testcases)
Richard Oudkerk225cb8d2012-05-02 19:36:11 +01003294 try:
3295 run(suite)
3296 finally:
3297 ThreadsMixin.pool.terminate()
3298 ProcessesMixin.pool.terminate()
3299 ManagerMixin.pool.terminate()
3300 ManagerMixin.pool.join()
3301 ManagerMixin.manager.shutdown()
Richard Oudkerka6becaa2012-05-03 18:29:02 +01003302 ManagerMixin.manager.join()
Richard Oudkerk225cb8d2012-05-02 19:36:11 +01003303 ThreadsMixin.pool.join()
3304 ProcessesMixin.pool.join()
3305 del ProcessesMixin.pool, ThreadsMixin.pool, ManagerMixin.pool
Benjamin Petersone711caf2008-06-11 16:44:04 +00003306
3307def main():
3308 test_main(unittest.TextTestRunner(verbosity=2).run)
3309
3310if __name__ == '__main__':
3311 main()