blob: 2bcdb4e07c08d129f6dd26a17d6ef015e91b8776 [file] [log] [blame]
Benjamin Peterson90f5ba52010-03-11 22:53:45 +00001#!/usr/bin/env python3
Georg Brandl86b2fb92008-07-16 03:43:04 +00002
Benjamin Petersone711caf2008-06-11 16:44:04 +00003#
4# Unit tests for the multiprocessing package
5#
6
7import unittest
Benjamin Petersone711caf2008-06-11 16:44:04 +00008import queue as pyqueue
9import time
Alexandre Vassalottic57a84f2009-07-17 12:07:01 +000010import io
Antoine Pitroude911b22011-12-21 11:03:24 +010011import itertools
Benjamin Petersone711caf2008-06-11 16:44:04 +000012import sys
13import os
14import gc
Antoine Pitrouc824e9a2011-04-05 18:11:33 +020015import errno
Benjamin Petersone711caf2008-06-11 16:44:04 +000016import signal
17import array
Benjamin Petersone711caf2008-06-11 16:44:04 +000018import socket
19import random
20import logging
R. David Murraya21e4ca2009-03-31 23:16:50 +000021import test.support
Benjamin Petersone711caf2008-06-11 16:44:04 +000022
Benjamin Petersone5384b02008-10-04 22:00:42 +000023
R. David Murraya21e4ca2009-03-31 23:16:50 +000024# Skip tests if _multiprocessing wasn't built.
25_multiprocessing = test.support.import_module('_multiprocessing')
26# Skip tests if sem_open implementation is broken.
27test.support.import_module('multiprocessing.synchronize')
Victor Stinner45df8202010-04-28 22:31:17 +000028# import threading after _multiprocessing to raise a more revelant error
29# message: "No module named _multiprocessing". _multiprocessing is not compiled
30# without thread support.
31import threading
Benjamin Petersone5384b02008-10-04 22:00:42 +000032
Benjamin Petersone711caf2008-06-11 16:44:04 +000033import multiprocessing.dummy
34import multiprocessing.connection
35import multiprocessing.managers
36import multiprocessing.heap
Benjamin Petersone711caf2008-06-11 16:44:04 +000037import multiprocessing.pool
Benjamin Petersone711caf2008-06-11 16:44:04 +000038
Charles-François Natalibc8f0822011-09-20 20:36:51 +020039from multiprocessing import util
40
41try:
42 from multiprocessing import reduction
43 HAS_REDUCTION = True
44except ImportError:
45 HAS_REDUCTION = False
Benjamin Petersone711caf2008-06-11 16:44:04 +000046
Brian Curtinafa88b52010-10-07 01:12:19 +000047try:
48 from multiprocessing.sharedctypes import Value, copy
49 HAS_SHAREDCTYPES = True
50except ImportError:
51 HAS_SHAREDCTYPES = False
52
Antoine Pitroubcb39d42011-08-23 19:46:22 +020053try:
54 import msvcrt
55except ImportError:
56 msvcrt = None
57
Benjamin Petersone711caf2008-06-11 16:44:04 +000058#
59#
60#
61
Benjamin Peterson2bc91df2008-07-13 18:45:30 +000062def latin(s):
63 return s.encode('latin')
Benjamin Petersone711caf2008-06-11 16:44:04 +000064
Benjamin Petersone711caf2008-06-11 16:44:04 +000065#
66# Constants
67#
68
69LOG_LEVEL = util.SUBWARNING
Jesse Noller1f0b6582010-01-27 03:36:01 +000070#LOG_LEVEL = logging.DEBUG
Benjamin Petersone711caf2008-06-11 16:44:04 +000071
72DELTA = 0.1
73CHECK_TIMINGS = False # making true makes tests take a lot longer
74 # and can sometimes cause some non-serious
75 # failures because some calls block a bit
76 # longer than expected
77if CHECK_TIMINGS:
78 TIMEOUT1, TIMEOUT2, TIMEOUT3 = 0.82, 0.35, 1.4
79else:
80 TIMEOUT1, TIMEOUT2, TIMEOUT3 = 0.1, 0.1, 0.1
81
82HAVE_GETVALUE = not getattr(_multiprocessing,
83 'HAVE_BROKEN_SEM_GETVALUE', False)
84
Jesse Noller6214edd2009-01-19 16:23:53 +000085WIN32 = (sys.platform == "win32")
Antoine Pitrou176f07d2011-06-06 19:35:31 +020086if WIN32:
87 from _subprocess import WaitForSingleObject, INFINITE, WAIT_OBJECT_0
88
89 def wait_for_handle(handle, timeout):
90 if timeout is None or timeout < 0.0:
91 timeout = INFINITE
92 else:
93 timeout = int(1000 * timeout)
94 return WaitForSingleObject(handle, timeout) == WAIT_OBJECT_0
95else:
96 from select import select
97 _select = util._eintr_retry(select)
98
99 def wait_for_handle(handle, timeout):
100 if timeout is not None and timeout < 0.0:
101 timeout = None
102 return handle in _select([handle], [], [], timeout)[0]
Jesse Noller6214edd2009-01-19 16:23:53 +0000103
Antoine Pitroubcb39d42011-08-23 19:46:22 +0200104try:
105 MAXFD = os.sysconf("SC_OPEN_MAX")
106except:
107 MAXFD = 256
108
Benjamin Petersone711caf2008-06-11 16:44:04 +0000109#
Florent Xiclunafd1b0932010-03-28 00:25:02 +0000110# Some tests require ctypes
111#
112
113try:
Florent Xiclunaaa171062010-08-14 15:56:42 +0000114 from ctypes import Structure, c_int, c_double
Florent Xiclunafd1b0932010-03-28 00:25:02 +0000115except ImportError:
116 Structure = object
117 c_int = c_double = None
118
Charles-François Natali221ef672011-11-22 18:55:22 +0100119
120def check_enough_semaphores():
121 """Check that the system supports enough semaphores to run the test."""
122 # minimum number of semaphores available according to POSIX
123 nsems_min = 256
124 try:
125 nsems = os.sysconf("SC_SEM_NSEMS_MAX")
126 except (AttributeError, ValueError):
127 # sysconf not available or setting not available
128 return
129 if nsems == -1 or nsems >= nsems_min:
130 return
131 raise unittest.SkipTest("The OS doesn't support enough semaphores "
132 "to run the test (required: %d)." % nsems_min)
133
134
Florent Xiclunafd1b0932010-03-28 00:25:02 +0000135#
Benjamin Petersone711caf2008-06-11 16:44:04 +0000136# Creates a wrapper for a function which records the time it takes to finish
137#
138
139class TimingWrapper(object):
140
141 def __init__(self, func):
142 self.func = func
143 self.elapsed = None
144
145 def __call__(self, *args, **kwds):
146 t = time.time()
147 try:
148 return self.func(*args, **kwds)
149 finally:
150 self.elapsed = time.time() - t
151
152#
153# Base class for test cases
154#
155
156class BaseTestCase(object):
157
158 ALLOWED_TYPES = ('processes', 'manager', 'threads')
159
160 def assertTimingAlmostEqual(self, a, b):
161 if CHECK_TIMINGS:
162 self.assertAlmostEqual(a, b, 1)
163
164 def assertReturnsIfImplemented(self, value, func, *args):
165 try:
166 res = func(*args)
167 except NotImplementedError:
168 pass
169 else:
170 return self.assertEqual(value, res)
171
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +0000172 # For the sanity of Windows users, rather than crashing or freezing in
173 # multiple ways.
174 def __reduce__(self, *args):
175 raise NotImplementedError("shouldn't try to pickle a test case")
176
177 __reduce_ex__ = __reduce__
178
Benjamin Petersone711caf2008-06-11 16:44:04 +0000179#
180# Return the value of a semaphore
181#
182
183def get_value(self):
184 try:
185 return self.get_value()
186 except AttributeError:
187 try:
188 return self._Semaphore__value
189 except AttributeError:
190 try:
191 return self._value
192 except AttributeError:
193 raise NotImplementedError
194
195#
196# Testcases
197#
198
199class _TestProcess(BaseTestCase):
200
201 ALLOWED_TYPES = ('processes', 'threads')
202
203 def test_current(self):
204 if self.TYPE == 'threads':
205 return
206
207 current = self.current_process()
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000208 authkey = current.authkey
Benjamin Petersone711caf2008-06-11 16:44:04 +0000209
210 self.assertTrue(current.is_alive())
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000211 self.assertTrue(not current.daemon)
Ezio Melottie9615932010-01-24 19:26:24 +0000212 self.assertIsInstance(authkey, bytes)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000213 self.assertTrue(len(authkey) > 0)
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000214 self.assertEqual(current.ident, os.getpid())
215 self.assertEqual(current.exitcode, None)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000216
Antoine Pitrou0bd4deb2011-02-25 22:07:43 +0000217 def test_daemon_argument(self):
218 if self.TYPE == "threads":
219 return
220
221 # By default uses the current process's daemon flag.
222 proc0 = self.Process(target=self._test)
Antoine Pitrouec785222011-03-02 00:15:44 +0000223 self.assertEqual(proc0.daemon, self.current_process().daemon)
Antoine Pitrou0bd4deb2011-02-25 22:07:43 +0000224 proc1 = self.Process(target=self._test, daemon=True)
225 self.assertTrue(proc1.daemon)
226 proc2 = self.Process(target=self._test, daemon=False)
227 self.assertFalse(proc2.daemon)
228
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +0000229 @classmethod
230 def _test(cls, q, *args, **kwds):
231 current = cls.current_process()
Benjamin Petersone711caf2008-06-11 16:44:04 +0000232 q.put(args)
233 q.put(kwds)
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000234 q.put(current.name)
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +0000235 if cls.TYPE != 'threads':
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000236 q.put(bytes(current.authkey))
Benjamin Petersone711caf2008-06-11 16:44:04 +0000237 q.put(current.pid)
238
239 def test_process(self):
240 q = self.Queue(1)
241 e = self.Event()
242 args = (q, 1, 2)
243 kwargs = {'hello':23, 'bye':2.54}
244 name = 'SomeProcess'
245 p = self.Process(
246 target=self._test, args=args, kwargs=kwargs, name=name
247 )
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000248 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000249 current = self.current_process()
250
251 if self.TYPE != 'threads':
Ezio Melottib3aedd42010-11-20 19:04:17 +0000252 self.assertEqual(p.authkey, current.authkey)
253 self.assertEqual(p.is_alive(), False)
254 self.assertEqual(p.daemon, True)
Benjamin Peterson577473f2010-01-19 00:09:57 +0000255 self.assertNotIn(p, self.active_children())
Benjamin Petersone711caf2008-06-11 16:44:04 +0000256 self.assertTrue(type(self.active_children()) is list)
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000257 self.assertEqual(p.exitcode, None)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000258
259 p.start()
260
Ezio Melottib3aedd42010-11-20 19:04:17 +0000261 self.assertEqual(p.exitcode, None)
262 self.assertEqual(p.is_alive(), True)
Benjamin Peterson577473f2010-01-19 00:09:57 +0000263 self.assertIn(p, self.active_children())
Benjamin Petersone711caf2008-06-11 16:44:04 +0000264
Ezio Melottib3aedd42010-11-20 19:04:17 +0000265 self.assertEqual(q.get(), args[1:])
266 self.assertEqual(q.get(), kwargs)
267 self.assertEqual(q.get(), p.name)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000268 if self.TYPE != 'threads':
Ezio Melottib3aedd42010-11-20 19:04:17 +0000269 self.assertEqual(q.get(), current.authkey)
270 self.assertEqual(q.get(), p.pid)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000271
272 p.join()
273
Ezio Melottib3aedd42010-11-20 19:04:17 +0000274 self.assertEqual(p.exitcode, 0)
275 self.assertEqual(p.is_alive(), False)
Benjamin Peterson577473f2010-01-19 00:09:57 +0000276 self.assertNotIn(p, self.active_children())
Benjamin Petersone711caf2008-06-11 16:44:04 +0000277
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +0000278 @classmethod
279 def _test_terminate(cls):
Benjamin Petersone711caf2008-06-11 16:44:04 +0000280 time.sleep(1000)
281
282 def test_terminate(self):
283 if self.TYPE == 'threads':
284 return
285
286 p = self.Process(target=self._test_terminate)
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000287 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000288 p.start()
289
290 self.assertEqual(p.is_alive(), True)
Benjamin Peterson577473f2010-01-19 00:09:57 +0000291 self.assertIn(p, self.active_children())
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000292 self.assertEqual(p.exitcode, None)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000293
294 p.terminate()
295
296 join = TimingWrapper(p.join)
297 self.assertEqual(join(), None)
298 self.assertTimingAlmostEqual(join.elapsed, 0.0)
299
300 self.assertEqual(p.is_alive(), False)
Benjamin Peterson577473f2010-01-19 00:09:57 +0000301 self.assertNotIn(p, self.active_children())
Benjamin Petersone711caf2008-06-11 16:44:04 +0000302
303 p.join()
304
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000305 # XXX sometimes get p.exitcode == 0 on Windows ...
306 #self.assertEqual(p.exitcode, -signal.SIGTERM)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000307
308 def test_cpu_count(self):
309 try:
310 cpus = multiprocessing.cpu_count()
311 except NotImplementedError:
312 cpus = 1
313 self.assertTrue(type(cpus) is int)
314 self.assertTrue(cpus >= 1)
315
316 def test_active_children(self):
317 self.assertEqual(type(self.active_children()), list)
318
319 p = self.Process(target=time.sleep, args=(DELTA,))
Benjamin Peterson577473f2010-01-19 00:09:57 +0000320 self.assertNotIn(p, self.active_children())
Benjamin Petersone711caf2008-06-11 16:44:04 +0000321
Jesus Cea94f964f2011-09-09 20:26:57 +0200322 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000323 p.start()
Benjamin Peterson577473f2010-01-19 00:09:57 +0000324 self.assertIn(p, self.active_children())
Benjamin Petersone711caf2008-06-11 16:44:04 +0000325
326 p.join()
Benjamin Peterson577473f2010-01-19 00:09:57 +0000327 self.assertNotIn(p, self.active_children())
Benjamin Petersone711caf2008-06-11 16:44:04 +0000328
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +0000329 @classmethod
330 def _test_recursion(cls, wconn, id):
Benjamin Petersone711caf2008-06-11 16:44:04 +0000331 from multiprocessing import forking
332 wconn.send(id)
333 if len(id) < 2:
334 for i in range(2):
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +0000335 p = cls.Process(
336 target=cls._test_recursion, args=(wconn, id+[i])
Benjamin Petersone711caf2008-06-11 16:44:04 +0000337 )
338 p.start()
339 p.join()
340
341 def test_recursion(self):
342 rconn, wconn = self.Pipe(duplex=False)
343 self._test_recursion(wconn, [])
344
345 time.sleep(DELTA)
346 result = []
347 while rconn.poll():
348 result.append(rconn.recv())
349
350 expected = [
351 [],
352 [0],
353 [0, 0],
354 [0, 1],
355 [1],
356 [1, 0],
357 [1, 1]
358 ]
359 self.assertEqual(result, expected)
360
Antoine Pitrou176f07d2011-06-06 19:35:31 +0200361 @classmethod
362 def _test_sentinel(cls, event):
363 event.wait(10.0)
364
365 def test_sentinel(self):
366 if self.TYPE == "threads":
367 return
368 event = self.Event()
369 p = self.Process(target=self._test_sentinel, args=(event,))
370 with self.assertRaises(ValueError):
371 p.sentinel
372 p.start()
373 self.addCleanup(p.join)
374 sentinel = p.sentinel
375 self.assertIsInstance(sentinel, int)
376 self.assertFalse(wait_for_handle(sentinel, timeout=0.0))
377 event.set()
378 p.join()
379 self.assertTrue(wait_for_handle(sentinel, timeout=DELTA))
380
Benjamin Petersone711caf2008-06-11 16:44:04 +0000381#
382#
383#
384
385class _UpperCaser(multiprocessing.Process):
386
387 def __init__(self):
388 multiprocessing.Process.__init__(self)
389 self.child_conn, self.parent_conn = multiprocessing.Pipe()
390
391 def run(self):
392 self.parent_conn.close()
393 for s in iter(self.child_conn.recv, None):
394 self.child_conn.send(s.upper())
395 self.child_conn.close()
396
397 def submit(self, s):
398 assert type(s) is str
399 self.parent_conn.send(s)
400 return self.parent_conn.recv()
401
402 def stop(self):
403 self.parent_conn.send(None)
404 self.parent_conn.close()
405 self.child_conn.close()
406
407class _TestSubclassingProcess(BaseTestCase):
408
409 ALLOWED_TYPES = ('processes',)
410
411 def test_subclassing(self):
412 uppercaser = _UpperCaser()
Jesus Cea94f964f2011-09-09 20:26:57 +0200413 uppercaser.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000414 uppercaser.start()
415 self.assertEqual(uppercaser.submit('hello'), 'HELLO')
416 self.assertEqual(uppercaser.submit('world'), 'WORLD')
417 uppercaser.stop()
418 uppercaser.join()
419
Antoine Pitrou84a0fbf2012-01-27 10:52:37 +0100420 def test_stderr_flush(self):
421 # sys.stderr is flushed at process shutdown (issue #13812)
422 if self.TYPE == "threads":
423 return
424
425 testfn = test.support.TESTFN
426 self.addCleanup(test.support.unlink, testfn)
427 proc = self.Process(target=self._test_stderr_flush, args=(testfn,))
428 proc.start()
429 proc.join()
430 with open(testfn, 'r') as f:
431 err = f.read()
432 # The whole traceback was printed
433 self.assertIn("ZeroDivisionError", err)
434 self.assertIn("test_multiprocessing.py", err)
435 self.assertIn("1/0 # MARKER", err)
436
437 @classmethod
438 def _test_stderr_flush(cls, testfn):
439 sys.stderr = open(testfn, 'w')
440 1/0 # MARKER
441
442
Benjamin Petersone711caf2008-06-11 16:44:04 +0000443#
444#
445#
446
447def queue_empty(q):
448 if hasattr(q, 'empty'):
449 return q.empty()
450 else:
451 return q.qsize() == 0
452
453def queue_full(q, maxsize):
454 if hasattr(q, 'full'):
455 return q.full()
456 else:
457 return q.qsize() == maxsize
458
459
460class _TestQueue(BaseTestCase):
461
462
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +0000463 @classmethod
464 def _test_put(cls, queue, child_can_start, parent_can_continue):
Benjamin Petersone711caf2008-06-11 16:44:04 +0000465 child_can_start.wait()
466 for i in range(6):
467 queue.get()
468 parent_can_continue.set()
469
470 def test_put(self):
471 MAXSIZE = 6
472 queue = self.Queue(maxsize=MAXSIZE)
473 child_can_start = self.Event()
474 parent_can_continue = self.Event()
475
476 proc = self.Process(
477 target=self._test_put,
478 args=(queue, child_can_start, parent_can_continue)
479 )
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000480 proc.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000481 proc.start()
482
483 self.assertEqual(queue_empty(queue), True)
484 self.assertEqual(queue_full(queue, MAXSIZE), False)
485
486 queue.put(1)
487 queue.put(2, True)
488 queue.put(3, True, None)
489 queue.put(4, False)
490 queue.put(5, False, None)
491 queue.put_nowait(6)
492
493 # the values may be in buffer but not yet in pipe so sleep a bit
494 time.sleep(DELTA)
495
496 self.assertEqual(queue_empty(queue), False)
497 self.assertEqual(queue_full(queue, MAXSIZE), True)
498
499 put = TimingWrapper(queue.put)
500 put_nowait = TimingWrapper(queue.put_nowait)
501
502 self.assertRaises(pyqueue.Full, put, 7, False)
503 self.assertTimingAlmostEqual(put.elapsed, 0)
504
505 self.assertRaises(pyqueue.Full, put, 7, False, None)
506 self.assertTimingAlmostEqual(put.elapsed, 0)
507
508 self.assertRaises(pyqueue.Full, put_nowait, 7)
509 self.assertTimingAlmostEqual(put_nowait.elapsed, 0)
510
511 self.assertRaises(pyqueue.Full, put, 7, True, TIMEOUT1)
512 self.assertTimingAlmostEqual(put.elapsed, TIMEOUT1)
513
514 self.assertRaises(pyqueue.Full, put, 7, False, TIMEOUT2)
515 self.assertTimingAlmostEqual(put.elapsed, 0)
516
517 self.assertRaises(pyqueue.Full, put, 7, True, timeout=TIMEOUT3)
518 self.assertTimingAlmostEqual(put.elapsed, TIMEOUT3)
519
520 child_can_start.set()
521 parent_can_continue.wait()
522
523 self.assertEqual(queue_empty(queue), True)
524 self.assertEqual(queue_full(queue, MAXSIZE), False)
525
526 proc.join()
527
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +0000528 @classmethod
529 def _test_get(cls, queue, child_can_start, parent_can_continue):
Benjamin Petersone711caf2008-06-11 16:44:04 +0000530 child_can_start.wait()
Benjamin Petersonb29cbbc2008-06-16 20:57:14 +0000531 #queue.put(1)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000532 queue.put(2)
533 queue.put(3)
534 queue.put(4)
535 queue.put(5)
536 parent_can_continue.set()
537
538 def test_get(self):
539 queue = self.Queue()
540 child_can_start = self.Event()
541 parent_can_continue = self.Event()
542
543 proc = self.Process(
544 target=self._test_get,
545 args=(queue, child_can_start, parent_can_continue)
546 )
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000547 proc.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000548 proc.start()
549
550 self.assertEqual(queue_empty(queue), True)
551
552 child_can_start.set()
553 parent_can_continue.wait()
554
555 time.sleep(DELTA)
556 self.assertEqual(queue_empty(queue), False)
557
Benjamin Petersonb29cbbc2008-06-16 20:57:14 +0000558 # Hangs unexpectedly, remove for now
559 #self.assertEqual(queue.get(), 1)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000560 self.assertEqual(queue.get(True, None), 2)
561 self.assertEqual(queue.get(True), 3)
562 self.assertEqual(queue.get(timeout=1), 4)
563 self.assertEqual(queue.get_nowait(), 5)
564
565 self.assertEqual(queue_empty(queue), True)
566
567 get = TimingWrapper(queue.get)
568 get_nowait = TimingWrapper(queue.get_nowait)
569
570 self.assertRaises(pyqueue.Empty, get, False)
571 self.assertTimingAlmostEqual(get.elapsed, 0)
572
573 self.assertRaises(pyqueue.Empty, get, False, None)
574 self.assertTimingAlmostEqual(get.elapsed, 0)
575
576 self.assertRaises(pyqueue.Empty, get_nowait)
577 self.assertTimingAlmostEqual(get_nowait.elapsed, 0)
578
579 self.assertRaises(pyqueue.Empty, get, True, TIMEOUT1)
580 self.assertTimingAlmostEqual(get.elapsed, TIMEOUT1)
581
582 self.assertRaises(pyqueue.Empty, get, False, TIMEOUT2)
583 self.assertTimingAlmostEqual(get.elapsed, 0)
584
585 self.assertRaises(pyqueue.Empty, get, timeout=TIMEOUT3)
586 self.assertTimingAlmostEqual(get.elapsed, TIMEOUT3)
587
588 proc.join()
589
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +0000590 @classmethod
591 def _test_fork(cls, queue):
Benjamin Petersone711caf2008-06-11 16:44:04 +0000592 for i in range(10, 20):
593 queue.put(i)
594 # note that at this point the items may only be buffered, so the
595 # process cannot shutdown until the feeder thread has finished
596 # pushing items onto the pipe.
597
598 def test_fork(self):
599 # Old versions of Queue would fail to create a new feeder
600 # thread for a forked process if the original process had its
601 # own feeder thread. This test checks that this no longer
602 # happens.
603
604 queue = self.Queue()
605
606 # put items on queue so that main process starts a feeder thread
607 for i in range(10):
608 queue.put(i)
609
610 # wait to make sure thread starts before we fork a new process
611 time.sleep(DELTA)
612
613 # fork process
614 p = self.Process(target=self._test_fork, args=(queue,))
Jesus Cea94f964f2011-09-09 20:26:57 +0200615 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000616 p.start()
617
618 # check that all expected items are in the queue
619 for i in range(20):
620 self.assertEqual(queue.get(), i)
621 self.assertRaises(pyqueue.Empty, queue.get, False)
622
623 p.join()
624
625 def test_qsize(self):
626 q = self.Queue()
627 try:
628 self.assertEqual(q.qsize(), 0)
629 except NotImplementedError:
630 return
631 q.put(1)
632 self.assertEqual(q.qsize(), 1)
633 q.put(5)
634 self.assertEqual(q.qsize(), 2)
635 q.get()
636 self.assertEqual(q.qsize(), 1)
637 q.get()
638 self.assertEqual(q.qsize(), 0)
639
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +0000640 @classmethod
641 def _test_task_done(cls, q):
Benjamin Petersone711caf2008-06-11 16:44:04 +0000642 for obj in iter(q.get, None):
643 time.sleep(DELTA)
644 q.task_done()
645
646 def test_task_done(self):
647 queue = self.JoinableQueue()
648
649 if sys.version_info < (2, 5) and not hasattr(queue, 'task_done'):
Florent Xiclunafd1b0932010-03-28 00:25:02 +0000650 self.skipTest("requires 'queue.task_done()' method")
Benjamin Petersone711caf2008-06-11 16:44:04 +0000651
652 workers = [self.Process(target=self._test_task_done, args=(queue,))
653 for i in range(4)]
654
655 for p in workers:
Jesus Cea94f964f2011-09-09 20:26:57 +0200656 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000657 p.start()
658
659 for i in range(10):
660 queue.put(i)
661
662 queue.join()
663
664 for p in workers:
665 queue.put(None)
666
667 for p in workers:
668 p.join()
669
670#
671#
672#
673
674class _TestLock(BaseTestCase):
675
676 def test_lock(self):
677 lock = self.Lock()
678 self.assertEqual(lock.acquire(), True)
679 self.assertEqual(lock.acquire(False), False)
680 self.assertEqual(lock.release(), None)
681 self.assertRaises((ValueError, threading.ThreadError), lock.release)
682
683 def test_rlock(self):
684 lock = self.RLock()
685 self.assertEqual(lock.acquire(), True)
686 self.assertEqual(lock.acquire(), True)
687 self.assertEqual(lock.acquire(), True)
688 self.assertEqual(lock.release(), None)
689 self.assertEqual(lock.release(), None)
690 self.assertEqual(lock.release(), None)
691 self.assertRaises((AssertionError, RuntimeError), lock.release)
692
Jesse Nollerf8d00852009-03-31 03:25:07 +0000693 def test_lock_context(self):
694 with self.Lock():
695 pass
696
Benjamin Petersone711caf2008-06-11 16:44:04 +0000697
698class _TestSemaphore(BaseTestCase):
699
700 def _test_semaphore(self, sem):
701 self.assertReturnsIfImplemented(2, get_value, sem)
702 self.assertEqual(sem.acquire(), True)
703 self.assertReturnsIfImplemented(1, get_value, sem)
704 self.assertEqual(sem.acquire(), True)
705 self.assertReturnsIfImplemented(0, get_value, sem)
706 self.assertEqual(sem.acquire(False), False)
707 self.assertReturnsIfImplemented(0, get_value, sem)
708 self.assertEqual(sem.release(), None)
709 self.assertReturnsIfImplemented(1, get_value, sem)
710 self.assertEqual(sem.release(), None)
711 self.assertReturnsIfImplemented(2, get_value, sem)
712
713 def test_semaphore(self):
714 sem = self.Semaphore(2)
715 self._test_semaphore(sem)
716 self.assertEqual(sem.release(), None)
717 self.assertReturnsIfImplemented(3, get_value, sem)
718 self.assertEqual(sem.release(), None)
719 self.assertReturnsIfImplemented(4, get_value, sem)
720
721 def test_bounded_semaphore(self):
722 sem = self.BoundedSemaphore(2)
723 self._test_semaphore(sem)
724 # Currently fails on OS/X
725 #if HAVE_GETVALUE:
726 # self.assertRaises(ValueError, sem.release)
727 # self.assertReturnsIfImplemented(2, get_value, sem)
728
729 def test_timeout(self):
730 if self.TYPE != 'processes':
731 return
732
733 sem = self.Semaphore(0)
734 acquire = TimingWrapper(sem.acquire)
735
736 self.assertEqual(acquire(False), False)
737 self.assertTimingAlmostEqual(acquire.elapsed, 0.0)
738
739 self.assertEqual(acquire(False, None), False)
740 self.assertTimingAlmostEqual(acquire.elapsed, 0.0)
741
742 self.assertEqual(acquire(False, TIMEOUT1), False)
743 self.assertTimingAlmostEqual(acquire.elapsed, 0)
744
745 self.assertEqual(acquire(True, TIMEOUT2), False)
746 self.assertTimingAlmostEqual(acquire.elapsed, TIMEOUT2)
747
748 self.assertEqual(acquire(timeout=TIMEOUT3), False)
749 self.assertTimingAlmostEqual(acquire.elapsed, TIMEOUT3)
750
751
752class _TestCondition(BaseTestCase):
753
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +0000754 @classmethod
755 def f(cls, cond, sleeping, woken, timeout=None):
Benjamin Petersone711caf2008-06-11 16:44:04 +0000756 cond.acquire()
757 sleeping.release()
758 cond.wait(timeout)
759 woken.release()
760 cond.release()
761
762 def check_invariant(self, cond):
763 # this is only supposed to succeed when there are no sleepers
764 if self.TYPE == 'processes':
765 try:
766 sleepers = (cond._sleeping_count.get_value() -
767 cond._woken_count.get_value())
768 self.assertEqual(sleepers, 0)
769 self.assertEqual(cond._wait_semaphore.get_value(), 0)
770 except NotImplementedError:
771 pass
772
773 def test_notify(self):
774 cond = self.Condition()
775 sleeping = self.Semaphore(0)
776 woken = self.Semaphore(0)
777
778 p = self.Process(target=self.f, args=(cond, sleeping, woken))
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000779 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000780 p.start()
781
782 p = threading.Thread(target=self.f, args=(cond, sleeping, woken))
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000783 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000784 p.start()
785
786 # wait for both children to start sleeping
787 sleeping.acquire()
788 sleeping.acquire()
789
790 # check no process/thread has woken up
791 time.sleep(DELTA)
792 self.assertReturnsIfImplemented(0, get_value, woken)
793
794 # wake up one process/thread
795 cond.acquire()
796 cond.notify()
797 cond.release()
798
799 # check one process/thread has woken up
800 time.sleep(DELTA)
801 self.assertReturnsIfImplemented(1, get_value, woken)
802
803 # wake up another
804 cond.acquire()
805 cond.notify()
806 cond.release()
807
808 # check other has woken up
809 time.sleep(DELTA)
810 self.assertReturnsIfImplemented(2, get_value, woken)
811
812 # check state is not mucked up
813 self.check_invariant(cond)
814 p.join()
815
816 def test_notify_all(self):
817 cond = self.Condition()
818 sleeping = self.Semaphore(0)
819 woken = self.Semaphore(0)
820
821 # start some threads/processes which will timeout
822 for i in range(3):
823 p = self.Process(target=self.f,
824 args=(cond, sleeping, woken, TIMEOUT1))
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000825 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000826 p.start()
827
828 t = threading.Thread(target=self.f,
829 args=(cond, sleeping, woken, TIMEOUT1))
Benjamin Peterson72753702008-08-18 18:09:21 +0000830 t.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000831 t.start()
832
833 # wait for them all to sleep
834 for i in range(6):
835 sleeping.acquire()
836
837 # check they have all timed out
838 for i in range(6):
839 woken.acquire()
840 self.assertReturnsIfImplemented(0, get_value, woken)
841
842 # check state is not mucked up
843 self.check_invariant(cond)
844
845 # start some more threads/processes
846 for i in range(3):
847 p = self.Process(target=self.f, args=(cond, sleeping, woken))
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000848 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000849 p.start()
850
851 t = threading.Thread(target=self.f, args=(cond, sleeping, woken))
Benjamin Peterson72753702008-08-18 18:09:21 +0000852 t.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000853 t.start()
854
855 # wait for them to all sleep
856 for i in range(6):
857 sleeping.acquire()
858
859 # check no process/thread has woken up
860 time.sleep(DELTA)
861 self.assertReturnsIfImplemented(0, get_value, woken)
862
863 # wake them all up
864 cond.acquire()
865 cond.notify_all()
866 cond.release()
867
868 # check they have all woken
Antoine Pitrouf25a8de2011-04-16 21:02:01 +0200869 for i in range(10):
870 try:
871 if get_value(woken) == 6:
872 break
873 except NotImplementedError:
874 break
875 time.sleep(DELTA)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000876 self.assertReturnsIfImplemented(6, get_value, woken)
877
878 # check state is not mucked up
879 self.check_invariant(cond)
880
881 def test_timeout(self):
882 cond = self.Condition()
883 wait = TimingWrapper(cond.wait)
884 cond.acquire()
885 res = wait(TIMEOUT1)
886 cond.release()
Georg Brandl65ffae02010-10-28 09:24:56 +0000887 self.assertEqual(res, False)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000888 self.assertTimingAlmostEqual(wait.elapsed, TIMEOUT1)
889
890
891class _TestEvent(BaseTestCase):
892
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +0000893 @classmethod
894 def _test_event(cls, event):
Benjamin Petersone711caf2008-06-11 16:44:04 +0000895 time.sleep(TIMEOUT2)
896 event.set()
897
898 def test_event(self):
899 event = self.Event()
900 wait = TimingWrapper(event.wait)
901
Ezio Melotti13925002011-03-16 11:05:33 +0200902 # Removed temporarily, due to API shear, this does not
Benjamin Petersone711caf2008-06-11 16:44:04 +0000903 # work with threading._Event objects. is_set == isSet
Benjamin Peterson965ce872009-04-05 21:24:58 +0000904 self.assertEqual(event.is_set(), False)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000905
Benjamin Peterson965ce872009-04-05 21:24:58 +0000906 # Removed, threading.Event.wait() will return the value of the __flag
907 # instead of None. API Shear with the semaphore backed mp.Event
908 self.assertEqual(wait(0.0), False)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000909 self.assertTimingAlmostEqual(wait.elapsed, 0.0)
Benjamin Peterson965ce872009-04-05 21:24:58 +0000910 self.assertEqual(wait(TIMEOUT1), False)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000911 self.assertTimingAlmostEqual(wait.elapsed, TIMEOUT1)
912
913 event.set()
914
915 # See note above on the API differences
Benjamin Peterson965ce872009-04-05 21:24:58 +0000916 self.assertEqual(event.is_set(), True)
917 self.assertEqual(wait(), True)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000918 self.assertTimingAlmostEqual(wait.elapsed, 0.0)
Benjamin Peterson965ce872009-04-05 21:24:58 +0000919 self.assertEqual(wait(TIMEOUT1), True)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000920 self.assertTimingAlmostEqual(wait.elapsed, 0.0)
921 # self.assertEqual(event.is_set(), True)
922
923 event.clear()
924
925 #self.assertEqual(event.is_set(), False)
926
Jesus Cea94f964f2011-09-09 20:26:57 +0200927 p = self.Process(target=self._test_event, args=(event,))
928 p.daemon = True
929 p.start()
Benjamin Peterson965ce872009-04-05 21:24:58 +0000930 self.assertEqual(wait(), True)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000931
932#
933#
934#
935
936class _TestValue(BaseTestCase):
937
Florent Xiclunafd1b0932010-03-28 00:25:02 +0000938 ALLOWED_TYPES = ('processes',)
939
Benjamin Petersone711caf2008-06-11 16:44:04 +0000940 codes_values = [
941 ('i', 4343, 24234),
942 ('d', 3.625, -4.25),
943 ('h', -232, 234),
944 ('c', latin('x'), latin('y'))
945 ]
946
Antoine Pitrou7744e2a2010-11-22 16:26:21 +0000947 def setUp(self):
948 if not HAS_SHAREDCTYPES:
949 self.skipTest("requires multiprocessing.sharedctypes")
950
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +0000951 @classmethod
952 def _test(cls, values):
953 for sv, cv in zip(values, cls.codes_values):
Benjamin Petersone711caf2008-06-11 16:44:04 +0000954 sv.value = cv[2]
955
956
957 def test_value(self, raw=False):
Benjamin Petersone711caf2008-06-11 16:44:04 +0000958 if raw:
959 values = [self.RawValue(code, value)
960 for code, value, _ in self.codes_values]
961 else:
962 values = [self.Value(code, value)
963 for code, value, _ in self.codes_values]
964
965 for sv, cv in zip(values, self.codes_values):
966 self.assertEqual(sv.value, cv[1])
967
968 proc = self.Process(target=self._test, args=(values,))
Jesus Cea94f964f2011-09-09 20:26:57 +0200969 proc.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000970 proc.start()
971 proc.join()
972
973 for sv, cv in zip(values, self.codes_values):
974 self.assertEqual(sv.value, cv[2])
975
976 def test_rawvalue(self):
977 self.test_value(raw=True)
978
979 def test_getobj_getlock(self):
Benjamin Petersone711caf2008-06-11 16:44:04 +0000980 val1 = self.Value('i', 5)
981 lock1 = val1.get_lock()
982 obj1 = val1.get_obj()
983
984 val2 = self.Value('i', 5, lock=None)
985 lock2 = val2.get_lock()
986 obj2 = val2.get_obj()
987
988 lock = self.Lock()
989 val3 = self.Value('i', 5, lock=lock)
990 lock3 = val3.get_lock()
991 obj3 = val3.get_obj()
992 self.assertEqual(lock, lock3)
993
Jesse Nollerb0516a62009-01-18 03:11:38 +0000994 arr4 = self.Value('i', 5, lock=False)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000995 self.assertFalse(hasattr(arr4, 'get_lock'))
996 self.assertFalse(hasattr(arr4, 'get_obj'))
997
Jesse Nollerb0516a62009-01-18 03:11:38 +0000998 self.assertRaises(AttributeError, self.Value, 'i', 5, lock='navalue')
999
1000 arr5 = self.RawValue('i', 5)
1001 self.assertFalse(hasattr(arr5, 'get_lock'))
1002 self.assertFalse(hasattr(arr5, 'get_obj'))
1003
Benjamin Petersone711caf2008-06-11 16:44:04 +00001004
1005class _TestArray(BaseTestCase):
1006
Florent Xiclunafd1b0932010-03-28 00:25:02 +00001007 ALLOWED_TYPES = ('processes',)
1008
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +00001009 @classmethod
1010 def f(cls, seq):
Benjamin Petersone711caf2008-06-11 16:44:04 +00001011 for i in range(1, len(seq)):
1012 seq[i] += seq[i-1]
1013
Florent Xiclunafd1b0932010-03-28 00:25:02 +00001014 @unittest.skipIf(c_int is None, "requires _ctypes")
Benjamin Petersone711caf2008-06-11 16:44:04 +00001015 def test_array(self, raw=False):
Benjamin Petersone711caf2008-06-11 16:44:04 +00001016 seq = [680, 626, 934, 821, 150, 233, 548, 982, 714, 831]
1017 if raw:
1018 arr = self.RawArray('i', seq)
1019 else:
1020 arr = self.Array('i', seq)
1021
1022 self.assertEqual(len(arr), len(seq))
1023 self.assertEqual(arr[3], seq[3])
1024 self.assertEqual(list(arr[2:7]), list(seq[2:7]))
1025
1026 arr[4:8] = seq[4:8] = array.array('i', [1, 2, 3, 4])
1027
1028 self.assertEqual(list(arr[:]), seq)
1029
1030 self.f(seq)
1031
1032 p = self.Process(target=self.f, args=(arr,))
Jesus Cea94f964f2011-09-09 20:26:57 +02001033 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +00001034 p.start()
1035 p.join()
1036
1037 self.assertEqual(list(arr[:]), seq)
1038
Florent Xiclunafd1b0932010-03-28 00:25:02 +00001039 @unittest.skipIf(c_int is None, "requires _ctypes")
Mark Dickinson89461ef2011-03-26 10:19:03 +00001040 def test_array_from_size(self):
1041 size = 10
1042 # Test for zeroing (see issue #11675).
1043 # The repetition below strengthens the test by increasing the chances
1044 # of previously allocated non-zero memory being used for the new array
1045 # on the 2nd and 3rd loops.
1046 for _ in range(3):
1047 arr = self.Array('i', size)
1048 self.assertEqual(len(arr), size)
1049 self.assertEqual(list(arr), [0] * size)
1050 arr[:] = range(10)
1051 self.assertEqual(list(arr), list(range(10)))
1052 del arr
1053
1054 @unittest.skipIf(c_int is None, "requires _ctypes")
Benjamin Petersone711caf2008-06-11 16:44:04 +00001055 def test_rawarray(self):
1056 self.test_array(raw=True)
1057
Florent Xiclunafd1b0932010-03-28 00:25:02 +00001058 @unittest.skipIf(c_int is None, "requires _ctypes")
Benjamin Petersone711caf2008-06-11 16:44:04 +00001059 def test_getobj_getlock_obj(self):
Benjamin Petersone711caf2008-06-11 16:44:04 +00001060 arr1 = self.Array('i', list(range(10)))
1061 lock1 = arr1.get_lock()
1062 obj1 = arr1.get_obj()
1063
1064 arr2 = self.Array('i', list(range(10)), lock=None)
1065 lock2 = arr2.get_lock()
1066 obj2 = arr2.get_obj()
1067
1068 lock = self.Lock()
1069 arr3 = self.Array('i', list(range(10)), lock=lock)
1070 lock3 = arr3.get_lock()
1071 obj3 = arr3.get_obj()
1072 self.assertEqual(lock, lock3)
1073
Jesse Nollerb0516a62009-01-18 03:11:38 +00001074 arr4 = self.Array('i', range(10), lock=False)
Benjamin Petersone711caf2008-06-11 16:44:04 +00001075 self.assertFalse(hasattr(arr4, 'get_lock'))
1076 self.assertFalse(hasattr(arr4, 'get_obj'))
Jesse Nollerb0516a62009-01-18 03:11:38 +00001077 self.assertRaises(AttributeError,
1078 self.Array, 'i', range(10), lock='notalock')
1079
1080 arr5 = self.RawArray('i', range(10))
1081 self.assertFalse(hasattr(arr5, 'get_lock'))
1082 self.assertFalse(hasattr(arr5, 'get_obj'))
Benjamin Petersone711caf2008-06-11 16:44:04 +00001083
1084#
1085#
1086#
1087
1088class _TestContainers(BaseTestCase):
1089
1090 ALLOWED_TYPES = ('manager',)
1091
1092 def test_list(self):
1093 a = self.list(list(range(10)))
1094 self.assertEqual(a[:], list(range(10)))
1095
1096 b = self.list()
1097 self.assertEqual(b[:], [])
1098
1099 b.extend(list(range(5)))
1100 self.assertEqual(b[:], list(range(5)))
1101
1102 self.assertEqual(b[2], 2)
1103 self.assertEqual(b[2:10], [2,3,4])
1104
1105 b *= 2
1106 self.assertEqual(b[:], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4])
1107
1108 self.assertEqual(b + [5, 6], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4, 5, 6])
1109
1110 self.assertEqual(a[:], list(range(10)))
1111
1112 d = [a, b]
1113 e = self.list(d)
1114 self.assertEqual(
1115 e[:],
1116 [[0, 1, 2, 3, 4, 5, 6, 7, 8, 9], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4]]
1117 )
1118
1119 f = self.list([a])
1120 a.append('hello')
1121 self.assertEqual(f[:], [[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 'hello']])
1122
1123 def test_dict(self):
1124 d = self.dict()
1125 indices = list(range(65, 70))
1126 for i in indices:
1127 d[i] = chr(i)
1128 self.assertEqual(d.copy(), dict((i, chr(i)) for i in indices))
1129 self.assertEqual(sorted(d.keys()), indices)
1130 self.assertEqual(sorted(d.values()), [chr(i) for i in indices])
1131 self.assertEqual(sorted(d.items()), [(i, chr(i)) for i in indices])
1132
1133 def test_namespace(self):
1134 n = self.Namespace()
1135 n.name = 'Bob'
1136 n.job = 'Builder'
1137 n._hidden = 'hidden'
1138 self.assertEqual((n.name, n.job), ('Bob', 'Builder'))
1139 del n.job
1140 self.assertEqual(str(n), "Namespace(name='Bob')")
1141 self.assertTrue(hasattr(n, 'name'))
1142 self.assertTrue(not hasattr(n, 'job'))
1143
1144#
1145#
1146#
1147
1148def sqr(x, wait=0.0):
1149 time.sleep(wait)
1150 return x*x
Ask Solem2afcbf22010-11-09 20:55:52 +00001151
Antoine Pitroude911b22011-12-21 11:03:24 +01001152def mul(x, y):
1153 return x*y
1154
Benjamin Petersone711caf2008-06-11 16:44:04 +00001155class _TestPool(BaseTestCase):
1156
1157 def test_apply(self):
1158 papply = self.pool.apply
1159 self.assertEqual(papply(sqr, (5,)), sqr(5))
1160 self.assertEqual(papply(sqr, (), {'x':3}), sqr(x=3))
1161
1162 def test_map(self):
1163 pmap = self.pool.map
1164 self.assertEqual(pmap(sqr, list(range(10))), list(map(sqr, list(range(10)))))
1165 self.assertEqual(pmap(sqr, list(range(100)), chunksize=20),
1166 list(map(sqr, list(range(100)))))
1167
Antoine Pitroude911b22011-12-21 11:03:24 +01001168 def test_starmap(self):
1169 psmap = self.pool.starmap
1170 tuples = list(zip(range(10), range(9,-1, -1)))
1171 self.assertEqual(psmap(mul, tuples),
1172 list(itertools.starmap(mul, tuples)))
1173 tuples = list(zip(range(100), range(99,-1, -1)))
1174 self.assertEqual(psmap(mul, tuples, chunksize=20),
1175 list(itertools.starmap(mul, tuples)))
1176
1177 def test_starmap_async(self):
1178 tuples = list(zip(range(100), range(99,-1, -1)))
1179 self.assertEqual(self.pool.starmap_async(mul, tuples).get(),
1180 list(itertools.starmap(mul, tuples)))
1181
Alexandre Vassalottie52e3782009-07-17 09:18:18 +00001182 def test_map_chunksize(self):
1183 try:
1184 self.pool.map_async(sqr, [], chunksize=1).get(timeout=TIMEOUT1)
1185 except multiprocessing.TimeoutError:
1186 self.fail("pool.map_async with chunksize stalled on null list")
1187
Benjamin Petersone711caf2008-06-11 16:44:04 +00001188 def test_async(self):
1189 res = self.pool.apply_async(sqr, (7, TIMEOUT1,))
1190 get = TimingWrapper(res.get)
1191 self.assertEqual(get(), 49)
1192 self.assertTimingAlmostEqual(get.elapsed, TIMEOUT1)
1193
1194 def test_async_timeout(self):
1195 res = self.pool.apply_async(sqr, (6, TIMEOUT2 + 0.2))
1196 get = TimingWrapper(res.get)
1197 self.assertRaises(multiprocessing.TimeoutError, get, timeout=TIMEOUT2)
1198 self.assertTimingAlmostEqual(get.elapsed, TIMEOUT2)
1199
1200 def test_imap(self):
1201 it = self.pool.imap(sqr, list(range(10)))
1202 self.assertEqual(list(it), list(map(sqr, list(range(10)))))
1203
1204 it = self.pool.imap(sqr, list(range(10)))
1205 for i in range(10):
1206 self.assertEqual(next(it), i*i)
1207 self.assertRaises(StopIteration, it.__next__)
1208
1209 it = self.pool.imap(sqr, list(range(1000)), chunksize=100)
1210 for i in range(1000):
1211 self.assertEqual(next(it), i*i)
1212 self.assertRaises(StopIteration, it.__next__)
1213
1214 def test_imap_unordered(self):
1215 it = self.pool.imap_unordered(sqr, list(range(1000)))
1216 self.assertEqual(sorted(it), list(map(sqr, list(range(1000)))))
1217
1218 it = self.pool.imap_unordered(sqr, list(range(1000)), chunksize=53)
1219 self.assertEqual(sorted(it), list(map(sqr, list(range(1000)))))
1220
1221 def test_make_pool(self):
Victor Stinner2fae27b2011-06-20 17:53:35 +02001222 self.assertRaises(ValueError, multiprocessing.Pool, -1)
1223 self.assertRaises(ValueError, multiprocessing.Pool, 0)
1224
Benjamin Petersone711caf2008-06-11 16:44:04 +00001225 p = multiprocessing.Pool(3)
1226 self.assertEqual(3, len(p._pool))
1227 p.close()
1228 p.join()
1229
1230 def test_terminate(self):
1231 if self.TYPE == 'manager':
1232 # On Unix a forked process increfs each shared object to
1233 # which its parent process held a reference. If the
1234 # forked process gets terminated then there is likely to
1235 # be a reference leak. So to prevent
1236 # _TestZZZNumberOfObjects from failing we skip this test
1237 # when using a manager.
1238 return
1239
1240 result = self.pool.map_async(
1241 time.sleep, [0.1 for i in range(10000)], chunksize=1
1242 )
1243 self.pool.terminate()
1244 join = TimingWrapper(self.pool.join)
1245 join()
Victor Stinner900189b2011-03-24 16:39:07 +01001246 self.assertLess(join.elapsed, 0.5)
Jesse Noller1f0b6582010-01-27 03:36:01 +00001247
Ask Solem2afcbf22010-11-09 20:55:52 +00001248def raising():
1249 raise KeyError("key")
Jesse Noller1f0b6582010-01-27 03:36:01 +00001250
Ask Solem2afcbf22010-11-09 20:55:52 +00001251def unpickleable_result():
1252 return lambda: 42
1253
1254class _TestPoolWorkerErrors(BaseTestCase):
Jesse Noller1f0b6582010-01-27 03:36:01 +00001255 ALLOWED_TYPES = ('processes', )
Ask Solem2afcbf22010-11-09 20:55:52 +00001256
1257 def test_async_error_callback(self):
1258 p = multiprocessing.Pool(2)
1259
1260 scratchpad = [None]
1261 def errback(exc):
1262 scratchpad[0] = exc
1263
1264 res = p.apply_async(raising, error_callback=errback)
1265 self.assertRaises(KeyError, res.get)
1266 self.assertTrue(scratchpad[0])
1267 self.assertIsInstance(scratchpad[0], KeyError)
1268
1269 p.close()
1270 p.join()
1271
1272 def test_unpickleable_result(self):
1273 from multiprocessing.pool import MaybeEncodingError
1274 p = multiprocessing.Pool(2)
1275
1276 # Make sure we don't lose pool processes because of encoding errors.
1277 for iteration in range(20):
1278
1279 scratchpad = [None]
1280 def errback(exc):
1281 scratchpad[0] = exc
1282
1283 res = p.apply_async(unpickleable_result, error_callback=errback)
1284 self.assertRaises(MaybeEncodingError, res.get)
1285 wrapped = scratchpad[0]
1286 self.assertTrue(wrapped)
1287 self.assertIsInstance(scratchpad[0], MaybeEncodingError)
1288 self.assertIsNotNone(wrapped.exc)
1289 self.assertIsNotNone(wrapped.value)
1290
1291 p.close()
1292 p.join()
1293
1294class _TestPoolWorkerLifetime(BaseTestCase):
1295 ALLOWED_TYPES = ('processes', )
1296
Jesse Noller1f0b6582010-01-27 03:36:01 +00001297 def test_pool_worker_lifetime(self):
1298 p = multiprocessing.Pool(3, maxtasksperchild=10)
1299 self.assertEqual(3, len(p._pool))
1300 origworkerpids = [w.pid for w in p._pool]
1301 # Run many tasks so each worker gets replaced (hopefully)
1302 results = []
1303 for i in range(100):
1304 results.append(p.apply_async(sqr, (i, )))
1305 # Fetch the results and verify we got the right answers,
1306 # also ensuring all the tasks have completed.
1307 for (j, res) in enumerate(results):
1308 self.assertEqual(res.get(), sqr(j))
1309 # Refill the pool
1310 p._repopulate_pool()
Florent Xiclunafb190f62010-03-04 16:10:10 +00001311 # Wait until all workers are alive
Antoine Pitrou540ab062011-04-06 22:51:17 +02001312 # (countdown * DELTA = 5 seconds max startup process time)
1313 countdown = 50
Florent Xiclunafb190f62010-03-04 16:10:10 +00001314 while countdown and not all(w.is_alive() for w in p._pool):
1315 countdown -= 1
1316 time.sleep(DELTA)
Jesse Noller1f0b6582010-01-27 03:36:01 +00001317 finalworkerpids = [w.pid for w in p._pool]
Florent Xiclunafb190f62010-03-04 16:10:10 +00001318 # All pids should be assigned. See issue #7805.
1319 self.assertNotIn(None, origworkerpids)
1320 self.assertNotIn(None, finalworkerpids)
1321 # Finally, check that the worker pids have changed
Jesse Noller1f0b6582010-01-27 03:36:01 +00001322 self.assertNotEqual(sorted(origworkerpids), sorted(finalworkerpids))
1323 p.close()
1324 p.join()
1325
Charles-François Natalif8859e12011-10-24 18:45:29 +02001326 def test_pool_worker_lifetime_early_close(self):
1327 # Issue #10332: closing a pool whose workers have limited lifetimes
1328 # before all the tasks completed would make join() hang.
1329 p = multiprocessing.Pool(3, maxtasksperchild=1)
1330 results = []
1331 for i in range(6):
1332 results.append(p.apply_async(sqr, (i, 0.3)))
1333 p.close()
1334 p.join()
1335 # check the results
1336 for (j, res) in enumerate(results):
1337 self.assertEqual(res.get(), sqr(j))
1338
1339
Benjamin Petersone711caf2008-06-11 16:44:04 +00001340#
1341# Test that manager has expected number of shared objects left
1342#
1343
1344class _TestZZZNumberOfObjects(BaseTestCase):
1345 # Because test cases are sorted alphabetically, this one will get
1346 # run after all the other tests for the manager. It tests that
1347 # there have been no "reference leaks" for the manager's shared
1348 # objects. Note the comment in _TestPool.test_terminate().
1349 ALLOWED_TYPES = ('manager',)
1350
1351 def test_number_of_objects(self):
1352 EXPECTED_NUMBER = 1 # the pool object is still alive
1353 multiprocessing.active_children() # discard dead process objs
1354 gc.collect() # do garbage collection
1355 refs = self.manager._number_of_objects()
Jesse Noller63b3a972009-01-21 02:15:48 +00001356 debug_info = self.manager._debug_info()
Benjamin Petersone711caf2008-06-11 16:44:04 +00001357 if refs != EXPECTED_NUMBER:
Georg Brandl3dbca812008-07-23 16:10:53 +00001358 print(self.manager._debug_info())
Jesse Noller63b3a972009-01-21 02:15:48 +00001359 print(debug_info)
Benjamin Petersone711caf2008-06-11 16:44:04 +00001360
1361 self.assertEqual(refs, EXPECTED_NUMBER)
1362
1363#
1364# Test of creating a customized manager class
1365#
1366
1367from multiprocessing.managers import BaseManager, BaseProxy, RemoteError
1368
1369class FooBar(object):
1370 def f(self):
1371 return 'f()'
1372 def g(self):
1373 raise ValueError
1374 def _h(self):
1375 return '_h()'
1376
1377def baz():
1378 for i in range(10):
1379 yield i*i
1380
1381class IteratorProxy(BaseProxy):
Florent Xiclunaaa171062010-08-14 15:56:42 +00001382 _exposed_ = ('__next__',)
Benjamin Petersone711caf2008-06-11 16:44:04 +00001383 def __iter__(self):
1384 return self
1385 def __next__(self):
Benjamin Petersone711caf2008-06-11 16:44:04 +00001386 return self._callmethod('__next__')
1387
1388class MyManager(BaseManager):
1389 pass
1390
1391MyManager.register('Foo', callable=FooBar)
1392MyManager.register('Bar', callable=FooBar, exposed=('f', '_h'))
1393MyManager.register('baz', callable=baz, proxytype=IteratorProxy)
1394
1395
1396class _TestMyManager(BaseTestCase):
1397
1398 ALLOWED_TYPES = ('manager',)
1399
1400 def test_mymanager(self):
1401 manager = MyManager()
1402 manager.start()
1403
1404 foo = manager.Foo()
1405 bar = manager.Bar()
1406 baz = manager.baz()
1407
1408 foo_methods = [name for name in ('f', 'g', '_h') if hasattr(foo, name)]
1409 bar_methods = [name for name in ('f', 'g', '_h') if hasattr(bar, name)]
1410
1411 self.assertEqual(foo_methods, ['f', 'g'])
1412 self.assertEqual(bar_methods, ['f', '_h'])
1413
1414 self.assertEqual(foo.f(), 'f()')
1415 self.assertRaises(ValueError, foo.g)
1416 self.assertEqual(foo._callmethod('f'), 'f()')
1417 self.assertRaises(RemoteError, foo._callmethod, '_h')
1418
1419 self.assertEqual(bar.f(), 'f()')
1420 self.assertEqual(bar._h(), '_h()')
1421 self.assertEqual(bar._callmethod('f'), 'f()')
1422 self.assertEqual(bar._callmethod('_h'), '_h()')
1423
1424 self.assertEqual(list(baz), [i*i for i in range(10)])
1425
1426 manager.shutdown()
1427
1428#
1429# Test of connecting to a remote server and using xmlrpclib for serialization
1430#
1431
1432_queue = pyqueue.Queue()
1433def get_queue():
1434 return _queue
1435
1436class QueueManager(BaseManager):
1437 '''manager class used by server process'''
1438QueueManager.register('get_queue', callable=get_queue)
1439
1440class QueueManager2(BaseManager):
1441 '''manager class which specifies the same interface as QueueManager'''
1442QueueManager2.register('get_queue')
1443
1444
1445SERIALIZER = 'xmlrpclib'
1446
1447class _TestRemoteManager(BaseTestCase):
1448
1449 ALLOWED_TYPES = ('manager',)
1450
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +00001451 @classmethod
1452 def _putter(cls, address, authkey):
Benjamin Petersone711caf2008-06-11 16:44:04 +00001453 manager = QueueManager2(
1454 address=address, authkey=authkey, serializer=SERIALIZER
1455 )
1456 manager.connect()
1457 queue = manager.get_queue()
1458 queue.put(('hello world', None, True, 2.25))
1459
1460 def test_remote(self):
1461 authkey = os.urandom(32)
1462
1463 manager = QueueManager(
1464 address=('localhost', 0), authkey=authkey, serializer=SERIALIZER
1465 )
1466 manager.start()
1467
1468 p = self.Process(target=self._putter, args=(manager.address, authkey))
Jesus Cea94f964f2011-09-09 20:26:57 +02001469 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +00001470 p.start()
1471
1472 manager2 = QueueManager2(
1473 address=manager.address, authkey=authkey, serializer=SERIALIZER
1474 )
1475 manager2.connect()
1476 queue = manager2.get_queue()
1477
1478 # Note that xmlrpclib will deserialize object as a list not a tuple
1479 self.assertEqual(queue.get(), ['hello world', None, True, 2.25])
1480
1481 # Because we are using xmlrpclib for serialization instead of
1482 # pickle this will cause a serialization error.
1483 self.assertRaises(Exception, queue.put, time.sleep)
1484
1485 # Make queue finalizer run before the server is stopped
1486 del queue
1487 manager.shutdown()
1488
Jesse Nollerc5d28a02009-03-30 16:37:36 +00001489class _TestManagerRestart(BaseTestCase):
1490
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +00001491 @classmethod
1492 def _putter(cls, address, authkey):
Jesse Nollerc5d28a02009-03-30 16:37:36 +00001493 manager = QueueManager(
1494 address=address, authkey=authkey, serializer=SERIALIZER)
1495 manager.connect()
1496 queue = manager.get_queue()
1497 queue.put('hello world')
1498
1499 def test_rapid_restart(self):
1500 authkey = os.urandom(32)
1501 manager = QueueManager(
Antoine Pitrou043bad02010-04-30 23:20:15 +00001502 address=('localhost', 0), authkey=authkey, serializer=SERIALIZER)
Brian Curtin50be1ca2010-11-01 05:10:44 +00001503 srvr = manager.get_server()
1504 addr = srvr.address
1505 # Close the connection.Listener socket which gets opened as a part
1506 # of manager.get_server(). It's not needed for the test.
1507 srvr.listener.close()
Jesse Nollerc5d28a02009-03-30 16:37:36 +00001508 manager.start()
1509
1510 p = self.Process(target=self._putter, args=(manager.address, authkey))
Jesus Cea94f964f2011-09-09 20:26:57 +02001511 p.daemon = True
Jesse Nollerc5d28a02009-03-30 16:37:36 +00001512 p.start()
1513 queue = manager.get_queue()
1514 self.assertEqual(queue.get(), 'hello world')
Jesse Noller35d1f002009-03-30 22:59:27 +00001515 del queue
Jesse Nollerc5d28a02009-03-30 16:37:36 +00001516 manager.shutdown()
1517 manager = QueueManager(
Antoine Pitrou043bad02010-04-30 23:20:15 +00001518 address=addr, authkey=authkey, serializer=SERIALIZER)
Antoine Pitrouc824e9a2011-04-05 18:11:33 +02001519 try:
1520 manager.start()
1521 except IOError as e:
1522 if e.errno != errno.EADDRINUSE:
1523 raise
1524 # Retry after some time, in case the old socket was lingering
1525 # (sporadic failure on buildbots)
1526 time.sleep(1.0)
1527 manager = QueueManager(
1528 address=addr, authkey=authkey, serializer=SERIALIZER)
Jesse Noller35d1f002009-03-30 22:59:27 +00001529 manager.shutdown()
Jesse Nollerc5d28a02009-03-30 16:37:36 +00001530
Benjamin Petersone711caf2008-06-11 16:44:04 +00001531#
1532#
1533#
1534
1535SENTINEL = latin('')
1536
1537class _TestConnection(BaseTestCase):
1538
1539 ALLOWED_TYPES = ('processes', 'threads')
1540
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +00001541 @classmethod
1542 def _echo(cls, conn):
Benjamin Petersone711caf2008-06-11 16:44:04 +00001543 for msg in iter(conn.recv_bytes, SENTINEL):
1544 conn.send_bytes(msg)
1545 conn.close()
1546
1547 def test_connection(self):
1548 conn, child_conn = self.Pipe()
1549
1550 p = self.Process(target=self._echo, args=(child_conn,))
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +00001551 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +00001552 p.start()
1553
1554 seq = [1, 2.25, None]
1555 msg = latin('hello world')
1556 longmsg = msg * 10
1557 arr = array.array('i', list(range(4)))
1558
1559 if self.TYPE == 'processes':
1560 self.assertEqual(type(conn.fileno()), int)
1561
1562 self.assertEqual(conn.send(seq), None)
1563 self.assertEqual(conn.recv(), seq)
1564
1565 self.assertEqual(conn.send_bytes(msg), None)
1566 self.assertEqual(conn.recv_bytes(), msg)
1567
1568 if self.TYPE == 'processes':
1569 buffer = array.array('i', [0]*10)
1570 expected = list(arr) + [0] * (10 - len(arr))
1571 self.assertEqual(conn.send_bytes(arr), None)
1572 self.assertEqual(conn.recv_bytes_into(buffer),
1573 len(arr) * buffer.itemsize)
1574 self.assertEqual(list(buffer), expected)
1575
1576 buffer = array.array('i', [0]*10)
1577 expected = [0] * 3 + list(arr) + [0] * (10 - 3 - len(arr))
1578 self.assertEqual(conn.send_bytes(arr), None)
1579 self.assertEqual(conn.recv_bytes_into(buffer, 3 * buffer.itemsize),
1580 len(arr) * buffer.itemsize)
1581 self.assertEqual(list(buffer), expected)
1582
1583 buffer = bytearray(latin(' ' * 40))
1584 self.assertEqual(conn.send_bytes(longmsg), None)
1585 try:
1586 res = conn.recv_bytes_into(buffer)
1587 except multiprocessing.BufferTooShort as e:
1588 self.assertEqual(e.args, (longmsg,))
1589 else:
1590 self.fail('expected BufferTooShort, got %s' % res)
1591
1592 poll = TimingWrapper(conn.poll)
1593
1594 self.assertEqual(poll(), False)
1595 self.assertTimingAlmostEqual(poll.elapsed, 0)
1596
1597 self.assertEqual(poll(TIMEOUT1), False)
1598 self.assertTimingAlmostEqual(poll.elapsed, TIMEOUT1)
1599
1600 conn.send(None)
1601
1602 self.assertEqual(poll(TIMEOUT1), True)
1603 self.assertTimingAlmostEqual(poll.elapsed, 0)
1604
1605 self.assertEqual(conn.recv(), None)
1606
1607 really_big_msg = latin('X') * (1024 * 1024 * 16) # 16Mb
1608 conn.send_bytes(really_big_msg)
1609 self.assertEqual(conn.recv_bytes(), really_big_msg)
1610
1611 conn.send_bytes(SENTINEL) # tell child to quit
1612 child_conn.close()
1613
1614 if self.TYPE == 'processes':
1615 self.assertEqual(conn.readable, True)
1616 self.assertEqual(conn.writable, True)
1617 self.assertRaises(EOFError, conn.recv)
1618 self.assertRaises(EOFError, conn.recv_bytes)
1619
1620 p.join()
1621
1622 def test_duplex_false(self):
1623 reader, writer = self.Pipe(duplex=False)
1624 self.assertEqual(writer.send(1), None)
1625 self.assertEqual(reader.recv(), 1)
1626 if self.TYPE == 'processes':
1627 self.assertEqual(reader.readable, True)
1628 self.assertEqual(reader.writable, False)
1629 self.assertEqual(writer.readable, False)
1630 self.assertEqual(writer.writable, True)
1631 self.assertRaises(IOError, reader.send, 2)
1632 self.assertRaises(IOError, writer.recv)
1633 self.assertRaises(IOError, writer.poll)
1634
1635 def test_spawn_close(self):
1636 # We test that a pipe connection can be closed by parent
1637 # process immediately after child is spawned. On Windows this
1638 # would have sometimes failed on old versions because
1639 # child_conn would be closed before the child got a chance to
1640 # duplicate it.
1641 conn, child_conn = self.Pipe()
1642
1643 p = self.Process(target=self._echo, args=(child_conn,))
Jesus Cea94f964f2011-09-09 20:26:57 +02001644 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +00001645 p.start()
1646 child_conn.close() # this might complete before child initializes
1647
1648 msg = latin('hello')
1649 conn.send_bytes(msg)
1650 self.assertEqual(conn.recv_bytes(), msg)
1651
1652 conn.send_bytes(SENTINEL)
1653 conn.close()
1654 p.join()
1655
1656 def test_sendbytes(self):
1657 if self.TYPE != 'processes':
1658 return
1659
1660 msg = latin('abcdefghijklmnopqrstuvwxyz')
1661 a, b = self.Pipe()
1662
1663 a.send_bytes(msg)
1664 self.assertEqual(b.recv_bytes(), msg)
1665
1666 a.send_bytes(msg, 5)
1667 self.assertEqual(b.recv_bytes(), msg[5:])
1668
1669 a.send_bytes(msg, 7, 8)
1670 self.assertEqual(b.recv_bytes(), msg[7:7+8])
1671
1672 a.send_bytes(msg, 26)
1673 self.assertEqual(b.recv_bytes(), latin(''))
1674
1675 a.send_bytes(msg, 26, 0)
1676 self.assertEqual(b.recv_bytes(), latin(''))
1677
1678 self.assertRaises(ValueError, a.send_bytes, msg, 27)
1679
1680 self.assertRaises(ValueError, a.send_bytes, msg, 22, 5)
1681
1682 self.assertRaises(ValueError, a.send_bytes, msg, 26, 1)
1683
1684 self.assertRaises(ValueError, a.send_bytes, msg, -1)
1685
1686 self.assertRaises(ValueError, a.send_bytes, msg, 4, -1)
1687
Antoine Pitroubcb39d42011-08-23 19:46:22 +02001688 @classmethod
1689 def _is_fd_assigned(cls, fd):
1690 try:
1691 os.fstat(fd)
1692 except OSError as e:
1693 if e.errno == errno.EBADF:
1694 return False
1695 raise
1696 else:
1697 return True
1698
1699 @classmethod
1700 def _writefd(cls, conn, data, create_dummy_fds=False):
1701 if create_dummy_fds:
1702 for i in range(0, 256):
1703 if not cls._is_fd_assigned(i):
1704 os.dup2(conn.fileno(), i)
1705 fd = reduction.recv_handle(conn)
1706 if msvcrt:
1707 fd = msvcrt.open_osfhandle(fd, os.O_WRONLY)
1708 os.write(fd, data)
1709 os.close(fd)
1710
Charles-François Natalibc8f0822011-09-20 20:36:51 +02001711 @unittest.skipUnless(HAS_REDUCTION, "test needs multiprocessing.reduction")
Antoine Pitroubcb39d42011-08-23 19:46:22 +02001712 def test_fd_transfer(self):
1713 if self.TYPE != 'processes':
1714 self.skipTest("only makes sense with processes")
1715 conn, child_conn = self.Pipe(duplex=True)
1716
1717 p = self.Process(target=self._writefd, args=(child_conn, b"foo"))
Jesus Cea94f964f2011-09-09 20:26:57 +02001718 p.daemon = True
Antoine Pitroubcb39d42011-08-23 19:46:22 +02001719 p.start()
Victor Stinnerd0b10a62011-09-21 01:10:29 +02001720 self.addCleanup(test.support.unlink, test.support.TESTFN)
Antoine Pitroubcb39d42011-08-23 19:46:22 +02001721 with open(test.support.TESTFN, "wb") as f:
1722 fd = f.fileno()
1723 if msvcrt:
1724 fd = msvcrt.get_osfhandle(fd)
1725 reduction.send_handle(conn, fd, p.pid)
1726 p.join()
1727 with open(test.support.TESTFN, "rb") as f:
1728 self.assertEqual(f.read(), b"foo")
1729
Charles-François Natalibc8f0822011-09-20 20:36:51 +02001730 @unittest.skipUnless(HAS_REDUCTION, "test needs multiprocessing.reduction")
Antoine Pitroubcb39d42011-08-23 19:46:22 +02001731 @unittest.skipIf(sys.platform == "win32",
1732 "test semantics don't make sense on Windows")
1733 @unittest.skipIf(MAXFD <= 256,
1734 "largest assignable fd number is too small")
1735 @unittest.skipUnless(hasattr(os, "dup2"),
1736 "test needs os.dup2()")
1737 def test_large_fd_transfer(self):
1738 # With fd > 256 (issue #11657)
1739 if self.TYPE != 'processes':
1740 self.skipTest("only makes sense with processes")
1741 conn, child_conn = self.Pipe(duplex=True)
1742
1743 p = self.Process(target=self._writefd, args=(child_conn, b"bar", True))
Jesus Cea94f964f2011-09-09 20:26:57 +02001744 p.daemon = True
Antoine Pitroubcb39d42011-08-23 19:46:22 +02001745 p.start()
Victor Stinnerd0b10a62011-09-21 01:10:29 +02001746 self.addCleanup(test.support.unlink, test.support.TESTFN)
Antoine Pitroubcb39d42011-08-23 19:46:22 +02001747 with open(test.support.TESTFN, "wb") as f:
1748 fd = f.fileno()
1749 for newfd in range(256, MAXFD):
1750 if not self._is_fd_assigned(newfd):
1751 break
1752 else:
1753 self.fail("could not find an unassigned large file descriptor")
1754 os.dup2(fd, newfd)
1755 try:
1756 reduction.send_handle(conn, newfd, p.pid)
1757 finally:
1758 os.close(newfd)
1759 p.join()
1760 with open(test.support.TESTFN, "rb") as f:
1761 self.assertEqual(f.read(), b"bar")
1762
Jesus Cea4507e642011-09-21 03:53:25 +02001763 @classmethod
1764 def _send_data_without_fd(self, conn):
1765 os.write(conn.fileno(), b"\0")
1766
Charles-François Natalie51c8da2011-09-21 18:48:21 +02001767 @unittest.skipUnless(HAS_REDUCTION, "test needs multiprocessing.reduction")
Jesus Cea4507e642011-09-21 03:53:25 +02001768 @unittest.skipIf(sys.platform == "win32", "doesn't make sense on Windows")
1769 def test_missing_fd_transfer(self):
1770 # Check that exception is raised when received data is not
1771 # accompanied by a file descriptor in ancillary data.
1772 if self.TYPE != 'processes':
1773 self.skipTest("only makes sense with processes")
1774 conn, child_conn = self.Pipe(duplex=True)
1775
1776 p = self.Process(target=self._send_data_without_fd, args=(child_conn,))
1777 p.daemon = True
1778 p.start()
1779 self.assertRaises(RuntimeError, reduction.recv_handle, conn)
1780 p.join()
Antoine Pitroubcb39d42011-08-23 19:46:22 +02001781
Charles-François Natalied4a8fc2012-02-08 21:15:58 +01001782class _TestListener(BaseTestCase):
1783
1784 ALLOWED_TYPES = ('processes')
1785
1786 def test_multiple_bind(self):
1787 for family in self.connection.families:
1788 l = self.connection.Listener(family=family)
1789 self.addCleanup(l.close)
1790 self.assertRaises(OSError, self.connection.Listener,
1791 l.address, family)
1792
Benjamin Petersone711caf2008-06-11 16:44:04 +00001793class _TestListenerClient(BaseTestCase):
1794
1795 ALLOWED_TYPES = ('processes', 'threads')
1796
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +00001797 @classmethod
1798 def _test(cls, address):
1799 conn = cls.connection.Client(address)
Benjamin Petersone711caf2008-06-11 16:44:04 +00001800 conn.send('hello')
1801 conn.close()
1802
1803 def test_listener_client(self):
1804 for family in self.connection.families:
1805 l = self.connection.Listener(family=family)
1806 p = self.Process(target=self._test, args=(l.address,))
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +00001807 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +00001808 p.start()
1809 conn = l.accept()
1810 self.assertEqual(conn.recv(), 'hello')
1811 p.join()
1812 l.close()
Charles-François Natalied4a8fc2012-02-08 21:15:58 +01001813
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01001814class _TestPoll(unittest.TestCase):
1815
1816 ALLOWED_TYPES = ('processes', 'threads')
1817
1818 def test_empty_string(self):
1819 a, b = self.Pipe()
1820 self.assertEqual(a.poll(), False)
1821 b.send_bytes(b'')
1822 self.assertEqual(a.poll(), True)
1823 self.assertEqual(a.poll(), True)
1824
1825 @classmethod
1826 def _child_strings(cls, conn, strings):
1827 for s in strings:
1828 time.sleep(0.1)
1829 conn.send_bytes(s)
1830 conn.close()
1831
1832 def test_strings(self):
1833 strings = (b'hello', b'', b'a', b'b', b'', b'bye', b'', b'lop')
1834 a, b = self.Pipe()
1835 p = self.Process(target=self._child_strings, args=(b, strings))
1836 p.start()
1837
1838 for s in strings:
1839 for i in range(200):
1840 if a.poll(0.01):
1841 break
1842 x = a.recv_bytes()
1843 self.assertEqual(s, x)
1844
1845 p.join()
1846
1847 @classmethod
1848 def _child_boundaries(cls, r):
1849 # Polling may "pull" a message in to the child process, but we
1850 # don't want it to pull only part of a message, as that would
1851 # corrupt the pipe for any other processes which might later
1852 # read from it.
1853 r.poll(5)
1854
1855 def test_boundaries(self):
1856 r, w = self.Pipe(False)
1857 p = self.Process(target=self._child_boundaries, args=(r,))
1858 p.start()
1859 time.sleep(2)
1860 L = [b"first", b"second"]
1861 for obj in L:
1862 w.send_bytes(obj)
1863 w.close()
1864 p.join()
1865 self.assertIn(r.recv_bytes(), L)
1866
1867 @classmethod
1868 def _child_dont_merge(cls, b):
1869 b.send_bytes(b'a')
1870 b.send_bytes(b'b')
1871 b.send_bytes(b'cd')
1872
1873 def test_dont_merge(self):
1874 a, b = self.Pipe()
1875 self.assertEqual(a.poll(0.0), False)
1876 self.assertEqual(a.poll(0.1), False)
1877
1878 p = self.Process(target=self._child_dont_merge, args=(b,))
1879 p.start()
1880
1881 self.assertEqual(a.recv_bytes(), b'a')
1882 self.assertEqual(a.poll(1.0), True)
1883 self.assertEqual(a.poll(1.0), True)
1884 self.assertEqual(a.recv_bytes(), b'b')
1885 self.assertEqual(a.poll(1.0), True)
1886 self.assertEqual(a.poll(1.0), True)
1887 self.assertEqual(a.poll(0.0), True)
1888 self.assertEqual(a.recv_bytes(), b'cd')
1889
1890 p.join()
1891
Benjamin Petersone711caf2008-06-11 16:44:04 +00001892#
1893# Test of sending connection and socket objects between processes
1894#
Benjamin Petersonb29cbbc2008-06-16 20:57:14 +00001895"""
Benjamin Petersone711caf2008-06-11 16:44:04 +00001896class _TestPicklingConnections(BaseTestCase):
1897
1898 ALLOWED_TYPES = ('processes',)
1899
1900 def _listener(self, conn, families):
1901 for fam in families:
1902 l = self.connection.Listener(family=fam)
1903 conn.send(l.address)
1904 new_conn = l.accept()
1905 conn.send(new_conn)
1906
1907 if self.TYPE == 'processes':
1908 l = socket.socket()
1909 l.bind(('localhost', 0))
1910 conn.send(l.getsockname())
1911 l.listen(1)
1912 new_conn, addr = l.accept()
1913 conn.send(new_conn)
1914
1915 conn.recv()
1916
1917 def _remote(self, conn):
1918 for (address, msg) in iter(conn.recv, None):
1919 client = self.connection.Client(address)
1920 client.send(msg.upper())
1921 client.close()
1922
1923 if self.TYPE == 'processes':
1924 address, msg = conn.recv()
1925 client = socket.socket()
1926 client.connect(address)
1927 client.sendall(msg.upper())
1928 client.close()
1929
1930 conn.close()
1931
1932 def test_pickling(self):
1933 try:
1934 multiprocessing.allow_connection_pickling()
1935 except ImportError:
1936 return
1937
1938 families = self.connection.families
1939
1940 lconn, lconn0 = self.Pipe()
1941 lp = self.Process(target=self._listener, args=(lconn0, families))
Jesus Cea94f964f2011-09-09 20:26:57 +02001942 lp.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +00001943 lp.start()
1944 lconn0.close()
1945
1946 rconn, rconn0 = self.Pipe()
1947 rp = self.Process(target=self._remote, args=(rconn0,))
Jesus Cea94f964f2011-09-09 20:26:57 +02001948 rp.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +00001949 rp.start()
1950 rconn0.close()
1951
1952 for fam in families:
1953 msg = ('This connection uses family %s' % fam).encode('ascii')
1954 address = lconn.recv()
1955 rconn.send((address, msg))
1956 new_conn = lconn.recv()
1957 self.assertEqual(new_conn.recv(), msg.upper())
1958
1959 rconn.send(None)
1960
1961 if self.TYPE == 'processes':
1962 msg = latin('This connection uses a normal socket')
1963 address = lconn.recv()
1964 rconn.send((address, msg))
1965 if hasattr(socket, 'fromfd'):
1966 new_conn = lconn.recv()
1967 self.assertEqual(new_conn.recv(100), msg.upper())
1968 else:
1969 # XXX On Windows with Py2.6 need to backport fromfd()
1970 discard = lconn.recv_bytes()
1971
1972 lconn.send(None)
1973
1974 rconn.close()
1975 lconn.close()
1976
1977 lp.join()
1978 rp.join()
Benjamin Petersonb29cbbc2008-06-16 20:57:14 +00001979"""
Benjamin Petersone711caf2008-06-11 16:44:04 +00001980#
1981#
1982#
1983
1984class _TestHeap(BaseTestCase):
1985
1986 ALLOWED_TYPES = ('processes',)
1987
1988 def test_heap(self):
1989 iterations = 5000
1990 maxblocks = 50
1991 blocks = []
1992
1993 # create and destroy lots of blocks of different sizes
1994 for i in range(iterations):
1995 size = int(random.lognormvariate(0, 1) * 1000)
1996 b = multiprocessing.heap.BufferWrapper(size)
1997 blocks.append(b)
1998 if len(blocks) > maxblocks:
1999 i = random.randrange(maxblocks)
2000 del blocks[i]
2001
2002 # get the heap object
2003 heap = multiprocessing.heap.BufferWrapper._heap
2004
2005 # verify the state of the heap
2006 all = []
2007 occupied = 0
Charles-François Natali778db492011-07-02 14:35:49 +02002008 heap._lock.acquire()
2009 self.addCleanup(heap._lock.release)
Benjamin Petersone711caf2008-06-11 16:44:04 +00002010 for L in list(heap._len_to_seq.values()):
2011 for arena, start, stop in L:
2012 all.append((heap._arenas.index(arena), start, stop,
2013 stop-start, 'free'))
2014 for arena, start, stop in heap._allocated_blocks:
2015 all.append((heap._arenas.index(arena), start, stop,
2016 stop-start, 'occupied'))
2017 occupied += (stop-start)
2018
2019 all.sort()
2020
2021 for i in range(len(all)-1):
2022 (arena, start, stop) = all[i][:3]
2023 (narena, nstart, nstop) = all[i+1][:3]
2024 self.assertTrue((arena != narena and nstart == 0) or
2025 (stop == nstart))
2026
Charles-François Natali778db492011-07-02 14:35:49 +02002027 def test_free_from_gc(self):
2028 # Check that freeing of blocks by the garbage collector doesn't deadlock
2029 # (issue #12352).
2030 # Make sure the GC is enabled, and set lower collection thresholds to
2031 # make collections more frequent (and increase the probability of
2032 # deadlock).
2033 if not gc.isenabled():
2034 gc.enable()
2035 self.addCleanup(gc.disable)
2036 thresholds = gc.get_threshold()
2037 self.addCleanup(gc.set_threshold, *thresholds)
2038 gc.set_threshold(10)
2039
2040 # perform numerous block allocations, with cyclic references to make
2041 # sure objects are collected asynchronously by the gc
2042 for i in range(5000):
2043 a = multiprocessing.heap.BufferWrapper(1)
2044 b = multiprocessing.heap.BufferWrapper(1)
2045 # circular references
2046 a.buddy = b
2047 b.buddy = a
2048
Benjamin Petersone711caf2008-06-11 16:44:04 +00002049#
2050#
2051#
2052
Benjamin Petersone711caf2008-06-11 16:44:04 +00002053class _Foo(Structure):
2054 _fields_ = [
2055 ('x', c_int),
2056 ('y', c_double)
2057 ]
2058
2059class _TestSharedCTypes(BaseTestCase):
2060
2061 ALLOWED_TYPES = ('processes',)
2062
Antoine Pitrou7744e2a2010-11-22 16:26:21 +00002063 def setUp(self):
2064 if not HAS_SHAREDCTYPES:
2065 self.skipTest("requires multiprocessing.sharedctypes")
2066
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +00002067 @classmethod
2068 def _double(cls, x, y, foo, arr, string):
Benjamin Petersone711caf2008-06-11 16:44:04 +00002069 x.value *= 2
2070 y.value *= 2
2071 foo.x *= 2
2072 foo.y *= 2
2073 string.value *= 2
2074 for i in range(len(arr)):
2075 arr[i] *= 2
2076
2077 def test_sharedctypes(self, lock=False):
Benjamin Petersone711caf2008-06-11 16:44:04 +00002078 x = Value('i', 7, lock=lock)
Georg Brandl89fad142010-03-14 10:23:39 +00002079 y = Value(c_double, 1.0/3.0, lock=lock)
Benjamin Petersone711caf2008-06-11 16:44:04 +00002080 foo = Value(_Foo, 3, 2, lock=lock)
Georg Brandl89fad142010-03-14 10:23:39 +00002081 arr = self.Array('d', list(range(10)), lock=lock)
2082 string = self.Array('c', 20, lock=lock)
Brian Curtinafa88b52010-10-07 01:12:19 +00002083 string.value = latin('hello')
Benjamin Petersone711caf2008-06-11 16:44:04 +00002084
2085 p = self.Process(target=self._double, args=(x, y, foo, arr, string))
Jesus Cea94f964f2011-09-09 20:26:57 +02002086 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +00002087 p.start()
2088 p.join()
2089
2090 self.assertEqual(x.value, 14)
2091 self.assertAlmostEqual(y.value, 2.0/3.0)
2092 self.assertEqual(foo.x, 6)
2093 self.assertAlmostEqual(foo.y, 4.0)
2094 for i in range(10):
2095 self.assertAlmostEqual(arr[i], i*2)
2096 self.assertEqual(string.value, latin('hellohello'))
2097
2098 def test_synchronize(self):
2099 self.test_sharedctypes(lock=True)
2100
2101 def test_copy(self):
Benjamin Petersone711caf2008-06-11 16:44:04 +00002102 foo = _Foo(2, 5.0)
Brian Curtinafa88b52010-10-07 01:12:19 +00002103 bar = copy(foo)
Benjamin Petersone711caf2008-06-11 16:44:04 +00002104 foo.x = 0
2105 foo.y = 0
2106 self.assertEqual(bar.x, 2)
2107 self.assertAlmostEqual(bar.y, 5.0)
2108
2109#
2110#
2111#
2112
2113class _TestFinalize(BaseTestCase):
2114
2115 ALLOWED_TYPES = ('processes',)
2116
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +00002117 @classmethod
2118 def _test_finalize(cls, conn):
Benjamin Petersone711caf2008-06-11 16:44:04 +00002119 class Foo(object):
2120 pass
2121
2122 a = Foo()
2123 util.Finalize(a, conn.send, args=('a',))
2124 del a # triggers callback for a
2125
2126 b = Foo()
2127 close_b = util.Finalize(b, conn.send, args=('b',))
2128 close_b() # triggers callback for b
2129 close_b() # does nothing because callback has already been called
2130 del b # does nothing because callback has already been called
2131
2132 c = Foo()
2133 util.Finalize(c, conn.send, args=('c',))
2134
2135 d10 = Foo()
2136 util.Finalize(d10, conn.send, args=('d10',), exitpriority=1)
2137
2138 d01 = Foo()
2139 util.Finalize(d01, conn.send, args=('d01',), exitpriority=0)
2140 d02 = Foo()
2141 util.Finalize(d02, conn.send, args=('d02',), exitpriority=0)
2142 d03 = Foo()
2143 util.Finalize(d03, conn.send, args=('d03',), exitpriority=0)
2144
2145 util.Finalize(None, conn.send, args=('e',), exitpriority=-10)
2146
2147 util.Finalize(None, conn.send, args=('STOP',), exitpriority=-100)
2148
Ezio Melotti13925002011-03-16 11:05:33 +02002149 # call multiprocessing's cleanup function then exit process without
Benjamin Petersone711caf2008-06-11 16:44:04 +00002150 # garbage collecting locals
2151 util._exit_function()
2152 conn.close()
2153 os._exit(0)
2154
2155 def test_finalize(self):
2156 conn, child_conn = self.Pipe()
2157
2158 p = self.Process(target=self._test_finalize, args=(child_conn,))
Jesus Cea94f964f2011-09-09 20:26:57 +02002159 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +00002160 p.start()
2161 p.join()
2162
2163 result = [obj for obj in iter(conn.recv, 'STOP')]
2164 self.assertEqual(result, ['a', 'b', 'd10', 'd03', 'd02', 'd01', 'e'])
2165
2166#
2167# Test that from ... import * works for each module
2168#
2169
2170class _TestImportStar(BaseTestCase):
2171
2172 ALLOWED_TYPES = ('processes',)
2173
2174 def test_import(self):
Florent Xiclunafd1b0932010-03-28 00:25:02 +00002175 modules = [
Benjamin Petersone711caf2008-06-11 16:44:04 +00002176 'multiprocessing', 'multiprocessing.connection',
2177 'multiprocessing.heap', 'multiprocessing.managers',
2178 'multiprocessing.pool', 'multiprocessing.process',
Benjamin Petersone711caf2008-06-11 16:44:04 +00002179 'multiprocessing.synchronize', 'multiprocessing.util'
Florent Xiclunafd1b0932010-03-28 00:25:02 +00002180 ]
2181
Charles-François Natalibc8f0822011-09-20 20:36:51 +02002182 if HAS_REDUCTION:
2183 modules.append('multiprocessing.reduction')
2184
Florent Xiclunafd1b0932010-03-28 00:25:02 +00002185 if c_int is not None:
2186 # This module requires _ctypes
2187 modules.append('multiprocessing.sharedctypes')
Benjamin Petersone711caf2008-06-11 16:44:04 +00002188
2189 for name in modules:
2190 __import__(name)
2191 mod = sys.modules[name]
2192
2193 for attr in getattr(mod, '__all__', ()):
2194 self.assertTrue(
2195 hasattr(mod, attr),
2196 '%r does not have attribute %r' % (mod, attr)
2197 )
2198
2199#
2200# Quick test that logging works -- does not test logging output
2201#
2202
2203class _TestLogging(BaseTestCase):
2204
2205 ALLOWED_TYPES = ('processes',)
2206
2207 def test_enable_logging(self):
2208 logger = multiprocessing.get_logger()
2209 logger.setLevel(util.SUBWARNING)
2210 self.assertTrue(logger is not None)
2211 logger.debug('this will not be printed')
2212 logger.info('nor will this')
2213 logger.setLevel(LOG_LEVEL)
2214
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +00002215 @classmethod
2216 def _test_level(cls, conn):
Benjamin Petersone711caf2008-06-11 16:44:04 +00002217 logger = multiprocessing.get_logger()
2218 conn.send(logger.getEffectiveLevel())
2219
2220 def test_level(self):
2221 LEVEL1 = 32
2222 LEVEL2 = 37
2223
2224 logger = multiprocessing.get_logger()
2225 root_logger = logging.getLogger()
2226 root_level = root_logger.level
2227
2228 reader, writer = multiprocessing.Pipe(duplex=False)
2229
2230 logger.setLevel(LEVEL1)
Jesus Cea94f964f2011-09-09 20:26:57 +02002231 p = self.Process(target=self._test_level, args=(writer,))
2232 p.daemon = True
2233 p.start()
Benjamin Petersone711caf2008-06-11 16:44:04 +00002234 self.assertEqual(LEVEL1, reader.recv())
2235
2236 logger.setLevel(logging.NOTSET)
2237 root_logger.setLevel(LEVEL2)
Jesus Cea94f964f2011-09-09 20:26:57 +02002238 p = self.Process(target=self._test_level, args=(writer,))
2239 p.daemon = True
2240 p.start()
Benjamin Petersone711caf2008-06-11 16:44:04 +00002241 self.assertEqual(LEVEL2, reader.recv())
2242
2243 root_logger.setLevel(root_level)
2244 logger.setLevel(level=LOG_LEVEL)
2245
Jesse Nollerb9a49b72009-11-21 18:09:38 +00002246
Jesse Nollerf4ae35f2009-11-24 14:22:24 +00002247# class _TestLoggingProcessName(BaseTestCase):
2248#
2249# def handle(self, record):
2250# assert record.processName == multiprocessing.current_process().name
2251# self.__handled = True
2252#
2253# def test_logging(self):
2254# handler = logging.Handler()
2255# handler.handle = self.handle
2256# self.__handled = False
2257# # Bypass getLogger() and side-effects
2258# logger = logging.getLoggerClass()(
2259# 'multiprocessing.test.TestLoggingProcessName')
2260# logger.addHandler(handler)
2261# logger.propagate = False
2262#
2263# logger.warn('foo')
2264# assert self.__handled
Jesse Nollerb9a49b72009-11-21 18:09:38 +00002265
Benjamin Petersone711caf2008-06-11 16:44:04 +00002266#
Jesse Noller6214edd2009-01-19 16:23:53 +00002267# Test to verify handle verification, see issue 3321
2268#
2269
2270class TestInvalidHandle(unittest.TestCase):
2271
Florent Xiclunafd1b0932010-03-28 00:25:02 +00002272 @unittest.skipIf(WIN32, "skipped on Windows")
Jesse Noller6214edd2009-01-19 16:23:53 +00002273 def test_invalid_handles(self):
Antoine Pitrou87cf2202011-05-09 17:04:27 +02002274 conn = multiprocessing.connection.Connection(44977608)
2275 try:
2276 self.assertRaises((ValueError, IOError), conn.poll)
2277 finally:
2278 # Hack private attribute _handle to avoid printing an error
2279 # in conn.__del__
2280 conn._handle = None
2281 self.assertRaises((ValueError, IOError),
2282 multiprocessing.connection.Connection, -1)
Florent Xiclunafd1b0932010-03-28 00:25:02 +00002283
Jesse Noller6214edd2009-01-19 16:23:53 +00002284#
Benjamin Petersone711caf2008-06-11 16:44:04 +00002285# Functions used to create test cases from the base ones in this module
2286#
2287
2288def get_attributes(Source, names):
2289 d = {}
2290 for name in names:
2291 obj = getattr(Source, name)
2292 if type(obj) == type(get_attributes):
2293 obj = staticmethod(obj)
2294 d[name] = obj
2295 return d
2296
2297def create_test_cases(Mixin, type):
2298 result = {}
2299 glob = globals()
Florent Xiclunafd1b0932010-03-28 00:25:02 +00002300 Type = type.capitalize()
Benjamin Petersone711caf2008-06-11 16:44:04 +00002301
2302 for name in list(glob.keys()):
2303 if name.startswith('_Test'):
2304 base = glob[name]
2305 if type in base.ALLOWED_TYPES:
2306 newname = 'With' + Type + name[1:]
2307 class Temp(base, unittest.TestCase, Mixin):
2308 pass
2309 result[newname] = Temp
2310 Temp.__name__ = newname
2311 Temp.__module__ = Mixin.__module__
2312 return result
2313
2314#
2315# Create test cases
2316#
2317
2318class ProcessesMixin(object):
2319 TYPE = 'processes'
2320 Process = multiprocessing.Process
2321 locals().update(get_attributes(multiprocessing, (
2322 'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
2323 'Condition', 'Event', 'Value', 'Array', 'RawValue',
2324 'RawArray', 'current_process', 'active_children', 'Pipe',
2325 'connection', 'JoinableQueue'
2326 )))
2327
2328testcases_processes = create_test_cases(ProcessesMixin, type='processes')
2329globals().update(testcases_processes)
2330
2331
2332class ManagerMixin(object):
2333 TYPE = 'manager'
2334 Process = multiprocessing.Process
2335 manager = object.__new__(multiprocessing.managers.SyncManager)
2336 locals().update(get_attributes(manager, (
2337 'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
2338 'Condition', 'Event', 'Value', 'Array', 'list', 'dict',
2339 'Namespace', 'JoinableQueue'
2340 )))
2341
2342testcases_manager = create_test_cases(ManagerMixin, type='manager')
2343globals().update(testcases_manager)
2344
2345
2346class ThreadsMixin(object):
2347 TYPE = 'threads'
2348 Process = multiprocessing.dummy.Process
2349 locals().update(get_attributes(multiprocessing.dummy, (
2350 'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
2351 'Condition', 'Event', 'Value', 'Array', 'current_process',
2352 'active_children', 'Pipe', 'connection', 'dict', 'list',
2353 'Namespace', 'JoinableQueue'
2354 )))
2355
2356testcases_threads = create_test_cases(ThreadsMixin, type='threads')
2357globals().update(testcases_threads)
2358
Neal Norwitz5d6415e2008-08-25 01:53:32 +00002359class OtherTest(unittest.TestCase):
2360 # TODO: add more tests for deliver/answer challenge.
2361 def test_deliver_challenge_auth_failure(self):
2362 class _FakeConnection(object):
2363 def recv_bytes(self, size):
Neal Norwitzec105ad2008-08-25 03:05:54 +00002364 return b'something bogus'
Neal Norwitz5d6415e2008-08-25 01:53:32 +00002365 def send_bytes(self, data):
2366 pass
2367 self.assertRaises(multiprocessing.AuthenticationError,
2368 multiprocessing.connection.deliver_challenge,
2369 _FakeConnection(), b'abc')
2370
2371 def test_answer_challenge_auth_failure(self):
2372 class _FakeConnection(object):
2373 def __init__(self):
2374 self.count = 0
2375 def recv_bytes(self, size):
2376 self.count += 1
2377 if self.count == 1:
2378 return multiprocessing.connection.CHALLENGE
2379 elif self.count == 2:
Neal Norwitzec105ad2008-08-25 03:05:54 +00002380 return b'something bogus'
2381 return b''
Neal Norwitz5d6415e2008-08-25 01:53:32 +00002382 def send_bytes(self, data):
2383 pass
2384 self.assertRaises(multiprocessing.AuthenticationError,
2385 multiprocessing.connection.answer_challenge,
2386 _FakeConnection(), b'abc')
2387
Benjamin Petersonf47ed4a2009-04-11 20:45:40 +00002388#
2389# Test Manager.start()/Pool.__init__() initializer feature - see issue 5585
2390#
2391
2392def initializer(ns):
2393 ns.test += 1
2394
2395class TestInitializers(unittest.TestCase):
2396 def setUp(self):
2397 self.mgr = multiprocessing.Manager()
2398 self.ns = self.mgr.Namespace()
2399 self.ns.test = 0
2400
2401 def tearDown(self):
2402 self.mgr.shutdown()
2403
2404 def test_manager_initializer(self):
2405 m = multiprocessing.managers.SyncManager()
2406 self.assertRaises(TypeError, m.start, 1)
2407 m.start(initializer, (self.ns,))
2408 self.assertEqual(self.ns.test, 1)
2409 m.shutdown()
2410
2411 def test_pool_initializer(self):
2412 self.assertRaises(TypeError, multiprocessing.Pool, initializer=1)
2413 p = multiprocessing.Pool(1, initializer, (self.ns,))
2414 p.close()
2415 p.join()
2416 self.assertEqual(self.ns.test, 1)
2417
Alexandre Vassalottic57a84f2009-07-17 12:07:01 +00002418#
2419# Issue 5155, 5313, 5331: Test process in processes
2420# Verifies os.close(sys.stdin.fileno) vs. sys.stdin.close() behavior
2421#
2422
2423def _ThisSubProcess(q):
2424 try:
2425 item = q.get(block=False)
2426 except pyqueue.Empty:
2427 pass
2428
2429def _TestProcess(q):
2430 queue = multiprocessing.Queue()
2431 subProc = multiprocessing.Process(target=_ThisSubProcess, args=(queue,))
Jesus Cea94f964f2011-09-09 20:26:57 +02002432 subProc.daemon = True
Alexandre Vassalottic57a84f2009-07-17 12:07:01 +00002433 subProc.start()
2434 subProc.join()
2435
2436def _afunc(x):
2437 return x*x
2438
2439def pool_in_process():
2440 pool = multiprocessing.Pool(processes=4)
2441 x = pool.map(_afunc, [1, 2, 3, 4, 5, 6, 7])
2442
2443class _file_like(object):
2444 def __init__(self, delegate):
2445 self._delegate = delegate
2446 self._pid = None
2447
2448 @property
2449 def cache(self):
2450 pid = os.getpid()
2451 # There are no race conditions since fork keeps only the running thread
2452 if pid != self._pid:
2453 self._pid = pid
2454 self._cache = []
2455 return self._cache
2456
2457 def write(self, data):
2458 self.cache.append(data)
2459
2460 def flush(self):
2461 self._delegate.write(''.join(self.cache))
2462 self._cache = []
2463
2464class TestStdinBadfiledescriptor(unittest.TestCase):
2465
2466 def test_queue_in_process(self):
2467 queue = multiprocessing.Queue()
2468 proc = multiprocessing.Process(target=_TestProcess, args=(queue,))
2469 proc.start()
2470 proc.join()
2471
2472 def test_pool_in_process(self):
2473 p = multiprocessing.Process(target=pool_in_process)
2474 p.start()
2475 p.join()
2476
2477 def test_flushing(self):
2478 sio = io.StringIO()
2479 flike = _file_like(sio)
2480 flike.write('foo')
2481 proc = multiprocessing.Process(target=lambda: flike.flush())
2482 flike.flush()
2483 assert sio.getvalue() == 'foo'
2484
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01002485
2486class TestWait(unittest.TestCase):
2487
2488 @classmethod
2489 def _child_test_wait(cls, w, slow):
2490 for i in range(10):
2491 if slow:
2492 time.sleep(random.random()*0.1)
2493 w.send((i, os.getpid()))
2494 w.close()
2495
2496 def test_wait(self, slow=False):
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01002497 from multiprocessing.connection import wait
2498 readers = []
2499 procs = []
2500 messages = []
2501
2502 for i in range(4):
Antoine Pitrou5bb9a8f2012-03-06 13:43:24 +01002503 r, w = multiprocessing.Pipe(duplex=False)
2504 p = multiprocessing.Process(target=self._child_test_wait, args=(w, slow))
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01002505 p.daemon = True
2506 p.start()
2507 w.close()
2508 readers.append(r)
2509 procs.append(p)
Antoine Pitrou6c64cc12012-03-06 13:42:35 +01002510 self.addCleanup(p.join)
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01002511
2512 while readers:
2513 for r in wait(readers):
2514 try:
2515 msg = r.recv()
2516 except EOFError:
2517 readers.remove(r)
2518 r.close()
2519 else:
2520 messages.append(msg)
2521
2522 messages.sort()
2523 expected = sorted((i, p.pid) for i in range(10) for p in procs)
2524 self.assertEqual(messages, expected)
2525
2526 @classmethod
2527 def _child_test_wait_socket(cls, address, slow):
2528 s = socket.socket()
2529 s.connect(address)
2530 for i in range(10):
2531 if slow:
2532 time.sleep(random.random()*0.1)
2533 s.sendall(('%s\n' % i).encode('ascii'))
2534 s.close()
2535
2536 def test_wait_socket(self, slow=False):
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01002537 from multiprocessing.connection import wait
2538 l = socket.socket()
2539 l.bind(('', 0))
2540 l.listen(4)
2541 addr = ('localhost', l.getsockname()[1])
2542 readers = []
2543 procs = []
2544 dic = {}
2545
2546 for i in range(4):
Antoine Pitrou5bb9a8f2012-03-06 13:43:24 +01002547 p = multiprocessing.Process(target=self._child_test_wait_socket,
2548 args=(addr, slow))
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01002549 p.daemon = True
2550 p.start()
2551 procs.append(p)
Antoine Pitrou6c64cc12012-03-06 13:42:35 +01002552 self.addCleanup(p.join)
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01002553
2554 for i in range(4):
2555 r, _ = l.accept()
2556 readers.append(r)
2557 dic[r] = []
2558 l.close()
2559
2560 while readers:
2561 for r in wait(readers):
2562 msg = r.recv(32)
2563 if not msg:
2564 readers.remove(r)
2565 r.close()
2566 else:
2567 dic[r].append(msg)
2568
2569 expected = ''.join('%s\n' % i for i in range(10)).encode('ascii')
2570 for v in dic.values():
2571 self.assertEqual(b''.join(v), expected)
2572
2573 def test_wait_slow(self):
2574 self.test_wait(True)
2575
2576 def test_wait_socket_slow(self):
2577 self.test_wait(True)
2578
2579 def test_wait_timeout(self):
2580 from multiprocessing.connection import wait
2581
2582 expected = 1
2583 a, b = multiprocessing.Pipe()
2584
2585 start = time.time()
2586 res = wait([a, b], 1)
2587 delta = time.time() - start
2588
2589 self.assertEqual(res, [])
Antoine Pitrou37749772012-03-09 18:40:15 +01002590 self.assertLess(delta, expected + 0.5)
2591 self.assertGreater(delta, expected - 0.5)
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01002592
2593 b.send(None)
2594
2595 start = time.time()
2596 res = wait([a, b], 1)
2597 delta = time.time() - start
2598
2599 self.assertEqual(res, [a])
Antoine Pitrou37749772012-03-09 18:40:15 +01002600 self.assertLess(delta, 0.4)
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01002601
2602 def test_wait_integer(self):
2603 from multiprocessing.connection import wait
2604
2605 expected = 5
2606 a, b = multiprocessing.Pipe()
2607 p = multiprocessing.Process(target=time.sleep, args=(expected,))
2608
2609 p.start()
2610 self.assertIsInstance(p.sentinel, int)
2611
2612 start = time.time()
2613 res = wait([a, p.sentinel, b], expected + 20)
2614 delta = time.time() - start
2615
2616 self.assertEqual(res, [p.sentinel])
Antoine Pitrou37749772012-03-09 18:40:15 +01002617 self.assertLess(delta, expected + 2)
2618 self.assertGreater(delta, expected - 2)
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01002619
2620 a.send(None)
2621
2622 start = time.time()
2623 res = wait([a, p.sentinel, b], 20)
2624 delta = time.time() - start
2625
2626 self.assertEqual(res, [p.sentinel, b])
Antoine Pitrou37749772012-03-09 18:40:15 +01002627 self.assertLess(delta, 0.4)
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01002628
2629 b.send(None)
2630
2631 start = time.time()
2632 res = wait([a, p.sentinel, b], 20)
2633 delta = time.time() - start
2634
2635 self.assertEqual(res, [a, p.sentinel, b])
Antoine Pitrou37749772012-03-09 18:40:15 +01002636 self.assertLess(delta, 0.4)
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01002637
2638 p.join()
2639
2640
Antoine Pitrou709176f2012-04-01 17:19:09 +02002641#
2642# Issue 14151: Test invalid family on invalid environment
2643#
2644
2645class TestInvalidFamily(unittest.TestCase):
2646
2647 @unittest.skipIf(WIN32, "skipped on Windows")
2648 def test_invalid_family(self):
2649 with self.assertRaises(ValueError):
2650 multiprocessing.connection.Listener(r'\\.\test')
2651
Antoine Pitrou6d20cba2012-04-03 20:12:23 +02002652 @unittest.skipUnless(WIN32, "skipped on non-Windows platforms")
2653 def test_invalid_family_win32(self):
2654 with self.assertRaises(ValueError):
2655 multiprocessing.connection.Listener('/var/test.pipe')
Antoine Pitrou93bba8f2012-04-01 17:25:49 +02002656
Alexandre Vassalottic57a84f2009-07-17 12:07:01 +00002657testcases_other = [OtherTest, TestInvalidHandle, TestInitializers,
Antoine Pitrou93bba8f2012-04-01 17:25:49 +02002658 TestStdinBadfiledescriptor, TestWait, TestInvalidFamily]
Neal Norwitz5d6415e2008-08-25 01:53:32 +00002659
Benjamin Petersone711caf2008-06-11 16:44:04 +00002660#
2661#
2662#
2663
2664def test_main(run=None):
Jesse Nollerd00df3c2008-06-18 14:22:48 +00002665 if sys.platform.startswith("linux"):
2666 try:
2667 lock = multiprocessing.RLock()
2668 except OSError:
Benjamin Petersone549ead2009-03-28 21:42:05 +00002669 raise unittest.SkipTest("OSError raises on RLock creation, see issue 3111!")
Benjamin Peterson3c0dd062008-06-17 22:43:48 +00002670
Charles-François Natali221ef672011-11-22 18:55:22 +01002671 check_enough_semaphores()
2672
Benjamin Petersone711caf2008-06-11 16:44:04 +00002673 if run is None:
2674 from test.support import run_unittest as run
2675
2676 util.get_temp_dir() # creates temp directory for use by all processes
2677
2678 multiprocessing.get_logger().setLevel(LOG_LEVEL)
2679
Benjamin Peterson41181742008-07-02 20:22:54 +00002680 ProcessesMixin.pool = multiprocessing.Pool(4)
2681 ThreadsMixin.pool = multiprocessing.dummy.Pool(4)
2682 ManagerMixin.manager.__init__()
2683 ManagerMixin.manager.start()
2684 ManagerMixin.pool = ManagerMixin.manager.Pool(4)
Benjamin Petersone711caf2008-06-11 16:44:04 +00002685
2686 testcases = (
Benjamin Peterson41181742008-07-02 20:22:54 +00002687 sorted(testcases_processes.values(), key=lambda tc:tc.__name__) +
2688 sorted(testcases_threads.values(), key=lambda tc:tc.__name__) +
Neal Norwitz5d6415e2008-08-25 01:53:32 +00002689 sorted(testcases_manager.values(), key=lambda tc:tc.__name__) +
2690 testcases_other
Benjamin Petersone711caf2008-06-11 16:44:04 +00002691 )
2692
2693 loadTestsFromTestCase = unittest.defaultTestLoader.loadTestsFromTestCase
2694 suite = unittest.TestSuite(loadTestsFromTestCase(tc) for tc in testcases)
2695 run(suite)
2696
Benjamin Peterson41181742008-07-02 20:22:54 +00002697 ThreadsMixin.pool.terminate()
2698 ProcessesMixin.pool.terminate()
2699 ManagerMixin.pool.terminate()
2700 ManagerMixin.manager.shutdown()
Benjamin Petersone711caf2008-06-11 16:44:04 +00002701
Benjamin Peterson41181742008-07-02 20:22:54 +00002702 del ProcessesMixin.pool, ThreadsMixin.pool, ManagerMixin.pool
Benjamin Petersone711caf2008-06-11 16:44:04 +00002703
2704def main():
2705 test_main(unittest.TextTestRunner(verbosity=2).run)
2706
2707if __name__ == '__main__':
2708 main()