blob: f141bd4e48303b0cb65c1e2a7e9456ce34f137cd [file] [log] [blame]
Benjamin Peterson90f5ba52010-03-11 22:53:45 +00001#!/usr/bin/env python3
Georg Brandl86b2fb92008-07-16 03:43:04 +00002
Benjamin Petersone711caf2008-06-11 16:44:04 +00003#
4# Unit tests for the multiprocessing package
5#
6
7import unittest
Benjamin Petersone711caf2008-06-11 16:44:04 +00008import queue as pyqueue
9import time
Alexandre Vassalottic57a84f2009-07-17 12:07:01 +000010import io
Antoine Pitroude911b22011-12-21 11:03:24 +010011import itertools
Benjamin Petersone711caf2008-06-11 16:44:04 +000012import sys
13import os
14import gc
Antoine Pitrouc824e9a2011-04-05 18:11:33 +020015import errno
Benjamin Petersone711caf2008-06-11 16:44:04 +000016import signal
17import array
Benjamin Petersone711caf2008-06-11 16:44:04 +000018import socket
19import random
20import logging
R. David Murraya21e4ca2009-03-31 23:16:50 +000021import test.support
Benjamin Petersone711caf2008-06-11 16:44:04 +000022
Benjamin Petersone5384b02008-10-04 22:00:42 +000023
R. David Murraya21e4ca2009-03-31 23:16:50 +000024# Skip tests if _multiprocessing wasn't built.
25_multiprocessing = test.support.import_module('_multiprocessing')
26# Skip tests if sem_open implementation is broken.
27test.support.import_module('multiprocessing.synchronize')
Victor Stinner45df8202010-04-28 22:31:17 +000028# import threading after _multiprocessing to raise a more revelant error
29# message: "No module named _multiprocessing". _multiprocessing is not compiled
30# without thread support.
31import threading
Benjamin Petersone5384b02008-10-04 22:00:42 +000032
Benjamin Petersone711caf2008-06-11 16:44:04 +000033import multiprocessing.dummy
34import multiprocessing.connection
35import multiprocessing.managers
36import multiprocessing.heap
Benjamin Petersone711caf2008-06-11 16:44:04 +000037import multiprocessing.pool
Benjamin Petersone711caf2008-06-11 16:44:04 +000038
Charles-François Natalibc8f0822011-09-20 20:36:51 +020039from multiprocessing import util
40
41try:
42 from multiprocessing import reduction
43 HAS_REDUCTION = True
44except ImportError:
45 HAS_REDUCTION = False
Benjamin Petersone711caf2008-06-11 16:44:04 +000046
Brian Curtinafa88b52010-10-07 01:12:19 +000047try:
48 from multiprocessing.sharedctypes import Value, copy
49 HAS_SHAREDCTYPES = True
50except ImportError:
51 HAS_SHAREDCTYPES = False
52
Antoine Pitroubcb39d42011-08-23 19:46:22 +020053try:
54 import msvcrt
55except ImportError:
56 msvcrt = None
57
Benjamin Petersone711caf2008-06-11 16:44:04 +000058#
59#
60#
61
Benjamin Peterson2bc91df2008-07-13 18:45:30 +000062def latin(s):
63 return s.encode('latin')
Benjamin Petersone711caf2008-06-11 16:44:04 +000064
Benjamin Petersone711caf2008-06-11 16:44:04 +000065#
66# Constants
67#
68
69LOG_LEVEL = util.SUBWARNING
Jesse Noller1f0b6582010-01-27 03:36:01 +000070#LOG_LEVEL = logging.DEBUG
Benjamin Petersone711caf2008-06-11 16:44:04 +000071
72DELTA = 0.1
73CHECK_TIMINGS = False # making true makes tests take a lot longer
74 # and can sometimes cause some non-serious
75 # failures because some calls block a bit
76 # longer than expected
77if CHECK_TIMINGS:
78 TIMEOUT1, TIMEOUT2, TIMEOUT3 = 0.82, 0.35, 1.4
79else:
80 TIMEOUT1, TIMEOUT2, TIMEOUT3 = 0.1, 0.1, 0.1
81
82HAVE_GETVALUE = not getattr(_multiprocessing,
83 'HAVE_BROKEN_SEM_GETVALUE', False)
84
Jesse Noller6214edd2009-01-19 16:23:53 +000085WIN32 = (sys.platform == "win32")
Antoine Pitrou176f07d2011-06-06 19:35:31 +020086if WIN32:
87 from _subprocess import WaitForSingleObject, INFINITE, WAIT_OBJECT_0
88
89 def wait_for_handle(handle, timeout):
90 if timeout is None or timeout < 0.0:
91 timeout = INFINITE
92 else:
93 timeout = int(1000 * timeout)
94 return WaitForSingleObject(handle, timeout) == WAIT_OBJECT_0
95else:
96 from select import select
97 _select = util._eintr_retry(select)
98
99 def wait_for_handle(handle, timeout):
100 if timeout is not None and timeout < 0.0:
101 timeout = None
102 return handle in _select([handle], [], [], timeout)[0]
Jesse Noller6214edd2009-01-19 16:23:53 +0000103
Antoine Pitroubcb39d42011-08-23 19:46:22 +0200104try:
105 MAXFD = os.sysconf("SC_OPEN_MAX")
106except:
107 MAXFD = 256
108
Benjamin Petersone711caf2008-06-11 16:44:04 +0000109#
Florent Xiclunafd1b0932010-03-28 00:25:02 +0000110# Some tests require ctypes
111#
112
113try:
Florent Xiclunaaa171062010-08-14 15:56:42 +0000114 from ctypes import Structure, c_int, c_double
Florent Xiclunafd1b0932010-03-28 00:25:02 +0000115except ImportError:
116 Structure = object
117 c_int = c_double = None
118
Charles-François Natali221ef672011-11-22 18:55:22 +0100119
120def check_enough_semaphores():
121 """Check that the system supports enough semaphores to run the test."""
122 # minimum number of semaphores available according to POSIX
123 nsems_min = 256
124 try:
125 nsems = os.sysconf("SC_SEM_NSEMS_MAX")
126 except (AttributeError, ValueError):
127 # sysconf not available or setting not available
128 return
129 if nsems == -1 or nsems >= nsems_min:
130 return
131 raise unittest.SkipTest("The OS doesn't support enough semaphores "
132 "to run the test (required: %d)." % nsems_min)
133
134
Florent Xiclunafd1b0932010-03-28 00:25:02 +0000135#
Benjamin Petersone711caf2008-06-11 16:44:04 +0000136# Creates a wrapper for a function which records the time it takes to finish
137#
138
139class TimingWrapper(object):
140
141 def __init__(self, func):
142 self.func = func
143 self.elapsed = None
144
145 def __call__(self, *args, **kwds):
146 t = time.time()
147 try:
148 return self.func(*args, **kwds)
149 finally:
150 self.elapsed = time.time() - t
151
152#
153# Base class for test cases
154#
155
156class BaseTestCase(object):
157
158 ALLOWED_TYPES = ('processes', 'manager', 'threads')
159
160 def assertTimingAlmostEqual(self, a, b):
161 if CHECK_TIMINGS:
162 self.assertAlmostEqual(a, b, 1)
163
164 def assertReturnsIfImplemented(self, value, func, *args):
165 try:
166 res = func(*args)
167 except NotImplementedError:
168 pass
169 else:
170 return self.assertEqual(value, res)
171
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +0000172 # For the sanity of Windows users, rather than crashing or freezing in
173 # multiple ways.
174 def __reduce__(self, *args):
175 raise NotImplementedError("shouldn't try to pickle a test case")
176
177 __reduce_ex__ = __reduce__
178
Benjamin Petersone711caf2008-06-11 16:44:04 +0000179#
180# Return the value of a semaphore
181#
182
183def get_value(self):
184 try:
185 return self.get_value()
186 except AttributeError:
187 try:
188 return self._Semaphore__value
189 except AttributeError:
190 try:
191 return self._value
192 except AttributeError:
193 raise NotImplementedError
194
195#
196# Testcases
197#
198
199class _TestProcess(BaseTestCase):
200
201 ALLOWED_TYPES = ('processes', 'threads')
202
203 def test_current(self):
204 if self.TYPE == 'threads':
205 return
206
207 current = self.current_process()
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000208 authkey = current.authkey
Benjamin Petersone711caf2008-06-11 16:44:04 +0000209
210 self.assertTrue(current.is_alive())
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000211 self.assertTrue(not current.daemon)
Ezio Melottie9615932010-01-24 19:26:24 +0000212 self.assertIsInstance(authkey, bytes)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000213 self.assertTrue(len(authkey) > 0)
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000214 self.assertEqual(current.ident, os.getpid())
215 self.assertEqual(current.exitcode, None)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000216
Antoine Pitrou0bd4deb2011-02-25 22:07:43 +0000217 def test_daemon_argument(self):
218 if self.TYPE == "threads":
219 return
220
221 # By default uses the current process's daemon flag.
222 proc0 = self.Process(target=self._test)
Antoine Pitrouec785222011-03-02 00:15:44 +0000223 self.assertEqual(proc0.daemon, self.current_process().daemon)
Antoine Pitrou0bd4deb2011-02-25 22:07:43 +0000224 proc1 = self.Process(target=self._test, daemon=True)
225 self.assertTrue(proc1.daemon)
226 proc2 = self.Process(target=self._test, daemon=False)
227 self.assertFalse(proc2.daemon)
228
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +0000229 @classmethod
230 def _test(cls, q, *args, **kwds):
231 current = cls.current_process()
Benjamin Petersone711caf2008-06-11 16:44:04 +0000232 q.put(args)
233 q.put(kwds)
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000234 q.put(current.name)
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +0000235 if cls.TYPE != 'threads':
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000236 q.put(bytes(current.authkey))
Benjamin Petersone711caf2008-06-11 16:44:04 +0000237 q.put(current.pid)
238
239 def test_process(self):
240 q = self.Queue(1)
241 e = self.Event()
242 args = (q, 1, 2)
243 kwargs = {'hello':23, 'bye':2.54}
244 name = 'SomeProcess'
245 p = self.Process(
246 target=self._test, args=args, kwargs=kwargs, name=name
247 )
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000248 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000249 current = self.current_process()
250
251 if self.TYPE != 'threads':
Ezio Melottib3aedd42010-11-20 19:04:17 +0000252 self.assertEqual(p.authkey, current.authkey)
253 self.assertEqual(p.is_alive(), False)
254 self.assertEqual(p.daemon, True)
Benjamin Peterson577473f2010-01-19 00:09:57 +0000255 self.assertNotIn(p, self.active_children())
Benjamin Petersone711caf2008-06-11 16:44:04 +0000256 self.assertTrue(type(self.active_children()) is list)
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000257 self.assertEqual(p.exitcode, None)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000258
259 p.start()
260
Ezio Melottib3aedd42010-11-20 19:04:17 +0000261 self.assertEqual(p.exitcode, None)
262 self.assertEqual(p.is_alive(), True)
Benjamin Peterson577473f2010-01-19 00:09:57 +0000263 self.assertIn(p, self.active_children())
Benjamin Petersone711caf2008-06-11 16:44:04 +0000264
Ezio Melottib3aedd42010-11-20 19:04:17 +0000265 self.assertEqual(q.get(), args[1:])
266 self.assertEqual(q.get(), kwargs)
267 self.assertEqual(q.get(), p.name)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000268 if self.TYPE != 'threads':
Ezio Melottib3aedd42010-11-20 19:04:17 +0000269 self.assertEqual(q.get(), current.authkey)
270 self.assertEqual(q.get(), p.pid)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000271
272 p.join()
273
Ezio Melottib3aedd42010-11-20 19:04:17 +0000274 self.assertEqual(p.exitcode, 0)
275 self.assertEqual(p.is_alive(), False)
Benjamin Peterson577473f2010-01-19 00:09:57 +0000276 self.assertNotIn(p, self.active_children())
Benjamin Petersone711caf2008-06-11 16:44:04 +0000277
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +0000278 @classmethod
279 def _test_terminate(cls):
Benjamin Petersone711caf2008-06-11 16:44:04 +0000280 time.sleep(1000)
281
282 def test_terminate(self):
283 if self.TYPE == 'threads':
284 return
285
286 p = self.Process(target=self._test_terminate)
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000287 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000288 p.start()
289
290 self.assertEqual(p.is_alive(), True)
Benjamin Peterson577473f2010-01-19 00:09:57 +0000291 self.assertIn(p, self.active_children())
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000292 self.assertEqual(p.exitcode, None)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000293
294 p.terminate()
295
296 join = TimingWrapper(p.join)
297 self.assertEqual(join(), None)
298 self.assertTimingAlmostEqual(join.elapsed, 0.0)
299
300 self.assertEqual(p.is_alive(), False)
Benjamin Peterson577473f2010-01-19 00:09:57 +0000301 self.assertNotIn(p, self.active_children())
Benjamin Petersone711caf2008-06-11 16:44:04 +0000302
303 p.join()
304
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000305 # XXX sometimes get p.exitcode == 0 on Windows ...
306 #self.assertEqual(p.exitcode, -signal.SIGTERM)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000307
308 def test_cpu_count(self):
309 try:
310 cpus = multiprocessing.cpu_count()
311 except NotImplementedError:
312 cpus = 1
313 self.assertTrue(type(cpus) is int)
314 self.assertTrue(cpus >= 1)
315
316 def test_active_children(self):
317 self.assertEqual(type(self.active_children()), list)
318
319 p = self.Process(target=time.sleep, args=(DELTA,))
Benjamin Peterson577473f2010-01-19 00:09:57 +0000320 self.assertNotIn(p, self.active_children())
Benjamin Petersone711caf2008-06-11 16:44:04 +0000321
Jesus Cea94f964f2011-09-09 20:26:57 +0200322 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000323 p.start()
Benjamin Peterson577473f2010-01-19 00:09:57 +0000324 self.assertIn(p, self.active_children())
Benjamin Petersone711caf2008-06-11 16:44:04 +0000325
326 p.join()
Benjamin Peterson577473f2010-01-19 00:09:57 +0000327 self.assertNotIn(p, self.active_children())
Benjamin Petersone711caf2008-06-11 16:44:04 +0000328
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +0000329 @classmethod
330 def _test_recursion(cls, wconn, id):
Benjamin Petersone711caf2008-06-11 16:44:04 +0000331 from multiprocessing import forking
332 wconn.send(id)
333 if len(id) < 2:
334 for i in range(2):
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +0000335 p = cls.Process(
336 target=cls._test_recursion, args=(wconn, id+[i])
Benjamin Petersone711caf2008-06-11 16:44:04 +0000337 )
338 p.start()
339 p.join()
340
341 def test_recursion(self):
342 rconn, wconn = self.Pipe(duplex=False)
343 self._test_recursion(wconn, [])
344
345 time.sleep(DELTA)
346 result = []
347 while rconn.poll():
348 result.append(rconn.recv())
349
350 expected = [
351 [],
352 [0],
353 [0, 0],
354 [0, 1],
355 [1],
356 [1, 0],
357 [1, 1]
358 ]
359 self.assertEqual(result, expected)
360
Antoine Pitrou176f07d2011-06-06 19:35:31 +0200361 @classmethod
362 def _test_sentinel(cls, event):
363 event.wait(10.0)
364
365 def test_sentinel(self):
366 if self.TYPE == "threads":
367 return
368 event = self.Event()
369 p = self.Process(target=self._test_sentinel, args=(event,))
370 with self.assertRaises(ValueError):
371 p.sentinel
372 p.start()
373 self.addCleanup(p.join)
374 sentinel = p.sentinel
375 self.assertIsInstance(sentinel, int)
376 self.assertFalse(wait_for_handle(sentinel, timeout=0.0))
377 event.set()
378 p.join()
379 self.assertTrue(wait_for_handle(sentinel, timeout=DELTA))
380
Benjamin Petersone711caf2008-06-11 16:44:04 +0000381#
382#
383#
384
385class _UpperCaser(multiprocessing.Process):
386
387 def __init__(self):
388 multiprocessing.Process.__init__(self)
389 self.child_conn, self.parent_conn = multiprocessing.Pipe()
390
391 def run(self):
392 self.parent_conn.close()
393 for s in iter(self.child_conn.recv, None):
394 self.child_conn.send(s.upper())
395 self.child_conn.close()
396
397 def submit(self, s):
398 assert type(s) is str
399 self.parent_conn.send(s)
400 return self.parent_conn.recv()
401
402 def stop(self):
403 self.parent_conn.send(None)
404 self.parent_conn.close()
405 self.child_conn.close()
406
407class _TestSubclassingProcess(BaseTestCase):
408
409 ALLOWED_TYPES = ('processes',)
410
411 def test_subclassing(self):
412 uppercaser = _UpperCaser()
Jesus Cea94f964f2011-09-09 20:26:57 +0200413 uppercaser.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000414 uppercaser.start()
415 self.assertEqual(uppercaser.submit('hello'), 'HELLO')
416 self.assertEqual(uppercaser.submit('world'), 'WORLD')
417 uppercaser.stop()
418 uppercaser.join()
419
Antoine Pitrou84a0fbf2012-01-27 10:52:37 +0100420 def test_stderr_flush(self):
421 # sys.stderr is flushed at process shutdown (issue #13812)
422 if self.TYPE == "threads":
423 return
424
425 testfn = test.support.TESTFN
426 self.addCleanup(test.support.unlink, testfn)
427 proc = self.Process(target=self._test_stderr_flush, args=(testfn,))
428 proc.start()
429 proc.join()
430 with open(testfn, 'r') as f:
431 err = f.read()
432 # The whole traceback was printed
433 self.assertIn("ZeroDivisionError", err)
434 self.assertIn("test_multiprocessing.py", err)
435 self.assertIn("1/0 # MARKER", err)
436
437 @classmethod
438 def _test_stderr_flush(cls, testfn):
439 sys.stderr = open(testfn, 'w')
440 1/0 # MARKER
441
442
Benjamin Petersone711caf2008-06-11 16:44:04 +0000443#
444#
445#
446
447def queue_empty(q):
448 if hasattr(q, 'empty'):
449 return q.empty()
450 else:
451 return q.qsize() == 0
452
453def queue_full(q, maxsize):
454 if hasattr(q, 'full'):
455 return q.full()
456 else:
457 return q.qsize() == maxsize
458
459
460class _TestQueue(BaseTestCase):
461
462
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +0000463 @classmethod
464 def _test_put(cls, queue, child_can_start, parent_can_continue):
Benjamin Petersone711caf2008-06-11 16:44:04 +0000465 child_can_start.wait()
466 for i in range(6):
467 queue.get()
468 parent_can_continue.set()
469
470 def test_put(self):
471 MAXSIZE = 6
472 queue = self.Queue(maxsize=MAXSIZE)
473 child_can_start = self.Event()
474 parent_can_continue = self.Event()
475
476 proc = self.Process(
477 target=self._test_put,
478 args=(queue, child_can_start, parent_can_continue)
479 )
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000480 proc.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000481 proc.start()
482
483 self.assertEqual(queue_empty(queue), True)
484 self.assertEqual(queue_full(queue, MAXSIZE), False)
485
486 queue.put(1)
487 queue.put(2, True)
488 queue.put(3, True, None)
489 queue.put(4, False)
490 queue.put(5, False, None)
491 queue.put_nowait(6)
492
493 # the values may be in buffer but not yet in pipe so sleep a bit
494 time.sleep(DELTA)
495
496 self.assertEqual(queue_empty(queue), False)
497 self.assertEqual(queue_full(queue, MAXSIZE), True)
498
499 put = TimingWrapper(queue.put)
500 put_nowait = TimingWrapper(queue.put_nowait)
501
502 self.assertRaises(pyqueue.Full, put, 7, False)
503 self.assertTimingAlmostEqual(put.elapsed, 0)
504
505 self.assertRaises(pyqueue.Full, put, 7, False, None)
506 self.assertTimingAlmostEqual(put.elapsed, 0)
507
508 self.assertRaises(pyqueue.Full, put_nowait, 7)
509 self.assertTimingAlmostEqual(put_nowait.elapsed, 0)
510
511 self.assertRaises(pyqueue.Full, put, 7, True, TIMEOUT1)
512 self.assertTimingAlmostEqual(put.elapsed, TIMEOUT1)
513
514 self.assertRaises(pyqueue.Full, put, 7, False, TIMEOUT2)
515 self.assertTimingAlmostEqual(put.elapsed, 0)
516
517 self.assertRaises(pyqueue.Full, put, 7, True, timeout=TIMEOUT3)
518 self.assertTimingAlmostEqual(put.elapsed, TIMEOUT3)
519
520 child_can_start.set()
521 parent_can_continue.wait()
522
523 self.assertEqual(queue_empty(queue), True)
524 self.assertEqual(queue_full(queue, MAXSIZE), False)
525
526 proc.join()
527
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +0000528 @classmethod
529 def _test_get(cls, queue, child_can_start, parent_can_continue):
Benjamin Petersone711caf2008-06-11 16:44:04 +0000530 child_can_start.wait()
Benjamin Petersonb29cbbc2008-06-16 20:57:14 +0000531 #queue.put(1)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000532 queue.put(2)
533 queue.put(3)
534 queue.put(4)
535 queue.put(5)
536 parent_can_continue.set()
537
538 def test_get(self):
539 queue = self.Queue()
540 child_can_start = self.Event()
541 parent_can_continue = self.Event()
542
543 proc = self.Process(
544 target=self._test_get,
545 args=(queue, child_can_start, parent_can_continue)
546 )
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000547 proc.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000548 proc.start()
549
550 self.assertEqual(queue_empty(queue), True)
551
552 child_can_start.set()
553 parent_can_continue.wait()
554
555 time.sleep(DELTA)
556 self.assertEqual(queue_empty(queue), False)
557
Benjamin Petersonb29cbbc2008-06-16 20:57:14 +0000558 # Hangs unexpectedly, remove for now
559 #self.assertEqual(queue.get(), 1)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000560 self.assertEqual(queue.get(True, None), 2)
561 self.assertEqual(queue.get(True), 3)
562 self.assertEqual(queue.get(timeout=1), 4)
563 self.assertEqual(queue.get_nowait(), 5)
564
565 self.assertEqual(queue_empty(queue), True)
566
567 get = TimingWrapper(queue.get)
568 get_nowait = TimingWrapper(queue.get_nowait)
569
570 self.assertRaises(pyqueue.Empty, get, False)
571 self.assertTimingAlmostEqual(get.elapsed, 0)
572
573 self.assertRaises(pyqueue.Empty, get, False, None)
574 self.assertTimingAlmostEqual(get.elapsed, 0)
575
576 self.assertRaises(pyqueue.Empty, get_nowait)
577 self.assertTimingAlmostEqual(get_nowait.elapsed, 0)
578
579 self.assertRaises(pyqueue.Empty, get, True, TIMEOUT1)
580 self.assertTimingAlmostEqual(get.elapsed, TIMEOUT1)
581
582 self.assertRaises(pyqueue.Empty, get, False, TIMEOUT2)
583 self.assertTimingAlmostEqual(get.elapsed, 0)
584
585 self.assertRaises(pyqueue.Empty, get, timeout=TIMEOUT3)
586 self.assertTimingAlmostEqual(get.elapsed, TIMEOUT3)
587
588 proc.join()
589
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +0000590 @classmethod
591 def _test_fork(cls, queue):
Benjamin Petersone711caf2008-06-11 16:44:04 +0000592 for i in range(10, 20):
593 queue.put(i)
594 # note that at this point the items may only be buffered, so the
595 # process cannot shutdown until the feeder thread has finished
596 # pushing items onto the pipe.
597
598 def test_fork(self):
599 # Old versions of Queue would fail to create a new feeder
600 # thread for a forked process if the original process had its
601 # own feeder thread. This test checks that this no longer
602 # happens.
603
604 queue = self.Queue()
605
606 # put items on queue so that main process starts a feeder thread
607 for i in range(10):
608 queue.put(i)
609
610 # wait to make sure thread starts before we fork a new process
611 time.sleep(DELTA)
612
613 # fork process
614 p = self.Process(target=self._test_fork, args=(queue,))
Jesus Cea94f964f2011-09-09 20:26:57 +0200615 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000616 p.start()
617
618 # check that all expected items are in the queue
619 for i in range(20):
620 self.assertEqual(queue.get(), i)
621 self.assertRaises(pyqueue.Empty, queue.get, False)
622
623 p.join()
624
625 def test_qsize(self):
626 q = self.Queue()
627 try:
628 self.assertEqual(q.qsize(), 0)
629 except NotImplementedError:
630 return
631 q.put(1)
632 self.assertEqual(q.qsize(), 1)
633 q.put(5)
634 self.assertEqual(q.qsize(), 2)
635 q.get()
636 self.assertEqual(q.qsize(), 1)
637 q.get()
638 self.assertEqual(q.qsize(), 0)
639
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +0000640 @classmethod
641 def _test_task_done(cls, q):
Benjamin Petersone711caf2008-06-11 16:44:04 +0000642 for obj in iter(q.get, None):
643 time.sleep(DELTA)
644 q.task_done()
645
646 def test_task_done(self):
647 queue = self.JoinableQueue()
648
649 if sys.version_info < (2, 5) and not hasattr(queue, 'task_done'):
Florent Xiclunafd1b0932010-03-28 00:25:02 +0000650 self.skipTest("requires 'queue.task_done()' method")
Benjamin Petersone711caf2008-06-11 16:44:04 +0000651
652 workers = [self.Process(target=self._test_task_done, args=(queue,))
653 for i in range(4)]
654
655 for p in workers:
Jesus Cea94f964f2011-09-09 20:26:57 +0200656 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000657 p.start()
658
659 for i in range(10):
660 queue.put(i)
661
662 queue.join()
663
664 for p in workers:
665 queue.put(None)
666
667 for p in workers:
668 p.join()
669
670#
671#
672#
673
674class _TestLock(BaseTestCase):
675
676 def test_lock(self):
677 lock = self.Lock()
678 self.assertEqual(lock.acquire(), True)
679 self.assertEqual(lock.acquire(False), False)
680 self.assertEqual(lock.release(), None)
681 self.assertRaises((ValueError, threading.ThreadError), lock.release)
682
683 def test_rlock(self):
684 lock = self.RLock()
685 self.assertEqual(lock.acquire(), True)
686 self.assertEqual(lock.acquire(), True)
687 self.assertEqual(lock.acquire(), True)
688 self.assertEqual(lock.release(), None)
689 self.assertEqual(lock.release(), None)
690 self.assertEqual(lock.release(), None)
691 self.assertRaises((AssertionError, RuntimeError), lock.release)
692
Jesse Nollerf8d00852009-03-31 03:25:07 +0000693 def test_lock_context(self):
694 with self.Lock():
695 pass
696
Benjamin Petersone711caf2008-06-11 16:44:04 +0000697
698class _TestSemaphore(BaseTestCase):
699
700 def _test_semaphore(self, sem):
701 self.assertReturnsIfImplemented(2, get_value, sem)
702 self.assertEqual(sem.acquire(), True)
703 self.assertReturnsIfImplemented(1, get_value, sem)
704 self.assertEqual(sem.acquire(), True)
705 self.assertReturnsIfImplemented(0, get_value, sem)
706 self.assertEqual(sem.acquire(False), False)
707 self.assertReturnsIfImplemented(0, get_value, sem)
708 self.assertEqual(sem.release(), None)
709 self.assertReturnsIfImplemented(1, get_value, sem)
710 self.assertEqual(sem.release(), None)
711 self.assertReturnsIfImplemented(2, get_value, sem)
712
713 def test_semaphore(self):
714 sem = self.Semaphore(2)
715 self._test_semaphore(sem)
716 self.assertEqual(sem.release(), None)
717 self.assertReturnsIfImplemented(3, get_value, sem)
718 self.assertEqual(sem.release(), None)
719 self.assertReturnsIfImplemented(4, get_value, sem)
720
721 def test_bounded_semaphore(self):
722 sem = self.BoundedSemaphore(2)
723 self._test_semaphore(sem)
724 # Currently fails on OS/X
725 #if HAVE_GETVALUE:
726 # self.assertRaises(ValueError, sem.release)
727 # self.assertReturnsIfImplemented(2, get_value, sem)
728
729 def test_timeout(self):
730 if self.TYPE != 'processes':
731 return
732
733 sem = self.Semaphore(0)
734 acquire = TimingWrapper(sem.acquire)
735
736 self.assertEqual(acquire(False), False)
737 self.assertTimingAlmostEqual(acquire.elapsed, 0.0)
738
739 self.assertEqual(acquire(False, None), False)
740 self.assertTimingAlmostEqual(acquire.elapsed, 0.0)
741
742 self.assertEqual(acquire(False, TIMEOUT1), False)
743 self.assertTimingAlmostEqual(acquire.elapsed, 0)
744
745 self.assertEqual(acquire(True, TIMEOUT2), False)
746 self.assertTimingAlmostEqual(acquire.elapsed, TIMEOUT2)
747
748 self.assertEqual(acquire(timeout=TIMEOUT3), False)
749 self.assertTimingAlmostEqual(acquire.elapsed, TIMEOUT3)
750
751
752class _TestCondition(BaseTestCase):
753
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +0000754 @classmethod
755 def f(cls, cond, sleeping, woken, timeout=None):
Benjamin Petersone711caf2008-06-11 16:44:04 +0000756 cond.acquire()
757 sleeping.release()
758 cond.wait(timeout)
759 woken.release()
760 cond.release()
761
762 def check_invariant(self, cond):
763 # this is only supposed to succeed when there are no sleepers
764 if self.TYPE == 'processes':
765 try:
766 sleepers = (cond._sleeping_count.get_value() -
767 cond._woken_count.get_value())
768 self.assertEqual(sleepers, 0)
769 self.assertEqual(cond._wait_semaphore.get_value(), 0)
770 except NotImplementedError:
771 pass
772
773 def test_notify(self):
774 cond = self.Condition()
775 sleeping = self.Semaphore(0)
776 woken = self.Semaphore(0)
777
778 p = self.Process(target=self.f, args=(cond, sleeping, woken))
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000779 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000780 p.start()
781
782 p = threading.Thread(target=self.f, args=(cond, sleeping, woken))
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000783 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000784 p.start()
785
786 # wait for both children to start sleeping
787 sleeping.acquire()
788 sleeping.acquire()
789
790 # check no process/thread has woken up
791 time.sleep(DELTA)
792 self.assertReturnsIfImplemented(0, get_value, woken)
793
794 # wake up one process/thread
795 cond.acquire()
796 cond.notify()
797 cond.release()
798
799 # check one process/thread has woken up
800 time.sleep(DELTA)
801 self.assertReturnsIfImplemented(1, get_value, woken)
802
803 # wake up another
804 cond.acquire()
805 cond.notify()
806 cond.release()
807
808 # check other has woken up
809 time.sleep(DELTA)
810 self.assertReturnsIfImplemented(2, get_value, woken)
811
812 # check state is not mucked up
813 self.check_invariant(cond)
814 p.join()
815
816 def test_notify_all(self):
817 cond = self.Condition()
818 sleeping = self.Semaphore(0)
819 woken = self.Semaphore(0)
820
821 # start some threads/processes which will timeout
822 for i in range(3):
823 p = self.Process(target=self.f,
824 args=(cond, sleeping, woken, TIMEOUT1))
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000825 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000826 p.start()
827
828 t = threading.Thread(target=self.f,
829 args=(cond, sleeping, woken, TIMEOUT1))
Benjamin Peterson72753702008-08-18 18:09:21 +0000830 t.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000831 t.start()
832
833 # wait for them all to sleep
834 for i in range(6):
835 sleeping.acquire()
836
837 # check they have all timed out
838 for i in range(6):
839 woken.acquire()
840 self.assertReturnsIfImplemented(0, get_value, woken)
841
842 # check state is not mucked up
843 self.check_invariant(cond)
844
845 # start some more threads/processes
846 for i in range(3):
847 p = self.Process(target=self.f, args=(cond, sleeping, woken))
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000848 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000849 p.start()
850
851 t = threading.Thread(target=self.f, args=(cond, sleeping, woken))
Benjamin Peterson72753702008-08-18 18:09:21 +0000852 t.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000853 t.start()
854
855 # wait for them to all sleep
856 for i in range(6):
857 sleeping.acquire()
858
859 # check no process/thread has woken up
860 time.sleep(DELTA)
861 self.assertReturnsIfImplemented(0, get_value, woken)
862
863 # wake them all up
864 cond.acquire()
865 cond.notify_all()
866 cond.release()
867
868 # check they have all woken
Antoine Pitrouf25a8de2011-04-16 21:02:01 +0200869 for i in range(10):
870 try:
871 if get_value(woken) == 6:
872 break
873 except NotImplementedError:
874 break
875 time.sleep(DELTA)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000876 self.assertReturnsIfImplemented(6, get_value, woken)
877
878 # check state is not mucked up
879 self.check_invariant(cond)
880
881 def test_timeout(self):
882 cond = self.Condition()
883 wait = TimingWrapper(cond.wait)
884 cond.acquire()
885 res = wait(TIMEOUT1)
886 cond.release()
Georg Brandl65ffae02010-10-28 09:24:56 +0000887 self.assertEqual(res, False)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000888 self.assertTimingAlmostEqual(wait.elapsed, TIMEOUT1)
889
890
891class _TestEvent(BaseTestCase):
892
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +0000893 @classmethod
894 def _test_event(cls, event):
Benjamin Petersone711caf2008-06-11 16:44:04 +0000895 time.sleep(TIMEOUT2)
896 event.set()
897
898 def test_event(self):
899 event = self.Event()
900 wait = TimingWrapper(event.wait)
901
Ezio Melotti13925002011-03-16 11:05:33 +0200902 # Removed temporarily, due to API shear, this does not
Benjamin Petersone711caf2008-06-11 16:44:04 +0000903 # work with threading._Event objects. is_set == isSet
Benjamin Peterson965ce872009-04-05 21:24:58 +0000904 self.assertEqual(event.is_set(), False)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000905
Benjamin Peterson965ce872009-04-05 21:24:58 +0000906 # Removed, threading.Event.wait() will return the value of the __flag
907 # instead of None. API Shear with the semaphore backed mp.Event
908 self.assertEqual(wait(0.0), False)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000909 self.assertTimingAlmostEqual(wait.elapsed, 0.0)
Benjamin Peterson965ce872009-04-05 21:24:58 +0000910 self.assertEqual(wait(TIMEOUT1), False)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000911 self.assertTimingAlmostEqual(wait.elapsed, TIMEOUT1)
912
913 event.set()
914
915 # See note above on the API differences
Benjamin Peterson965ce872009-04-05 21:24:58 +0000916 self.assertEqual(event.is_set(), True)
917 self.assertEqual(wait(), True)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000918 self.assertTimingAlmostEqual(wait.elapsed, 0.0)
Benjamin Peterson965ce872009-04-05 21:24:58 +0000919 self.assertEqual(wait(TIMEOUT1), True)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000920 self.assertTimingAlmostEqual(wait.elapsed, 0.0)
921 # self.assertEqual(event.is_set(), True)
922
923 event.clear()
924
925 #self.assertEqual(event.is_set(), False)
926
Jesus Cea94f964f2011-09-09 20:26:57 +0200927 p = self.Process(target=self._test_event, args=(event,))
928 p.daemon = True
929 p.start()
Benjamin Peterson965ce872009-04-05 21:24:58 +0000930 self.assertEqual(wait(), True)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000931
932#
933#
934#
935
936class _TestValue(BaseTestCase):
937
Florent Xiclunafd1b0932010-03-28 00:25:02 +0000938 ALLOWED_TYPES = ('processes',)
939
Benjamin Petersone711caf2008-06-11 16:44:04 +0000940 codes_values = [
941 ('i', 4343, 24234),
942 ('d', 3.625, -4.25),
943 ('h', -232, 234),
944 ('c', latin('x'), latin('y'))
945 ]
946
Antoine Pitrou7744e2a2010-11-22 16:26:21 +0000947 def setUp(self):
948 if not HAS_SHAREDCTYPES:
949 self.skipTest("requires multiprocessing.sharedctypes")
950
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +0000951 @classmethod
952 def _test(cls, values):
953 for sv, cv in zip(values, cls.codes_values):
Benjamin Petersone711caf2008-06-11 16:44:04 +0000954 sv.value = cv[2]
955
956
957 def test_value(self, raw=False):
Benjamin Petersone711caf2008-06-11 16:44:04 +0000958 if raw:
959 values = [self.RawValue(code, value)
960 for code, value, _ in self.codes_values]
961 else:
962 values = [self.Value(code, value)
963 for code, value, _ in self.codes_values]
964
965 for sv, cv in zip(values, self.codes_values):
966 self.assertEqual(sv.value, cv[1])
967
968 proc = self.Process(target=self._test, args=(values,))
Jesus Cea94f964f2011-09-09 20:26:57 +0200969 proc.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000970 proc.start()
971 proc.join()
972
973 for sv, cv in zip(values, self.codes_values):
974 self.assertEqual(sv.value, cv[2])
975
976 def test_rawvalue(self):
977 self.test_value(raw=True)
978
979 def test_getobj_getlock(self):
Benjamin Petersone711caf2008-06-11 16:44:04 +0000980 val1 = self.Value('i', 5)
981 lock1 = val1.get_lock()
982 obj1 = val1.get_obj()
983
984 val2 = self.Value('i', 5, lock=None)
985 lock2 = val2.get_lock()
986 obj2 = val2.get_obj()
987
988 lock = self.Lock()
989 val3 = self.Value('i', 5, lock=lock)
990 lock3 = val3.get_lock()
991 obj3 = val3.get_obj()
992 self.assertEqual(lock, lock3)
993
Jesse Nollerb0516a62009-01-18 03:11:38 +0000994 arr4 = self.Value('i', 5, lock=False)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000995 self.assertFalse(hasattr(arr4, 'get_lock'))
996 self.assertFalse(hasattr(arr4, 'get_obj'))
997
Jesse Nollerb0516a62009-01-18 03:11:38 +0000998 self.assertRaises(AttributeError, self.Value, 'i', 5, lock='navalue')
999
1000 arr5 = self.RawValue('i', 5)
1001 self.assertFalse(hasattr(arr5, 'get_lock'))
1002 self.assertFalse(hasattr(arr5, 'get_obj'))
1003
Benjamin Petersone711caf2008-06-11 16:44:04 +00001004
1005class _TestArray(BaseTestCase):
1006
Florent Xiclunafd1b0932010-03-28 00:25:02 +00001007 ALLOWED_TYPES = ('processes',)
1008
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +00001009 @classmethod
1010 def f(cls, seq):
Benjamin Petersone711caf2008-06-11 16:44:04 +00001011 for i in range(1, len(seq)):
1012 seq[i] += seq[i-1]
1013
Florent Xiclunafd1b0932010-03-28 00:25:02 +00001014 @unittest.skipIf(c_int is None, "requires _ctypes")
Benjamin Petersone711caf2008-06-11 16:44:04 +00001015 def test_array(self, raw=False):
Benjamin Petersone711caf2008-06-11 16:44:04 +00001016 seq = [680, 626, 934, 821, 150, 233, 548, 982, 714, 831]
1017 if raw:
1018 arr = self.RawArray('i', seq)
1019 else:
1020 arr = self.Array('i', seq)
1021
1022 self.assertEqual(len(arr), len(seq))
1023 self.assertEqual(arr[3], seq[3])
1024 self.assertEqual(list(arr[2:7]), list(seq[2:7]))
1025
1026 arr[4:8] = seq[4:8] = array.array('i', [1, 2, 3, 4])
1027
1028 self.assertEqual(list(arr[:]), seq)
1029
1030 self.f(seq)
1031
1032 p = self.Process(target=self.f, args=(arr,))
Jesus Cea94f964f2011-09-09 20:26:57 +02001033 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +00001034 p.start()
1035 p.join()
1036
1037 self.assertEqual(list(arr[:]), seq)
1038
Florent Xiclunafd1b0932010-03-28 00:25:02 +00001039 @unittest.skipIf(c_int is None, "requires _ctypes")
Mark Dickinson89461ef2011-03-26 10:19:03 +00001040 def test_array_from_size(self):
1041 size = 10
1042 # Test for zeroing (see issue #11675).
1043 # The repetition below strengthens the test by increasing the chances
1044 # of previously allocated non-zero memory being used for the new array
1045 # on the 2nd and 3rd loops.
1046 for _ in range(3):
1047 arr = self.Array('i', size)
1048 self.assertEqual(len(arr), size)
1049 self.assertEqual(list(arr), [0] * size)
1050 arr[:] = range(10)
1051 self.assertEqual(list(arr), list(range(10)))
1052 del arr
1053
1054 @unittest.skipIf(c_int is None, "requires _ctypes")
Benjamin Petersone711caf2008-06-11 16:44:04 +00001055 def test_rawarray(self):
1056 self.test_array(raw=True)
1057
Florent Xiclunafd1b0932010-03-28 00:25:02 +00001058 @unittest.skipIf(c_int is None, "requires _ctypes")
Benjamin Petersone711caf2008-06-11 16:44:04 +00001059 def test_getobj_getlock_obj(self):
Benjamin Petersone711caf2008-06-11 16:44:04 +00001060 arr1 = self.Array('i', list(range(10)))
1061 lock1 = arr1.get_lock()
1062 obj1 = arr1.get_obj()
1063
1064 arr2 = self.Array('i', list(range(10)), lock=None)
1065 lock2 = arr2.get_lock()
1066 obj2 = arr2.get_obj()
1067
1068 lock = self.Lock()
1069 arr3 = self.Array('i', list(range(10)), lock=lock)
1070 lock3 = arr3.get_lock()
1071 obj3 = arr3.get_obj()
1072 self.assertEqual(lock, lock3)
1073
Jesse Nollerb0516a62009-01-18 03:11:38 +00001074 arr4 = self.Array('i', range(10), lock=False)
Benjamin Petersone711caf2008-06-11 16:44:04 +00001075 self.assertFalse(hasattr(arr4, 'get_lock'))
1076 self.assertFalse(hasattr(arr4, 'get_obj'))
Jesse Nollerb0516a62009-01-18 03:11:38 +00001077 self.assertRaises(AttributeError,
1078 self.Array, 'i', range(10), lock='notalock')
1079
1080 arr5 = self.RawArray('i', range(10))
1081 self.assertFalse(hasattr(arr5, 'get_lock'))
1082 self.assertFalse(hasattr(arr5, 'get_obj'))
Benjamin Petersone711caf2008-06-11 16:44:04 +00001083
1084#
1085#
1086#
1087
1088class _TestContainers(BaseTestCase):
1089
1090 ALLOWED_TYPES = ('manager',)
1091
1092 def test_list(self):
1093 a = self.list(list(range(10)))
1094 self.assertEqual(a[:], list(range(10)))
1095
1096 b = self.list()
1097 self.assertEqual(b[:], [])
1098
1099 b.extend(list(range(5)))
1100 self.assertEqual(b[:], list(range(5)))
1101
1102 self.assertEqual(b[2], 2)
1103 self.assertEqual(b[2:10], [2,3,4])
1104
1105 b *= 2
1106 self.assertEqual(b[:], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4])
1107
1108 self.assertEqual(b + [5, 6], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4, 5, 6])
1109
1110 self.assertEqual(a[:], list(range(10)))
1111
1112 d = [a, b]
1113 e = self.list(d)
1114 self.assertEqual(
1115 e[:],
1116 [[0, 1, 2, 3, 4, 5, 6, 7, 8, 9], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4]]
1117 )
1118
1119 f = self.list([a])
1120 a.append('hello')
1121 self.assertEqual(f[:], [[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 'hello']])
1122
1123 def test_dict(self):
1124 d = self.dict()
1125 indices = list(range(65, 70))
1126 for i in indices:
1127 d[i] = chr(i)
1128 self.assertEqual(d.copy(), dict((i, chr(i)) for i in indices))
1129 self.assertEqual(sorted(d.keys()), indices)
1130 self.assertEqual(sorted(d.values()), [chr(i) for i in indices])
1131 self.assertEqual(sorted(d.items()), [(i, chr(i)) for i in indices])
1132
1133 def test_namespace(self):
1134 n = self.Namespace()
1135 n.name = 'Bob'
1136 n.job = 'Builder'
1137 n._hidden = 'hidden'
1138 self.assertEqual((n.name, n.job), ('Bob', 'Builder'))
1139 del n.job
1140 self.assertEqual(str(n), "Namespace(name='Bob')")
1141 self.assertTrue(hasattr(n, 'name'))
1142 self.assertTrue(not hasattr(n, 'job'))
1143
1144#
1145#
1146#
1147
1148def sqr(x, wait=0.0):
1149 time.sleep(wait)
1150 return x*x
Ask Solem2afcbf22010-11-09 20:55:52 +00001151
Antoine Pitroude911b22011-12-21 11:03:24 +01001152def mul(x, y):
1153 return x*y
1154
Benjamin Petersone711caf2008-06-11 16:44:04 +00001155class _TestPool(BaseTestCase):
1156
1157 def test_apply(self):
1158 papply = self.pool.apply
1159 self.assertEqual(papply(sqr, (5,)), sqr(5))
1160 self.assertEqual(papply(sqr, (), {'x':3}), sqr(x=3))
1161
1162 def test_map(self):
1163 pmap = self.pool.map
1164 self.assertEqual(pmap(sqr, list(range(10))), list(map(sqr, list(range(10)))))
1165 self.assertEqual(pmap(sqr, list(range(100)), chunksize=20),
1166 list(map(sqr, list(range(100)))))
1167
Antoine Pitroude911b22011-12-21 11:03:24 +01001168 def test_starmap(self):
1169 psmap = self.pool.starmap
1170 tuples = list(zip(range(10), range(9,-1, -1)))
1171 self.assertEqual(psmap(mul, tuples),
1172 list(itertools.starmap(mul, tuples)))
1173 tuples = list(zip(range(100), range(99,-1, -1)))
1174 self.assertEqual(psmap(mul, tuples, chunksize=20),
1175 list(itertools.starmap(mul, tuples)))
1176
1177 def test_starmap_async(self):
1178 tuples = list(zip(range(100), range(99,-1, -1)))
1179 self.assertEqual(self.pool.starmap_async(mul, tuples).get(),
1180 list(itertools.starmap(mul, tuples)))
1181
Alexandre Vassalottie52e3782009-07-17 09:18:18 +00001182 def test_map_chunksize(self):
1183 try:
1184 self.pool.map_async(sqr, [], chunksize=1).get(timeout=TIMEOUT1)
1185 except multiprocessing.TimeoutError:
1186 self.fail("pool.map_async with chunksize stalled on null list")
1187
Benjamin Petersone711caf2008-06-11 16:44:04 +00001188 def test_async(self):
1189 res = self.pool.apply_async(sqr, (7, TIMEOUT1,))
1190 get = TimingWrapper(res.get)
1191 self.assertEqual(get(), 49)
1192 self.assertTimingAlmostEqual(get.elapsed, TIMEOUT1)
1193
1194 def test_async_timeout(self):
1195 res = self.pool.apply_async(sqr, (6, TIMEOUT2 + 0.2))
1196 get = TimingWrapper(res.get)
1197 self.assertRaises(multiprocessing.TimeoutError, get, timeout=TIMEOUT2)
1198 self.assertTimingAlmostEqual(get.elapsed, TIMEOUT2)
1199
1200 def test_imap(self):
1201 it = self.pool.imap(sqr, list(range(10)))
1202 self.assertEqual(list(it), list(map(sqr, list(range(10)))))
1203
1204 it = self.pool.imap(sqr, list(range(10)))
1205 for i in range(10):
1206 self.assertEqual(next(it), i*i)
1207 self.assertRaises(StopIteration, it.__next__)
1208
1209 it = self.pool.imap(sqr, list(range(1000)), chunksize=100)
1210 for i in range(1000):
1211 self.assertEqual(next(it), i*i)
1212 self.assertRaises(StopIteration, it.__next__)
1213
1214 def test_imap_unordered(self):
1215 it = self.pool.imap_unordered(sqr, list(range(1000)))
1216 self.assertEqual(sorted(it), list(map(sqr, list(range(1000)))))
1217
1218 it = self.pool.imap_unordered(sqr, list(range(1000)), chunksize=53)
1219 self.assertEqual(sorted(it), list(map(sqr, list(range(1000)))))
1220
1221 def test_make_pool(self):
Victor Stinner2fae27b2011-06-20 17:53:35 +02001222 self.assertRaises(ValueError, multiprocessing.Pool, -1)
1223 self.assertRaises(ValueError, multiprocessing.Pool, 0)
1224
Benjamin Petersone711caf2008-06-11 16:44:04 +00001225 p = multiprocessing.Pool(3)
1226 self.assertEqual(3, len(p._pool))
1227 p.close()
1228 p.join()
1229
1230 def test_terminate(self):
1231 if self.TYPE == 'manager':
1232 # On Unix a forked process increfs each shared object to
1233 # which its parent process held a reference. If the
1234 # forked process gets terminated then there is likely to
1235 # be a reference leak. So to prevent
1236 # _TestZZZNumberOfObjects from failing we skip this test
1237 # when using a manager.
1238 return
1239
1240 result = self.pool.map_async(
1241 time.sleep, [0.1 for i in range(10000)], chunksize=1
1242 )
1243 self.pool.terminate()
1244 join = TimingWrapper(self.pool.join)
1245 join()
Victor Stinner900189b2011-03-24 16:39:07 +01001246 self.assertLess(join.elapsed, 0.5)
Jesse Noller1f0b6582010-01-27 03:36:01 +00001247
Ask Solem2afcbf22010-11-09 20:55:52 +00001248def raising():
1249 raise KeyError("key")
Jesse Noller1f0b6582010-01-27 03:36:01 +00001250
Ask Solem2afcbf22010-11-09 20:55:52 +00001251def unpickleable_result():
1252 return lambda: 42
1253
1254class _TestPoolWorkerErrors(BaseTestCase):
Jesse Noller1f0b6582010-01-27 03:36:01 +00001255 ALLOWED_TYPES = ('processes', )
Ask Solem2afcbf22010-11-09 20:55:52 +00001256
1257 def test_async_error_callback(self):
1258 p = multiprocessing.Pool(2)
1259
1260 scratchpad = [None]
1261 def errback(exc):
1262 scratchpad[0] = exc
1263
1264 res = p.apply_async(raising, error_callback=errback)
1265 self.assertRaises(KeyError, res.get)
1266 self.assertTrue(scratchpad[0])
1267 self.assertIsInstance(scratchpad[0], KeyError)
1268
1269 p.close()
1270 p.join()
1271
1272 def test_unpickleable_result(self):
1273 from multiprocessing.pool import MaybeEncodingError
1274 p = multiprocessing.Pool(2)
1275
1276 # Make sure we don't lose pool processes because of encoding errors.
1277 for iteration in range(20):
1278
1279 scratchpad = [None]
1280 def errback(exc):
1281 scratchpad[0] = exc
1282
1283 res = p.apply_async(unpickleable_result, error_callback=errback)
1284 self.assertRaises(MaybeEncodingError, res.get)
1285 wrapped = scratchpad[0]
1286 self.assertTrue(wrapped)
1287 self.assertIsInstance(scratchpad[0], MaybeEncodingError)
1288 self.assertIsNotNone(wrapped.exc)
1289 self.assertIsNotNone(wrapped.value)
1290
1291 p.close()
1292 p.join()
1293
1294class _TestPoolWorkerLifetime(BaseTestCase):
1295 ALLOWED_TYPES = ('processes', )
1296
Jesse Noller1f0b6582010-01-27 03:36:01 +00001297 def test_pool_worker_lifetime(self):
1298 p = multiprocessing.Pool(3, maxtasksperchild=10)
1299 self.assertEqual(3, len(p._pool))
1300 origworkerpids = [w.pid for w in p._pool]
1301 # Run many tasks so each worker gets replaced (hopefully)
1302 results = []
1303 for i in range(100):
1304 results.append(p.apply_async(sqr, (i, )))
1305 # Fetch the results and verify we got the right answers,
1306 # also ensuring all the tasks have completed.
1307 for (j, res) in enumerate(results):
1308 self.assertEqual(res.get(), sqr(j))
1309 # Refill the pool
1310 p._repopulate_pool()
Florent Xiclunafb190f62010-03-04 16:10:10 +00001311 # Wait until all workers are alive
Antoine Pitrou540ab062011-04-06 22:51:17 +02001312 # (countdown * DELTA = 5 seconds max startup process time)
1313 countdown = 50
Florent Xiclunafb190f62010-03-04 16:10:10 +00001314 while countdown and not all(w.is_alive() for w in p._pool):
1315 countdown -= 1
1316 time.sleep(DELTA)
Jesse Noller1f0b6582010-01-27 03:36:01 +00001317 finalworkerpids = [w.pid for w in p._pool]
Florent Xiclunafb190f62010-03-04 16:10:10 +00001318 # All pids should be assigned. See issue #7805.
1319 self.assertNotIn(None, origworkerpids)
1320 self.assertNotIn(None, finalworkerpids)
1321 # Finally, check that the worker pids have changed
Jesse Noller1f0b6582010-01-27 03:36:01 +00001322 self.assertNotEqual(sorted(origworkerpids), sorted(finalworkerpids))
1323 p.close()
1324 p.join()
1325
Charles-François Natalif8859e12011-10-24 18:45:29 +02001326 def test_pool_worker_lifetime_early_close(self):
1327 # Issue #10332: closing a pool whose workers have limited lifetimes
1328 # before all the tasks completed would make join() hang.
1329 p = multiprocessing.Pool(3, maxtasksperchild=1)
1330 results = []
1331 for i in range(6):
1332 results.append(p.apply_async(sqr, (i, 0.3)))
1333 p.close()
1334 p.join()
1335 # check the results
1336 for (j, res) in enumerate(results):
1337 self.assertEqual(res.get(), sqr(j))
1338
1339
Benjamin Petersone711caf2008-06-11 16:44:04 +00001340#
1341# Test that manager has expected number of shared objects left
1342#
1343
1344class _TestZZZNumberOfObjects(BaseTestCase):
1345 # Because test cases are sorted alphabetically, this one will get
1346 # run after all the other tests for the manager. It tests that
1347 # there have been no "reference leaks" for the manager's shared
1348 # objects. Note the comment in _TestPool.test_terminate().
1349 ALLOWED_TYPES = ('manager',)
1350
1351 def test_number_of_objects(self):
1352 EXPECTED_NUMBER = 1 # the pool object is still alive
1353 multiprocessing.active_children() # discard dead process objs
1354 gc.collect() # do garbage collection
1355 refs = self.manager._number_of_objects()
Jesse Noller63b3a972009-01-21 02:15:48 +00001356 debug_info = self.manager._debug_info()
Benjamin Petersone711caf2008-06-11 16:44:04 +00001357 if refs != EXPECTED_NUMBER:
Georg Brandl3dbca812008-07-23 16:10:53 +00001358 print(self.manager._debug_info())
Jesse Noller63b3a972009-01-21 02:15:48 +00001359 print(debug_info)
Benjamin Petersone711caf2008-06-11 16:44:04 +00001360
1361 self.assertEqual(refs, EXPECTED_NUMBER)
1362
1363#
1364# Test of creating a customized manager class
1365#
1366
1367from multiprocessing.managers import BaseManager, BaseProxy, RemoteError
1368
1369class FooBar(object):
1370 def f(self):
1371 return 'f()'
1372 def g(self):
1373 raise ValueError
1374 def _h(self):
1375 return '_h()'
1376
1377def baz():
1378 for i in range(10):
1379 yield i*i
1380
1381class IteratorProxy(BaseProxy):
Florent Xiclunaaa171062010-08-14 15:56:42 +00001382 _exposed_ = ('__next__',)
Benjamin Petersone711caf2008-06-11 16:44:04 +00001383 def __iter__(self):
1384 return self
1385 def __next__(self):
Benjamin Petersone711caf2008-06-11 16:44:04 +00001386 return self._callmethod('__next__')
1387
1388class MyManager(BaseManager):
1389 pass
1390
1391MyManager.register('Foo', callable=FooBar)
1392MyManager.register('Bar', callable=FooBar, exposed=('f', '_h'))
1393MyManager.register('baz', callable=baz, proxytype=IteratorProxy)
1394
1395
1396class _TestMyManager(BaseTestCase):
1397
1398 ALLOWED_TYPES = ('manager',)
1399
1400 def test_mymanager(self):
1401 manager = MyManager()
1402 manager.start()
1403
1404 foo = manager.Foo()
1405 bar = manager.Bar()
1406 baz = manager.baz()
1407
1408 foo_methods = [name for name in ('f', 'g', '_h') if hasattr(foo, name)]
1409 bar_methods = [name for name in ('f', 'g', '_h') if hasattr(bar, name)]
1410
1411 self.assertEqual(foo_methods, ['f', 'g'])
1412 self.assertEqual(bar_methods, ['f', '_h'])
1413
1414 self.assertEqual(foo.f(), 'f()')
1415 self.assertRaises(ValueError, foo.g)
1416 self.assertEqual(foo._callmethod('f'), 'f()')
1417 self.assertRaises(RemoteError, foo._callmethod, '_h')
1418
1419 self.assertEqual(bar.f(), 'f()')
1420 self.assertEqual(bar._h(), '_h()')
1421 self.assertEqual(bar._callmethod('f'), 'f()')
1422 self.assertEqual(bar._callmethod('_h'), '_h()')
1423
1424 self.assertEqual(list(baz), [i*i for i in range(10)])
1425
1426 manager.shutdown()
1427
1428#
1429# Test of connecting to a remote server and using xmlrpclib for serialization
1430#
1431
1432_queue = pyqueue.Queue()
1433def get_queue():
1434 return _queue
1435
1436class QueueManager(BaseManager):
1437 '''manager class used by server process'''
1438QueueManager.register('get_queue', callable=get_queue)
1439
1440class QueueManager2(BaseManager):
1441 '''manager class which specifies the same interface as QueueManager'''
1442QueueManager2.register('get_queue')
1443
1444
1445SERIALIZER = 'xmlrpclib'
1446
1447class _TestRemoteManager(BaseTestCase):
1448
1449 ALLOWED_TYPES = ('manager',)
1450
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +00001451 @classmethod
1452 def _putter(cls, address, authkey):
Benjamin Petersone711caf2008-06-11 16:44:04 +00001453 manager = QueueManager2(
1454 address=address, authkey=authkey, serializer=SERIALIZER
1455 )
1456 manager.connect()
1457 queue = manager.get_queue()
1458 queue.put(('hello world', None, True, 2.25))
1459
1460 def test_remote(self):
1461 authkey = os.urandom(32)
1462
1463 manager = QueueManager(
1464 address=('localhost', 0), authkey=authkey, serializer=SERIALIZER
1465 )
1466 manager.start()
1467
1468 p = self.Process(target=self._putter, args=(manager.address, authkey))
Jesus Cea94f964f2011-09-09 20:26:57 +02001469 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +00001470 p.start()
1471
1472 manager2 = QueueManager2(
1473 address=manager.address, authkey=authkey, serializer=SERIALIZER
1474 )
1475 manager2.connect()
1476 queue = manager2.get_queue()
1477
1478 # Note that xmlrpclib will deserialize object as a list not a tuple
1479 self.assertEqual(queue.get(), ['hello world', None, True, 2.25])
1480
1481 # Because we are using xmlrpclib for serialization instead of
1482 # pickle this will cause a serialization error.
1483 self.assertRaises(Exception, queue.put, time.sleep)
1484
1485 # Make queue finalizer run before the server is stopped
1486 del queue
1487 manager.shutdown()
1488
Jesse Nollerc5d28a02009-03-30 16:37:36 +00001489class _TestManagerRestart(BaseTestCase):
1490
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +00001491 @classmethod
1492 def _putter(cls, address, authkey):
Jesse Nollerc5d28a02009-03-30 16:37:36 +00001493 manager = QueueManager(
1494 address=address, authkey=authkey, serializer=SERIALIZER)
1495 manager.connect()
1496 queue = manager.get_queue()
1497 queue.put('hello world')
1498
1499 def test_rapid_restart(self):
1500 authkey = os.urandom(32)
1501 manager = QueueManager(
Antoine Pitrou043bad02010-04-30 23:20:15 +00001502 address=('localhost', 0), authkey=authkey, serializer=SERIALIZER)
Brian Curtin50be1ca2010-11-01 05:10:44 +00001503 srvr = manager.get_server()
1504 addr = srvr.address
1505 # Close the connection.Listener socket which gets opened as a part
1506 # of manager.get_server(). It's not needed for the test.
1507 srvr.listener.close()
Jesse Nollerc5d28a02009-03-30 16:37:36 +00001508 manager.start()
1509
1510 p = self.Process(target=self._putter, args=(manager.address, authkey))
Jesus Cea94f964f2011-09-09 20:26:57 +02001511 p.daemon = True
Jesse Nollerc5d28a02009-03-30 16:37:36 +00001512 p.start()
1513 queue = manager.get_queue()
1514 self.assertEqual(queue.get(), 'hello world')
Jesse Noller35d1f002009-03-30 22:59:27 +00001515 del queue
Jesse Nollerc5d28a02009-03-30 16:37:36 +00001516 manager.shutdown()
1517 manager = QueueManager(
Antoine Pitrou043bad02010-04-30 23:20:15 +00001518 address=addr, authkey=authkey, serializer=SERIALIZER)
Antoine Pitrouc824e9a2011-04-05 18:11:33 +02001519 try:
1520 manager.start()
1521 except IOError as e:
1522 if e.errno != errno.EADDRINUSE:
1523 raise
1524 # Retry after some time, in case the old socket was lingering
1525 # (sporadic failure on buildbots)
1526 time.sleep(1.0)
1527 manager = QueueManager(
1528 address=addr, authkey=authkey, serializer=SERIALIZER)
Jesse Noller35d1f002009-03-30 22:59:27 +00001529 manager.shutdown()
Jesse Nollerc5d28a02009-03-30 16:37:36 +00001530
Benjamin Petersone711caf2008-06-11 16:44:04 +00001531#
1532#
1533#
1534
1535SENTINEL = latin('')
1536
1537class _TestConnection(BaseTestCase):
1538
1539 ALLOWED_TYPES = ('processes', 'threads')
1540
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +00001541 @classmethod
1542 def _echo(cls, conn):
Benjamin Petersone711caf2008-06-11 16:44:04 +00001543 for msg in iter(conn.recv_bytes, SENTINEL):
1544 conn.send_bytes(msg)
1545 conn.close()
1546
1547 def test_connection(self):
1548 conn, child_conn = self.Pipe()
1549
1550 p = self.Process(target=self._echo, args=(child_conn,))
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +00001551 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +00001552 p.start()
1553
1554 seq = [1, 2.25, None]
1555 msg = latin('hello world')
1556 longmsg = msg * 10
1557 arr = array.array('i', list(range(4)))
1558
1559 if self.TYPE == 'processes':
1560 self.assertEqual(type(conn.fileno()), int)
1561
1562 self.assertEqual(conn.send(seq), None)
1563 self.assertEqual(conn.recv(), seq)
1564
1565 self.assertEqual(conn.send_bytes(msg), None)
1566 self.assertEqual(conn.recv_bytes(), msg)
1567
1568 if self.TYPE == 'processes':
1569 buffer = array.array('i', [0]*10)
1570 expected = list(arr) + [0] * (10 - len(arr))
1571 self.assertEqual(conn.send_bytes(arr), None)
1572 self.assertEqual(conn.recv_bytes_into(buffer),
1573 len(arr) * buffer.itemsize)
1574 self.assertEqual(list(buffer), expected)
1575
1576 buffer = array.array('i', [0]*10)
1577 expected = [0] * 3 + list(arr) + [0] * (10 - 3 - len(arr))
1578 self.assertEqual(conn.send_bytes(arr), None)
1579 self.assertEqual(conn.recv_bytes_into(buffer, 3 * buffer.itemsize),
1580 len(arr) * buffer.itemsize)
1581 self.assertEqual(list(buffer), expected)
1582
1583 buffer = bytearray(latin(' ' * 40))
1584 self.assertEqual(conn.send_bytes(longmsg), None)
1585 try:
1586 res = conn.recv_bytes_into(buffer)
1587 except multiprocessing.BufferTooShort as e:
1588 self.assertEqual(e.args, (longmsg,))
1589 else:
1590 self.fail('expected BufferTooShort, got %s' % res)
1591
1592 poll = TimingWrapper(conn.poll)
1593
1594 self.assertEqual(poll(), False)
1595 self.assertTimingAlmostEqual(poll.elapsed, 0)
1596
1597 self.assertEqual(poll(TIMEOUT1), False)
1598 self.assertTimingAlmostEqual(poll.elapsed, TIMEOUT1)
1599
1600 conn.send(None)
1601
1602 self.assertEqual(poll(TIMEOUT1), True)
1603 self.assertTimingAlmostEqual(poll.elapsed, 0)
1604
1605 self.assertEqual(conn.recv(), None)
1606
1607 really_big_msg = latin('X') * (1024 * 1024 * 16) # 16Mb
1608 conn.send_bytes(really_big_msg)
1609 self.assertEqual(conn.recv_bytes(), really_big_msg)
1610
1611 conn.send_bytes(SENTINEL) # tell child to quit
1612 child_conn.close()
1613
1614 if self.TYPE == 'processes':
1615 self.assertEqual(conn.readable, True)
1616 self.assertEqual(conn.writable, True)
1617 self.assertRaises(EOFError, conn.recv)
1618 self.assertRaises(EOFError, conn.recv_bytes)
1619
1620 p.join()
1621
1622 def test_duplex_false(self):
1623 reader, writer = self.Pipe(duplex=False)
1624 self.assertEqual(writer.send(1), None)
1625 self.assertEqual(reader.recv(), 1)
1626 if self.TYPE == 'processes':
1627 self.assertEqual(reader.readable, True)
1628 self.assertEqual(reader.writable, False)
1629 self.assertEqual(writer.readable, False)
1630 self.assertEqual(writer.writable, True)
1631 self.assertRaises(IOError, reader.send, 2)
1632 self.assertRaises(IOError, writer.recv)
1633 self.assertRaises(IOError, writer.poll)
1634
1635 def test_spawn_close(self):
1636 # We test that a pipe connection can be closed by parent
1637 # process immediately after child is spawned. On Windows this
1638 # would have sometimes failed on old versions because
1639 # child_conn would be closed before the child got a chance to
1640 # duplicate it.
1641 conn, child_conn = self.Pipe()
1642
1643 p = self.Process(target=self._echo, args=(child_conn,))
Jesus Cea94f964f2011-09-09 20:26:57 +02001644 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +00001645 p.start()
1646 child_conn.close() # this might complete before child initializes
1647
1648 msg = latin('hello')
1649 conn.send_bytes(msg)
1650 self.assertEqual(conn.recv_bytes(), msg)
1651
1652 conn.send_bytes(SENTINEL)
1653 conn.close()
1654 p.join()
1655
1656 def test_sendbytes(self):
1657 if self.TYPE != 'processes':
1658 return
1659
1660 msg = latin('abcdefghijklmnopqrstuvwxyz')
1661 a, b = self.Pipe()
1662
1663 a.send_bytes(msg)
1664 self.assertEqual(b.recv_bytes(), msg)
1665
1666 a.send_bytes(msg, 5)
1667 self.assertEqual(b.recv_bytes(), msg[5:])
1668
1669 a.send_bytes(msg, 7, 8)
1670 self.assertEqual(b.recv_bytes(), msg[7:7+8])
1671
1672 a.send_bytes(msg, 26)
1673 self.assertEqual(b.recv_bytes(), latin(''))
1674
1675 a.send_bytes(msg, 26, 0)
1676 self.assertEqual(b.recv_bytes(), latin(''))
1677
1678 self.assertRaises(ValueError, a.send_bytes, msg, 27)
1679
1680 self.assertRaises(ValueError, a.send_bytes, msg, 22, 5)
1681
1682 self.assertRaises(ValueError, a.send_bytes, msg, 26, 1)
1683
1684 self.assertRaises(ValueError, a.send_bytes, msg, -1)
1685
1686 self.assertRaises(ValueError, a.send_bytes, msg, 4, -1)
1687
Antoine Pitroubcb39d42011-08-23 19:46:22 +02001688 @classmethod
1689 def _is_fd_assigned(cls, fd):
1690 try:
1691 os.fstat(fd)
1692 except OSError as e:
1693 if e.errno == errno.EBADF:
1694 return False
1695 raise
1696 else:
1697 return True
1698
1699 @classmethod
1700 def _writefd(cls, conn, data, create_dummy_fds=False):
1701 if create_dummy_fds:
1702 for i in range(0, 256):
1703 if not cls._is_fd_assigned(i):
1704 os.dup2(conn.fileno(), i)
1705 fd = reduction.recv_handle(conn)
1706 if msvcrt:
1707 fd = msvcrt.open_osfhandle(fd, os.O_WRONLY)
1708 os.write(fd, data)
1709 os.close(fd)
1710
Charles-François Natalibc8f0822011-09-20 20:36:51 +02001711 @unittest.skipUnless(HAS_REDUCTION, "test needs multiprocessing.reduction")
Antoine Pitroubcb39d42011-08-23 19:46:22 +02001712 def test_fd_transfer(self):
1713 if self.TYPE != 'processes':
1714 self.skipTest("only makes sense with processes")
1715 conn, child_conn = self.Pipe(duplex=True)
1716
1717 p = self.Process(target=self._writefd, args=(child_conn, b"foo"))
Jesus Cea94f964f2011-09-09 20:26:57 +02001718 p.daemon = True
Antoine Pitroubcb39d42011-08-23 19:46:22 +02001719 p.start()
Victor Stinnerd0b10a62011-09-21 01:10:29 +02001720 self.addCleanup(test.support.unlink, test.support.TESTFN)
Antoine Pitroubcb39d42011-08-23 19:46:22 +02001721 with open(test.support.TESTFN, "wb") as f:
1722 fd = f.fileno()
1723 if msvcrt:
1724 fd = msvcrt.get_osfhandle(fd)
1725 reduction.send_handle(conn, fd, p.pid)
1726 p.join()
1727 with open(test.support.TESTFN, "rb") as f:
1728 self.assertEqual(f.read(), b"foo")
1729
Charles-François Natalibc8f0822011-09-20 20:36:51 +02001730 @unittest.skipUnless(HAS_REDUCTION, "test needs multiprocessing.reduction")
Antoine Pitroubcb39d42011-08-23 19:46:22 +02001731 @unittest.skipIf(sys.platform == "win32",
1732 "test semantics don't make sense on Windows")
1733 @unittest.skipIf(MAXFD <= 256,
1734 "largest assignable fd number is too small")
1735 @unittest.skipUnless(hasattr(os, "dup2"),
1736 "test needs os.dup2()")
1737 def test_large_fd_transfer(self):
1738 # With fd > 256 (issue #11657)
1739 if self.TYPE != 'processes':
1740 self.skipTest("only makes sense with processes")
1741 conn, child_conn = self.Pipe(duplex=True)
1742
1743 p = self.Process(target=self._writefd, args=(child_conn, b"bar", True))
Jesus Cea94f964f2011-09-09 20:26:57 +02001744 p.daemon = True
Antoine Pitroubcb39d42011-08-23 19:46:22 +02001745 p.start()
Victor Stinnerd0b10a62011-09-21 01:10:29 +02001746 self.addCleanup(test.support.unlink, test.support.TESTFN)
Antoine Pitroubcb39d42011-08-23 19:46:22 +02001747 with open(test.support.TESTFN, "wb") as f:
1748 fd = f.fileno()
1749 for newfd in range(256, MAXFD):
1750 if not self._is_fd_assigned(newfd):
1751 break
1752 else:
1753 self.fail("could not find an unassigned large file descriptor")
1754 os.dup2(fd, newfd)
1755 try:
1756 reduction.send_handle(conn, newfd, p.pid)
1757 finally:
1758 os.close(newfd)
1759 p.join()
1760 with open(test.support.TESTFN, "rb") as f:
1761 self.assertEqual(f.read(), b"bar")
1762
Jesus Cea4507e642011-09-21 03:53:25 +02001763 @classmethod
1764 def _send_data_without_fd(self, conn):
1765 os.write(conn.fileno(), b"\0")
1766
Charles-François Natalie51c8da2011-09-21 18:48:21 +02001767 @unittest.skipUnless(HAS_REDUCTION, "test needs multiprocessing.reduction")
Jesus Cea4507e642011-09-21 03:53:25 +02001768 @unittest.skipIf(sys.platform == "win32", "doesn't make sense on Windows")
1769 def test_missing_fd_transfer(self):
1770 # Check that exception is raised when received data is not
1771 # accompanied by a file descriptor in ancillary data.
1772 if self.TYPE != 'processes':
1773 self.skipTest("only makes sense with processes")
1774 conn, child_conn = self.Pipe(duplex=True)
1775
1776 p = self.Process(target=self._send_data_without_fd, args=(child_conn,))
1777 p.daemon = True
1778 p.start()
1779 self.assertRaises(RuntimeError, reduction.recv_handle, conn)
1780 p.join()
Antoine Pitroubcb39d42011-08-23 19:46:22 +02001781
Charles-François Natalied4a8fc2012-02-08 21:15:58 +01001782class _TestListener(BaseTestCase):
1783
1784 ALLOWED_TYPES = ('processes')
1785
1786 def test_multiple_bind(self):
1787 for family in self.connection.families:
1788 l = self.connection.Listener(family=family)
1789 self.addCleanup(l.close)
1790 self.assertRaises(OSError, self.connection.Listener,
1791 l.address, family)
1792
Benjamin Petersone711caf2008-06-11 16:44:04 +00001793class _TestListenerClient(BaseTestCase):
1794
1795 ALLOWED_TYPES = ('processes', 'threads')
1796
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +00001797 @classmethod
1798 def _test(cls, address):
1799 conn = cls.connection.Client(address)
Benjamin Petersone711caf2008-06-11 16:44:04 +00001800 conn.send('hello')
1801 conn.close()
1802
1803 def test_listener_client(self):
1804 for family in self.connection.families:
1805 l = self.connection.Listener(family=family)
1806 p = self.Process(target=self._test, args=(l.address,))
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +00001807 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +00001808 p.start()
1809 conn = l.accept()
1810 self.assertEqual(conn.recv(), 'hello')
1811 p.join()
1812 l.close()
Charles-François Natalied4a8fc2012-02-08 21:15:58 +01001813
Benjamin Petersone711caf2008-06-11 16:44:04 +00001814#
1815# Test of sending connection and socket objects between processes
1816#
Benjamin Petersonb29cbbc2008-06-16 20:57:14 +00001817"""
Benjamin Petersone711caf2008-06-11 16:44:04 +00001818class _TestPicklingConnections(BaseTestCase):
1819
1820 ALLOWED_TYPES = ('processes',)
1821
1822 def _listener(self, conn, families):
1823 for fam in families:
1824 l = self.connection.Listener(family=fam)
1825 conn.send(l.address)
1826 new_conn = l.accept()
1827 conn.send(new_conn)
1828
1829 if self.TYPE == 'processes':
1830 l = socket.socket()
1831 l.bind(('localhost', 0))
1832 conn.send(l.getsockname())
1833 l.listen(1)
1834 new_conn, addr = l.accept()
1835 conn.send(new_conn)
1836
1837 conn.recv()
1838
1839 def _remote(self, conn):
1840 for (address, msg) in iter(conn.recv, None):
1841 client = self.connection.Client(address)
1842 client.send(msg.upper())
1843 client.close()
1844
1845 if self.TYPE == 'processes':
1846 address, msg = conn.recv()
1847 client = socket.socket()
1848 client.connect(address)
1849 client.sendall(msg.upper())
1850 client.close()
1851
1852 conn.close()
1853
1854 def test_pickling(self):
1855 try:
1856 multiprocessing.allow_connection_pickling()
1857 except ImportError:
1858 return
1859
1860 families = self.connection.families
1861
1862 lconn, lconn0 = self.Pipe()
1863 lp = self.Process(target=self._listener, args=(lconn0, families))
Jesus Cea94f964f2011-09-09 20:26:57 +02001864 lp.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +00001865 lp.start()
1866 lconn0.close()
1867
1868 rconn, rconn0 = self.Pipe()
1869 rp = self.Process(target=self._remote, args=(rconn0,))
Jesus Cea94f964f2011-09-09 20:26:57 +02001870 rp.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +00001871 rp.start()
1872 rconn0.close()
1873
1874 for fam in families:
1875 msg = ('This connection uses family %s' % fam).encode('ascii')
1876 address = lconn.recv()
1877 rconn.send((address, msg))
1878 new_conn = lconn.recv()
1879 self.assertEqual(new_conn.recv(), msg.upper())
1880
1881 rconn.send(None)
1882
1883 if self.TYPE == 'processes':
1884 msg = latin('This connection uses a normal socket')
1885 address = lconn.recv()
1886 rconn.send((address, msg))
1887 if hasattr(socket, 'fromfd'):
1888 new_conn = lconn.recv()
1889 self.assertEqual(new_conn.recv(100), msg.upper())
1890 else:
1891 # XXX On Windows with Py2.6 need to backport fromfd()
1892 discard = lconn.recv_bytes()
1893
1894 lconn.send(None)
1895
1896 rconn.close()
1897 lconn.close()
1898
1899 lp.join()
1900 rp.join()
Benjamin Petersonb29cbbc2008-06-16 20:57:14 +00001901"""
Benjamin Petersone711caf2008-06-11 16:44:04 +00001902#
1903#
1904#
1905
1906class _TestHeap(BaseTestCase):
1907
1908 ALLOWED_TYPES = ('processes',)
1909
1910 def test_heap(self):
1911 iterations = 5000
1912 maxblocks = 50
1913 blocks = []
1914
1915 # create and destroy lots of blocks of different sizes
1916 for i in range(iterations):
1917 size = int(random.lognormvariate(0, 1) * 1000)
1918 b = multiprocessing.heap.BufferWrapper(size)
1919 blocks.append(b)
1920 if len(blocks) > maxblocks:
1921 i = random.randrange(maxblocks)
1922 del blocks[i]
1923
1924 # get the heap object
1925 heap = multiprocessing.heap.BufferWrapper._heap
1926
1927 # verify the state of the heap
1928 all = []
1929 occupied = 0
Charles-François Natali778db492011-07-02 14:35:49 +02001930 heap._lock.acquire()
1931 self.addCleanup(heap._lock.release)
Benjamin Petersone711caf2008-06-11 16:44:04 +00001932 for L in list(heap._len_to_seq.values()):
1933 for arena, start, stop in L:
1934 all.append((heap._arenas.index(arena), start, stop,
1935 stop-start, 'free'))
1936 for arena, start, stop in heap._allocated_blocks:
1937 all.append((heap._arenas.index(arena), start, stop,
1938 stop-start, 'occupied'))
1939 occupied += (stop-start)
1940
1941 all.sort()
1942
1943 for i in range(len(all)-1):
1944 (arena, start, stop) = all[i][:3]
1945 (narena, nstart, nstop) = all[i+1][:3]
1946 self.assertTrue((arena != narena and nstart == 0) or
1947 (stop == nstart))
1948
Charles-François Natali778db492011-07-02 14:35:49 +02001949 def test_free_from_gc(self):
1950 # Check that freeing of blocks by the garbage collector doesn't deadlock
1951 # (issue #12352).
1952 # Make sure the GC is enabled, and set lower collection thresholds to
1953 # make collections more frequent (and increase the probability of
1954 # deadlock).
1955 if not gc.isenabled():
1956 gc.enable()
1957 self.addCleanup(gc.disable)
1958 thresholds = gc.get_threshold()
1959 self.addCleanup(gc.set_threshold, *thresholds)
1960 gc.set_threshold(10)
1961
1962 # perform numerous block allocations, with cyclic references to make
1963 # sure objects are collected asynchronously by the gc
1964 for i in range(5000):
1965 a = multiprocessing.heap.BufferWrapper(1)
1966 b = multiprocessing.heap.BufferWrapper(1)
1967 # circular references
1968 a.buddy = b
1969 b.buddy = a
1970
Benjamin Petersone711caf2008-06-11 16:44:04 +00001971#
1972#
1973#
1974
Benjamin Petersone711caf2008-06-11 16:44:04 +00001975class _Foo(Structure):
1976 _fields_ = [
1977 ('x', c_int),
1978 ('y', c_double)
1979 ]
1980
1981class _TestSharedCTypes(BaseTestCase):
1982
1983 ALLOWED_TYPES = ('processes',)
1984
Antoine Pitrou7744e2a2010-11-22 16:26:21 +00001985 def setUp(self):
1986 if not HAS_SHAREDCTYPES:
1987 self.skipTest("requires multiprocessing.sharedctypes")
1988
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +00001989 @classmethod
1990 def _double(cls, x, y, foo, arr, string):
Benjamin Petersone711caf2008-06-11 16:44:04 +00001991 x.value *= 2
1992 y.value *= 2
1993 foo.x *= 2
1994 foo.y *= 2
1995 string.value *= 2
1996 for i in range(len(arr)):
1997 arr[i] *= 2
1998
1999 def test_sharedctypes(self, lock=False):
Benjamin Petersone711caf2008-06-11 16:44:04 +00002000 x = Value('i', 7, lock=lock)
Georg Brandl89fad142010-03-14 10:23:39 +00002001 y = Value(c_double, 1.0/3.0, lock=lock)
Benjamin Petersone711caf2008-06-11 16:44:04 +00002002 foo = Value(_Foo, 3, 2, lock=lock)
Georg Brandl89fad142010-03-14 10:23:39 +00002003 arr = self.Array('d', list(range(10)), lock=lock)
2004 string = self.Array('c', 20, lock=lock)
Brian Curtinafa88b52010-10-07 01:12:19 +00002005 string.value = latin('hello')
Benjamin Petersone711caf2008-06-11 16:44:04 +00002006
2007 p = self.Process(target=self._double, args=(x, y, foo, arr, string))
Jesus Cea94f964f2011-09-09 20:26:57 +02002008 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +00002009 p.start()
2010 p.join()
2011
2012 self.assertEqual(x.value, 14)
2013 self.assertAlmostEqual(y.value, 2.0/3.0)
2014 self.assertEqual(foo.x, 6)
2015 self.assertAlmostEqual(foo.y, 4.0)
2016 for i in range(10):
2017 self.assertAlmostEqual(arr[i], i*2)
2018 self.assertEqual(string.value, latin('hellohello'))
2019
2020 def test_synchronize(self):
2021 self.test_sharedctypes(lock=True)
2022
2023 def test_copy(self):
Benjamin Petersone711caf2008-06-11 16:44:04 +00002024 foo = _Foo(2, 5.0)
Brian Curtinafa88b52010-10-07 01:12:19 +00002025 bar = copy(foo)
Benjamin Petersone711caf2008-06-11 16:44:04 +00002026 foo.x = 0
2027 foo.y = 0
2028 self.assertEqual(bar.x, 2)
2029 self.assertAlmostEqual(bar.y, 5.0)
2030
2031#
2032#
2033#
2034
2035class _TestFinalize(BaseTestCase):
2036
2037 ALLOWED_TYPES = ('processes',)
2038
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +00002039 @classmethod
2040 def _test_finalize(cls, conn):
Benjamin Petersone711caf2008-06-11 16:44:04 +00002041 class Foo(object):
2042 pass
2043
2044 a = Foo()
2045 util.Finalize(a, conn.send, args=('a',))
2046 del a # triggers callback for a
2047
2048 b = Foo()
2049 close_b = util.Finalize(b, conn.send, args=('b',))
2050 close_b() # triggers callback for b
2051 close_b() # does nothing because callback has already been called
2052 del b # does nothing because callback has already been called
2053
2054 c = Foo()
2055 util.Finalize(c, conn.send, args=('c',))
2056
2057 d10 = Foo()
2058 util.Finalize(d10, conn.send, args=('d10',), exitpriority=1)
2059
2060 d01 = Foo()
2061 util.Finalize(d01, conn.send, args=('d01',), exitpriority=0)
2062 d02 = Foo()
2063 util.Finalize(d02, conn.send, args=('d02',), exitpriority=0)
2064 d03 = Foo()
2065 util.Finalize(d03, conn.send, args=('d03',), exitpriority=0)
2066
2067 util.Finalize(None, conn.send, args=('e',), exitpriority=-10)
2068
2069 util.Finalize(None, conn.send, args=('STOP',), exitpriority=-100)
2070
Ezio Melotti13925002011-03-16 11:05:33 +02002071 # call multiprocessing's cleanup function then exit process without
Benjamin Petersone711caf2008-06-11 16:44:04 +00002072 # garbage collecting locals
2073 util._exit_function()
2074 conn.close()
2075 os._exit(0)
2076
2077 def test_finalize(self):
2078 conn, child_conn = self.Pipe()
2079
2080 p = self.Process(target=self._test_finalize, args=(child_conn,))
Jesus Cea94f964f2011-09-09 20:26:57 +02002081 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +00002082 p.start()
2083 p.join()
2084
2085 result = [obj for obj in iter(conn.recv, 'STOP')]
2086 self.assertEqual(result, ['a', 'b', 'd10', 'd03', 'd02', 'd01', 'e'])
2087
2088#
2089# Test that from ... import * works for each module
2090#
2091
2092class _TestImportStar(BaseTestCase):
2093
2094 ALLOWED_TYPES = ('processes',)
2095
2096 def test_import(self):
Florent Xiclunafd1b0932010-03-28 00:25:02 +00002097 modules = [
Benjamin Petersone711caf2008-06-11 16:44:04 +00002098 'multiprocessing', 'multiprocessing.connection',
2099 'multiprocessing.heap', 'multiprocessing.managers',
2100 'multiprocessing.pool', 'multiprocessing.process',
Benjamin Petersone711caf2008-06-11 16:44:04 +00002101 'multiprocessing.synchronize', 'multiprocessing.util'
Florent Xiclunafd1b0932010-03-28 00:25:02 +00002102 ]
2103
Charles-François Natalibc8f0822011-09-20 20:36:51 +02002104 if HAS_REDUCTION:
2105 modules.append('multiprocessing.reduction')
2106
Florent Xiclunafd1b0932010-03-28 00:25:02 +00002107 if c_int is not None:
2108 # This module requires _ctypes
2109 modules.append('multiprocessing.sharedctypes')
Benjamin Petersone711caf2008-06-11 16:44:04 +00002110
2111 for name in modules:
2112 __import__(name)
2113 mod = sys.modules[name]
2114
2115 for attr in getattr(mod, '__all__', ()):
2116 self.assertTrue(
2117 hasattr(mod, attr),
2118 '%r does not have attribute %r' % (mod, attr)
2119 )
2120
2121#
2122# Quick test that logging works -- does not test logging output
2123#
2124
2125class _TestLogging(BaseTestCase):
2126
2127 ALLOWED_TYPES = ('processes',)
2128
2129 def test_enable_logging(self):
2130 logger = multiprocessing.get_logger()
2131 logger.setLevel(util.SUBWARNING)
2132 self.assertTrue(logger is not None)
2133 logger.debug('this will not be printed')
2134 logger.info('nor will this')
2135 logger.setLevel(LOG_LEVEL)
2136
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +00002137 @classmethod
2138 def _test_level(cls, conn):
Benjamin Petersone711caf2008-06-11 16:44:04 +00002139 logger = multiprocessing.get_logger()
2140 conn.send(logger.getEffectiveLevel())
2141
2142 def test_level(self):
2143 LEVEL1 = 32
2144 LEVEL2 = 37
2145
2146 logger = multiprocessing.get_logger()
2147 root_logger = logging.getLogger()
2148 root_level = root_logger.level
2149
2150 reader, writer = multiprocessing.Pipe(duplex=False)
2151
2152 logger.setLevel(LEVEL1)
Jesus Cea94f964f2011-09-09 20:26:57 +02002153 p = self.Process(target=self._test_level, args=(writer,))
2154 p.daemon = True
2155 p.start()
Benjamin Petersone711caf2008-06-11 16:44:04 +00002156 self.assertEqual(LEVEL1, reader.recv())
2157
2158 logger.setLevel(logging.NOTSET)
2159 root_logger.setLevel(LEVEL2)
Jesus Cea94f964f2011-09-09 20:26:57 +02002160 p = self.Process(target=self._test_level, args=(writer,))
2161 p.daemon = True
2162 p.start()
Benjamin Petersone711caf2008-06-11 16:44:04 +00002163 self.assertEqual(LEVEL2, reader.recv())
2164
2165 root_logger.setLevel(root_level)
2166 logger.setLevel(level=LOG_LEVEL)
2167
Jesse Nollerb9a49b72009-11-21 18:09:38 +00002168
Jesse Nollerf4ae35f2009-11-24 14:22:24 +00002169# class _TestLoggingProcessName(BaseTestCase):
2170#
2171# def handle(self, record):
2172# assert record.processName == multiprocessing.current_process().name
2173# self.__handled = True
2174#
2175# def test_logging(self):
2176# handler = logging.Handler()
2177# handler.handle = self.handle
2178# self.__handled = False
2179# # Bypass getLogger() and side-effects
2180# logger = logging.getLoggerClass()(
2181# 'multiprocessing.test.TestLoggingProcessName')
2182# logger.addHandler(handler)
2183# logger.propagate = False
2184#
2185# logger.warn('foo')
2186# assert self.__handled
Jesse Nollerb9a49b72009-11-21 18:09:38 +00002187
Benjamin Petersone711caf2008-06-11 16:44:04 +00002188#
Jesse Noller6214edd2009-01-19 16:23:53 +00002189# Test to verify handle verification, see issue 3321
2190#
2191
2192class TestInvalidHandle(unittest.TestCase):
2193
Florent Xiclunafd1b0932010-03-28 00:25:02 +00002194 @unittest.skipIf(WIN32, "skipped on Windows")
Jesse Noller6214edd2009-01-19 16:23:53 +00002195 def test_invalid_handles(self):
Antoine Pitrou87cf2202011-05-09 17:04:27 +02002196 conn = multiprocessing.connection.Connection(44977608)
2197 try:
2198 self.assertRaises((ValueError, IOError), conn.poll)
2199 finally:
2200 # Hack private attribute _handle to avoid printing an error
2201 # in conn.__del__
2202 conn._handle = None
2203 self.assertRaises((ValueError, IOError),
2204 multiprocessing.connection.Connection, -1)
Florent Xiclunafd1b0932010-03-28 00:25:02 +00002205
Jesse Noller6214edd2009-01-19 16:23:53 +00002206#
Benjamin Petersone711caf2008-06-11 16:44:04 +00002207# Functions used to create test cases from the base ones in this module
2208#
2209
2210def get_attributes(Source, names):
2211 d = {}
2212 for name in names:
2213 obj = getattr(Source, name)
2214 if type(obj) == type(get_attributes):
2215 obj = staticmethod(obj)
2216 d[name] = obj
2217 return d
2218
2219def create_test_cases(Mixin, type):
2220 result = {}
2221 glob = globals()
Florent Xiclunafd1b0932010-03-28 00:25:02 +00002222 Type = type.capitalize()
Benjamin Petersone711caf2008-06-11 16:44:04 +00002223
2224 for name in list(glob.keys()):
2225 if name.startswith('_Test'):
2226 base = glob[name]
2227 if type in base.ALLOWED_TYPES:
2228 newname = 'With' + Type + name[1:]
2229 class Temp(base, unittest.TestCase, Mixin):
2230 pass
2231 result[newname] = Temp
2232 Temp.__name__ = newname
2233 Temp.__module__ = Mixin.__module__
2234 return result
2235
2236#
2237# Create test cases
2238#
2239
2240class ProcessesMixin(object):
2241 TYPE = 'processes'
2242 Process = multiprocessing.Process
2243 locals().update(get_attributes(multiprocessing, (
2244 'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
2245 'Condition', 'Event', 'Value', 'Array', 'RawValue',
2246 'RawArray', 'current_process', 'active_children', 'Pipe',
2247 'connection', 'JoinableQueue'
2248 )))
2249
2250testcases_processes = create_test_cases(ProcessesMixin, type='processes')
2251globals().update(testcases_processes)
2252
2253
2254class ManagerMixin(object):
2255 TYPE = 'manager'
2256 Process = multiprocessing.Process
2257 manager = object.__new__(multiprocessing.managers.SyncManager)
2258 locals().update(get_attributes(manager, (
2259 'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
2260 'Condition', 'Event', 'Value', 'Array', 'list', 'dict',
2261 'Namespace', 'JoinableQueue'
2262 )))
2263
2264testcases_manager = create_test_cases(ManagerMixin, type='manager')
2265globals().update(testcases_manager)
2266
2267
2268class ThreadsMixin(object):
2269 TYPE = 'threads'
2270 Process = multiprocessing.dummy.Process
2271 locals().update(get_attributes(multiprocessing.dummy, (
2272 'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
2273 'Condition', 'Event', 'Value', 'Array', 'current_process',
2274 'active_children', 'Pipe', 'connection', 'dict', 'list',
2275 'Namespace', 'JoinableQueue'
2276 )))
2277
2278testcases_threads = create_test_cases(ThreadsMixin, type='threads')
2279globals().update(testcases_threads)
2280
Neal Norwitz5d6415e2008-08-25 01:53:32 +00002281class OtherTest(unittest.TestCase):
2282 # TODO: add more tests for deliver/answer challenge.
2283 def test_deliver_challenge_auth_failure(self):
2284 class _FakeConnection(object):
2285 def recv_bytes(self, size):
Neal Norwitzec105ad2008-08-25 03:05:54 +00002286 return b'something bogus'
Neal Norwitz5d6415e2008-08-25 01:53:32 +00002287 def send_bytes(self, data):
2288 pass
2289 self.assertRaises(multiprocessing.AuthenticationError,
2290 multiprocessing.connection.deliver_challenge,
2291 _FakeConnection(), b'abc')
2292
2293 def test_answer_challenge_auth_failure(self):
2294 class _FakeConnection(object):
2295 def __init__(self):
2296 self.count = 0
2297 def recv_bytes(self, size):
2298 self.count += 1
2299 if self.count == 1:
2300 return multiprocessing.connection.CHALLENGE
2301 elif self.count == 2:
Neal Norwitzec105ad2008-08-25 03:05:54 +00002302 return b'something bogus'
2303 return b''
Neal Norwitz5d6415e2008-08-25 01:53:32 +00002304 def send_bytes(self, data):
2305 pass
2306 self.assertRaises(multiprocessing.AuthenticationError,
2307 multiprocessing.connection.answer_challenge,
2308 _FakeConnection(), b'abc')
2309
Benjamin Petersonf47ed4a2009-04-11 20:45:40 +00002310#
2311# Test Manager.start()/Pool.__init__() initializer feature - see issue 5585
2312#
2313
2314def initializer(ns):
2315 ns.test += 1
2316
2317class TestInitializers(unittest.TestCase):
2318 def setUp(self):
2319 self.mgr = multiprocessing.Manager()
2320 self.ns = self.mgr.Namespace()
2321 self.ns.test = 0
2322
2323 def tearDown(self):
2324 self.mgr.shutdown()
2325
2326 def test_manager_initializer(self):
2327 m = multiprocessing.managers.SyncManager()
2328 self.assertRaises(TypeError, m.start, 1)
2329 m.start(initializer, (self.ns,))
2330 self.assertEqual(self.ns.test, 1)
2331 m.shutdown()
2332
2333 def test_pool_initializer(self):
2334 self.assertRaises(TypeError, multiprocessing.Pool, initializer=1)
2335 p = multiprocessing.Pool(1, initializer, (self.ns,))
2336 p.close()
2337 p.join()
2338 self.assertEqual(self.ns.test, 1)
2339
Alexandre Vassalottic57a84f2009-07-17 12:07:01 +00002340#
2341# Issue 5155, 5313, 5331: Test process in processes
2342# Verifies os.close(sys.stdin.fileno) vs. sys.stdin.close() behavior
2343#
2344
2345def _ThisSubProcess(q):
2346 try:
2347 item = q.get(block=False)
2348 except pyqueue.Empty:
2349 pass
2350
2351def _TestProcess(q):
2352 queue = multiprocessing.Queue()
2353 subProc = multiprocessing.Process(target=_ThisSubProcess, args=(queue,))
Jesus Cea94f964f2011-09-09 20:26:57 +02002354 subProc.daemon = True
Alexandre Vassalottic57a84f2009-07-17 12:07:01 +00002355 subProc.start()
2356 subProc.join()
2357
2358def _afunc(x):
2359 return x*x
2360
2361def pool_in_process():
2362 pool = multiprocessing.Pool(processes=4)
2363 x = pool.map(_afunc, [1, 2, 3, 4, 5, 6, 7])
2364
2365class _file_like(object):
2366 def __init__(self, delegate):
2367 self._delegate = delegate
2368 self._pid = None
2369
2370 @property
2371 def cache(self):
2372 pid = os.getpid()
2373 # There are no race conditions since fork keeps only the running thread
2374 if pid != self._pid:
2375 self._pid = pid
2376 self._cache = []
2377 return self._cache
2378
2379 def write(self, data):
2380 self.cache.append(data)
2381
2382 def flush(self):
2383 self._delegate.write(''.join(self.cache))
2384 self._cache = []
2385
2386class TestStdinBadfiledescriptor(unittest.TestCase):
2387
2388 def test_queue_in_process(self):
2389 queue = multiprocessing.Queue()
2390 proc = multiprocessing.Process(target=_TestProcess, args=(queue,))
2391 proc.start()
2392 proc.join()
2393
2394 def test_pool_in_process(self):
2395 p = multiprocessing.Process(target=pool_in_process)
2396 p.start()
2397 p.join()
2398
2399 def test_flushing(self):
2400 sio = io.StringIO()
2401 flike = _file_like(sio)
2402 flike.write('foo')
2403 proc = multiprocessing.Process(target=lambda: flike.flush())
2404 flike.flush()
2405 assert sio.getvalue() == 'foo'
2406
2407testcases_other = [OtherTest, TestInvalidHandle, TestInitializers,
2408 TestStdinBadfiledescriptor]
Neal Norwitz5d6415e2008-08-25 01:53:32 +00002409
Benjamin Petersone711caf2008-06-11 16:44:04 +00002410#
2411#
2412#
2413
2414def test_main(run=None):
Jesse Nollerd00df3c2008-06-18 14:22:48 +00002415 if sys.platform.startswith("linux"):
2416 try:
2417 lock = multiprocessing.RLock()
2418 except OSError:
Benjamin Petersone549ead2009-03-28 21:42:05 +00002419 raise unittest.SkipTest("OSError raises on RLock creation, see issue 3111!")
Benjamin Peterson3c0dd062008-06-17 22:43:48 +00002420
Charles-François Natali221ef672011-11-22 18:55:22 +01002421 check_enough_semaphores()
2422
Benjamin Petersone711caf2008-06-11 16:44:04 +00002423 if run is None:
2424 from test.support import run_unittest as run
2425
2426 util.get_temp_dir() # creates temp directory for use by all processes
2427
2428 multiprocessing.get_logger().setLevel(LOG_LEVEL)
2429
Benjamin Peterson41181742008-07-02 20:22:54 +00002430 ProcessesMixin.pool = multiprocessing.Pool(4)
2431 ThreadsMixin.pool = multiprocessing.dummy.Pool(4)
2432 ManagerMixin.manager.__init__()
2433 ManagerMixin.manager.start()
2434 ManagerMixin.pool = ManagerMixin.manager.Pool(4)
Benjamin Petersone711caf2008-06-11 16:44:04 +00002435
2436 testcases = (
Benjamin Peterson41181742008-07-02 20:22:54 +00002437 sorted(testcases_processes.values(), key=lambda tc:tc.__name__) +
2438 sorted(testcases_threads.values(), key=lambda tc:tc.__name__) +
Neal Norwitz5d6415e2008-08-25 01:53:32 +00002439 sorted(testcases_manager.values(), key=lambda tc:tc.__name__) +
2440 testcases_other
Benjamin Petersone711caf2008-06-11 16:44:04 +00002441 )
2442
2443 loadTestsFromTestCase = unittest.defaultTestLoader.loadTestsFromTestCase
2444 suite = unittest.TestSuite(loadTestsFromTestCase(tc) for tc in testcases)
2445 run(suite)
2446
Benjamin Peterson41181742008-07-02 20:22:54 +00002447 ThreadsMixin.pool.terminate()
2448 ProcessesMixin.pool.terminate()
2449 ManagerMixin.pool.terminate()
2450 ManagerMixin.manager.shutdown()
Benjamin Petersone711caf2008-06-11 16:44:04 +00002451
Benjamin Peterson41181742008-07-02 20:22:54 +00002452 del ProcessesMixin.pool, ThreadsMixin.pool, ManagerMixin.pool
Benjamin Petersone711caf2008-06-11 16:44:04 +00002453
2454def main():
2455 test_main(unittest.TextTestRunner(verbosity=2).run)
2456
2457if __name__ == '__main__':
2458 main()