blob: e5beb97a0bcf757efa7a428d152a13878983ad24 [file] [log] [blame]
Benjamin Peterson90f5ba52010-03-11 22:53:45 +00001#!/usr/bin/env python3
Georg Brandl86b2fb92008-07-16 03:43:04 +00002
Benjamin Petersone711caf2008-06-11 16:44:04 +00003#
4# Unit tests for the multiprocessing package
5#
6
7import unittest
Benjamin Petersone711caf2008-06-11 16:44:04 +00008import queue as pyqueue
9import time
Alexandre Vassalottic57a84f2009-07-17 12:07:01 +000010import io
Antoine Pitroude911b22011-12-21 11:03:24 +010011import itertools
Benjamin Petersone711caf2008-06-11 16:44:04 +000012import sys
13import os
14import gc
Antoine Pitrouc824e9a2011-04-05 18:11:33 +020015import errno
Benjamin Petersone711caf2008-06-11 16:44:04 +000016import signal
17import array
Benjamin Petersone711caf2008-06-11 16:44:04 +000018import socket
19import random
20import logging
R. David Murraya21e4ca2009-03-31 23:16:50 +000021import test.support
Benjamin Petersone711caf2008-06-11 16:44:04 +000022
Benjamin Petersone5384b02008-10-04 22:00:42 +000023
R. David Murraya21e4ca2009-03-31 23:16:50 +000024# Skip tests if _multiprocessing wasn't built.
25_multiprocessing = test.support.import_module('_multiprocessing')
26# Skip tests if sem_open implementation is broken.
27test.support.import_module('multiprocessing.synchronize')
# Import threading after _multiprocessing to raise a more relevant error
# message: "No module named _multiprocessing". _multiprocessing is not compiled
# without thread support.
31import threading
Benjamin Petersone5384b02008-10-04 22:00:42 +000032
Benjamin Petersone711caf2008-06-11 16:44:04 +000033import multiprocessing.dummy
34import multiprocessing.connection
35import multiprocessing.managers
36import multiprocessing.heap
Benjamin Petersone711caf2008-06-11 16:44:04 +000037import multiprocessing.pool
Benjamin Petersone711caf2008-06-11 16:44:04 +000038
Charles-François Natalibc8f0822011-09-20 20:36:51 +020039from multiprocessing import util
40
41try:
42 from multiprocessing import reduction
43 HAS_REDUCTION = True
44except ImportError:
45 HAS_REDUCTION = False
Benjamin Petersone711caf2008-06-11 16:44:04 +000046
Brian Curtinafa88b52010-10-07 01:12:19 +000047try:
48 from multiprocessing.sharedctypes import Value, copy
49 HAS_SHAREDCTYPES = True
50except ImportError:
51 HAS_SHAREDCTYPES = False
52
Antoine Pitroubcb39d42011-08-23 19:46:22 +020053try:
54 import msvcrt
55except ImportError:
56 msvcrt = None
57
Benjamin Petersone711caf2008-06-11 16:44:04 +000058#
59#
60#
61
def latin(s):
    """Return the string *s* encoded to bytes using the 'latin' codec."""
    encoded = s.encode(encoding='latin')
    return encoded
Benjamin Petersone711caf2008-06-11 16:44:04 +000064
Benjamin Petersone711caf2008-06-11 16:44:04 +000065#
66# Constants
67#
68
# Log level constant taken from multiprocessing.util
# (logging.DEBUG is the more verbose alternative).
LOG_LEVEL = util.SUBWARNING

# Short pause, in seconds, used by the tests for sleeps and brief timeouts.
DELTA = 0.1

# Setting this to True makes the tests take a lot longer and can sometimes
# cause non-serious failures, because some calls block a bit longer than
# expected.
CHECK_TIMINGS = False

if not CHECK_TIMINGS:
    TIMEOUT1 = TIMEOUT2 = TIMEOUT3 = 0.1
else:
    TIMEOUT1 = 0.82
    TIMEOUT2 = 0.35
    TIMEOUT3 = 1.4

# True unless the extension module flags its semaphore get_value() as broken.
HAVE_GETVALUE = not getattr(
    _multiprocessing, 'HAVE_BROKEN_SEM_GETVALUE', False
)
84
WIN32 = (sys.platform == "win32")

if not WIN32:
    from select import select
    _select = util._eintr_retry(select)

    def wait_for_handle(handle, timeout):
        """Return True if *handle* is reported readable within *timeout*.

        A negative *timeout* is passed to select() as None, exactly like an
        explicit None.
        """
        if timeout is not None and timeout < 0.0:
            wait = None
        else:
            wait = timeout
        readable = _select([handle], [], [], wait)[0]
        return handle in readable
else:
    from _subprocess import WaitForSingleObject, INFINITE, WAIT_OBJECT_0

    def wait_for_handle(handle, timeout):
        """Return True if waiting on *handle* yields WAIT_OBJECT_0.

        *timeout* is given in seconds and converted to whole milliseconds;
        None or a negative value waits with INFINITE.
        """
        if timeout is not None and not (timeout < 0.0):
            millis = int(1000 * timeout)
        else:
            millis = INFINITE
        status = WaitForSingleObject(handle, millis)
        return status == WAIT_OBJECT_0
Jesse Noller6214edd2009-01-19 16:23:53 +0000103
# Value of the OS "SC_OPEN_MAX" limit, with a fallback of 256 when the
# platform cannot report it.
try:
    MAXFD = os.sysconf("SC_OPEN_MAX")
except (AttributeError, ValueError, OSError):
    # AttributeError: os.sysconf does not exist on this platform.
    # ValueError: the configuration name is not known.
    # OSError: the name is known but not supported by the host system.
    # Anything else (e.g. KeyboardInterrupt) is deliberately not swallowed.
    MAXFD = 256
108
Benjamin Petersone711caf2008-06-11 16:44:04 +0000109#
Florent Xiclunafd1b0932010-03-28 00:25:02 +0000110# Some tests require ctypes
111#
112
113try:
Florent Xiclunaaa171062010-08-14 15:56:42 +0000114 from ctypes import Structure, c_int, c_double
Florent Xiclunafd1b0932010-03-28 00:25:02 +0000115except ImportError:
116 Structure = object
117 c_int = c_double = None
118
Charles-François Natali221ef672011-11-22 18:55:22 +0100119
def check_enough_semaphores():
    """Skip the calling test when the OS semaphore limit is too low.

    Returns None when the limit cannot be queried, is reported as -1, or is
    at least 256.  Otherwise raises unittest.SkipTest.
    """
    # Minimum number of semaphores available according to POSIX.
    required = 256
    try:
        available = os.sysconf("SC_SEM_NSEMS_MAX")
    except (AttributeError, ValueError):
        # sysconf itself, or this particular setting, is not available.
        return
    if available != -1 and available < required:
        raise unittest.SkipTest("The OS doesn't support enough semaphores "
                                "to run the test (required: %d)." % required)
133
134
Florent Xiclunafd1b0932010-03-28 00:25:02 +0000135#
Benjamin Petersone711caf2008-06-11 16:44:04 +0000136# Creates a wrapper for a function which records the time it takes to finish
137#
138
class TimingWrapper(object):
    """Callable wrapper that records how long the last call took.

    ``elapsed`` is None until the first call; afterwards it holds the
    wall-clock duration (time.time() difference) of the most recent call,
    whether that call returned or raised.
    """

    def __init__(self, func):
        self.elapsed = None
        self.func = func

    def __call__(self, *args, **kwds):
        started = time.time()
        try:
            result = self.func(*args, **kwds)
        finally:
            # Recorded even when the wrapped callable raises.
            self.elapsed = time.time() - started
        return result
151
152#
153# Base class for test cases
154#
155
class BaseTestCase(object):
    """Shared helpers for the test case classes in this file.

    The helpers call ``self.assertEqual`` / ``self.assertAlmostEqual``, which
    are not defined here -- presumably supplied by unittest.TestCase when the
    concrete test classes are assembled (verify against the rest of the file).
    """

    ALLOWED_TYPES = ('processes', 'manager', 'threads')

    def assertTimingAlmostEqual(self, a, b):
        # Timing comparisons are only enforced when CHECK_TIMINGS is enabled.
        if not CHECK_TIMINGS:
            return
        self.assertAlmostEqual(a, b, 1)

    def assertReturnsIfImplemented(self, value, func, *args):
        # Compare func(*args) with value, unless func reports that it is
        # not implemented, in which case nothing is checked.
        try:
            res = func(*args)
        except NotImplementedError:
            return None
        return self.assertEqual(value, res)

    # For the sanity of Windows users, rather than crashing or freezing in
    # multiple ways.
    def __reduce__(self, *args):
        raise NotImplementedError("shouldn't try to pickle a test case")

    __reduce_ex__ = __reduce__
178
Benjamin Petersone711caf2008-06-11 16:44:04 +0000179#
180# Return the value of a semaphore
181#
182
def get_value(self):
    """Return the current value of the semaphore-like object *self*.

    Tries, in order: calling ``self.get_value()``, reading
    ``self._Semaphore__value``, reading ``self._value``.  Raises
    NotImplementedError when none of them is available.
    """
    try:
        return self.get_value()
    except AttributeError:
        pass
    for attr in ('_Semaphore__value', '_value'):
        try:
            return getattr(self, attr)
        except AttributeError:
            continue
    raise NotImplementedError
194
195#
196# Testcases
197#
198
# Tests for Process objects: attributes, start/join/terminate lifecycle,
# active_children(), nested process creation and the sentinel handle.
# NOTE(review): self.TYPE, self.Process, self.Queue, self.Event, self.Pipe,
# self.current_process and self.active_children are not defined in this
# class; presumably they are injected by mixins elsewhere in the file.
class _TestProcess(BaseTestCase):

    ALLOWED_TYPES = ('processes', 'threads')

    def test_current(self):
        # Checks attributes of the current process object; skipped for
        # the 'threads' flavour.
        if self.TYPE == 'threads':
            return

        current = self.current_process()
        authkey = current.authkey

        self.assertTrue(current.is_alive())
        self.assertTrue(not current.daemon)
        self.assertIsInstance(authkey, bytes)
        self.assertTrue(len(authkey) > 0)
        self.assertEqual(current.ident, os.getpid())
        self.assertEqual(current.exitcode, None)

    def test_daemon_argument(self):
        # Checks the ``daemon`` keyword argument of the Process constructor.
        if self.TYPE == "threads":
            return

        # By default uses the current process's daemon flag.
        proc0 = self.Process(target=self._test)
        self.assertEqual(proc0.daemon, self.current_process().daemon)
        proc1 = self.Process(target=self._test, daemon=True)
        self.assertTrue(proc1.daemon)
        proc2 = self.Process(target=self._test, daemon=False)
        self.assertFalse(proc2.daemon)

    @classmethod
    def _test(cls, q, *args, **kwds):
        # Child-side target: reports its arguments and identity through q
        # in the exact order that test_process reads them back.
        current = cls.current_process()
        q.put(args)
        q.put(kwds)
        q.put(current.name)
        if cls.TYPE != 'threads':
            q.put(bytes(current.authkey))
            q.put(current.pid)

    def test_process(self):
        # Walks a process through its lifecycle (created -> started ->
        # joined) and checks its state at each stage.
        q = self.Queue(1)
        e = self.Event()
        args = (q, 1, 2)
        kwargs = {'hello':23, 'bye':2.54}
        name = 'SomeProcess'
        p = self.Process(
            target=self._test, args=args, kwargs=kwargs, name=name
            )
        p.daemon = True
        current = self.current_process()

        # State before start().
        if self.TYPE != 'threads':
            self.assertEqual(p.authkey, current.authkey)
        self.assertEqual(p.is_alive(), False)
        self.assertEqual(p.daemon, True)
        self.assertNotIn(p, self.active_children())
        self.assertTrue(type(self.active_children()) is list)
        self.assertEqual(p.exitcode, None)

        p.start()

        # State while running.
        self.assertEqual(p.exitcode, None)
        self.assertEqual(p.is_alive(), True)
        self.assertIn(p, self.active_children())

        # Values are read in the same order _test() put them; args[1:]
        # because the queue itself is the first positional argument.
        self.assertEqual(q.get(), args[1:])
        self.assertEqual(q.get(), kwargs)
        self.assertEqual(q.get(), p.name)
        if self.TYPE != 'threads':
            self.assertEqual(q.get(), current.authkey)
            self.assertEqual(q.get(), p.pid)

        p.join()

        # State after join().
        self.assertEqual(p.exitcode, 0)
        self.assertEqual(p.is_alive(), False)
        self.assertNotIn(p, self.active_children())

    @classmethod
    def _test_terminate(cls):
        # Child-side target: sleeps long enough that the parent has to
        # terminate it.
        time.sleep(1000)

    def test_terminate(self):
        # Checks that terminate() stops a sleeping child; skipped for the
        # 'threads' flavour.
        if self.TYPE == 'threads':
            return

        p = self.Process(target=self._test_terminate)
        p.daemon = True
        p.start()

        self.assertEqual(p.is_alive(), True)
        self.assertIn(p, self.active_children())
        self.assertEqual(p.exitcode, None)

        p.terminate()

        # join() should return None and, when timings are checked, quickly.
        join = TimingWrapper(p.join)
        self.assertEqual(join(), None)
        self.assertTimingAlmostEqual(join.elapsed, 0.0)

        self.assertEqual(p.is_alive(), False)
        self.assertNotIn(p, self.active_children())

        p.join()

        # XXX sometimes get p.exitcode == 0 on Windows ...
        #self.assertEqual(p.exitcode, -signal.SIGTERM)

    def test_cpu_count(self):
        # cpu_count() must be an int >= 1; NotImplementedError is tolerated
        # and treated as a single CPU.
        try:
            cpus = multiprocessing.cpu_count()
        except NotImplementedError:
            cpus = 1
        self.assertTrue(type(cpus) is int)
        self.assertTrue(cpus >= 1)

    def test_active_children(self):
        # A process is listed by active_children() only between start()
        # and join().
        self.assertEqual(type(self.active_children()), list)

        p = self.Process(target=time.sleep, args=(DELTA,))
        self.assertNotIn(p, self.active_children())

        p.daemon = True
        p.start()
        self.assertIn(p, self.active_children())

        p.join()
        self.assertNotIn(p, self.active_children())

    @classmethod
    def _test_recursion(cls, wconn, id):
        # Sends its own id path down wconn, then (for paths shorter than 2)
        # starts two children one after the other, each extending the path.
        # NOTE(review): ``forking`` is imported but not used in this method;
        # the reason for the import is not visible here.
        from multiprocessing import forking
        wconn.send(id)
        if len(id) < 2:
            for i in range(2):
                p = cls.Process(
                    target=cls._test_recursion, args=(wconn, id+[i])
                    )
                p.start()
                # Joining before the next start keeps the send order
                # deterministic (depth-first).
                p.join()

    def test_recursion(self):
        # Checks that nested process creation reports ids depth-first.
        rconn, wconn = self.Pipe(duplex=False)
        self._test_recursion(wconn, [])

        time.sleep(DELTA)
        result = []
        while rconn.poll():
            result.append(rconn.recv())

        expected = [
            [],
              [0],
                [0, 0],
                [0, 1],
              [1],
                [1, 0],
                [1, 1]
            ]
        self.assertEqual(result, expected)

    @classmethod
    def _test_sentinel(cls, event):
        # Child-side target: waits up to 10 seconds for the parent's event.
        event.wait(10.0)

    def test_sentinel(self):
        # Checks Process.sentinel: unavailable before start(), not ready
        # while the child runs, ready once it has exited.
        if self.TYPE == "threads":
            return
        event = self.Event()
        p = self.Process(target=self._test_sentinel, args=(event,))
        with self.assertRaises(ValueError):
            p.sentinel
        p.start()
        self.addCleanup(p.join)
        sentinel = p.sentinel
        self.assertIsInstance(sentinel, int)
        self.assertFalse(wait_for_handle(sentinel, timeout=0.0))
        event.set()
        p.join()
        self.assertTrue(wait_for_handle(sentinel, timeout=DELTA))
380
Benjamin Petersone711caf2008-06-11 16:44:04 +0000381#
382#
383#
384
385class _UpperCaser(multiprocessing.Process):
386
387 def __init__(self):
388 multiprocessing.Process.__init__(self)
389 self.child_conn, self.parent_conn = multiprocessing.Pipe()
390
391 def run(self):
392 self.parent_conn.close()
393 for s in iter(self.child_conn.recv, None):
394 self.child_conn.send(s.upper())
395 self.child_conn.close()
396
397 def submit(self, s):
398 assert type(s) is str
399 self.parent_conn.send(s)
400 return self.parent_conn.recv()
401
402 def stop(self):
403 self.parent_conn.send(None)
404 self.parent_conn.close()
405 self.child_conn.close()
406
# Tests for subclassing multiprocessing.Process and for stderr flushing at
# child shutdown.
class _TestSubclassingProcess(BaseTestCase):

    ALLOWED_TYPES = ('processes',)

    def test_subclassing(self):
        # Round-trips two strings through an _UpperCaser child process.
        uppercaser = _UpperCaser()
        uppercaser.daemon = True
        uppercaser.start()
        self.assertEqual(uppercaser.submit('hello'), 'HELLO')
        self.assertEqual(uppercaser.submit('world'), 'WORLD')
        uppercaser.stop()
        uppercaser.join()

    def test_stderr_flush(self):
        # sys.stderr is flushed at process shutdown (issue #13812)
        if self.TYPE == "threads":
            return

        testfn = test.support.TESTFN
        self.addCleanup(test.support.unlink, testfn)
        proc = self.Process(target=self._test_stderr_flush, args=(testfn,))
        proc.start()
        proc.join()
        with open(testfn, 'r') as f:
            err = f.read()
            # The whole traceback was printed
            self.assertIn("ZeroDivisionError", err)
            self.assertIn("test_multiprocessing.py", err)
            # Matches the literal source text of the failing line in
            # _test_stderr_flush below; that line must not be reformatted.
            self.assertIn("1/0 # MARKER", err)

    @classmethod
    def _test_stderr_flush(cls, testfn):
        # Child-side target: redirects stderr to the file and then fails.
        # The file is not closed or flushed here -- the test relies on
        # that happening at process shutdown.
        sys.stderr = open(testfn, 'w')
        1/0 # MARKER
441
442
Benjamin Petersone711caf2008-06-11 16:44:04 +0000443#
444#
445#
446
def queue_empty(q):
    """Return whether *q* is empty, via ``q.empty()`` or else ``q.qsize()``."""
    if not hasattr(q, 'empty'):
        return q.qsize() == 0
    return q.empty()
452
def queue_full(q, maxsize):
    """Return whether *q* is full.

    Uses ``q.full()`` when available; otherwise compares ``q.qsize()`` with
    *maxsize*.
    """
    if not hasattr(q, 'full'):
        return q.qsize() == maxsize
    return q.full()
458
459
# Tests for Queue / JoinableQueue: blocking and non-blocking put/get,
# behaviour across a newly started process, qsize() and task_done()/join().
# NOTE(review): self.Queue, self.JoinableQueue, self.Event and self.Process
# are not defined in this class; presumably injected by mixins elsewhere.
class _TestQueue(BaseTestCase):


    @classmethod
    def _test_put(cls, queue, child_can_start, parent_can_continue):
        # Child-side target: once released by the parent, drains the six
        # items the parent queued and then signals back.
        child_can_start.wait()
        for i in range(6):
            queue.get()
        parent_can_continue.set()

    def test_put(self):
        # Fills a bounded queue using every put() calling convention, then
        # checks that further puts raise Full with the expected blocking time.
        MAXSIZE = 6
        queue = self.Queue(maxsize=MAXSIZE)
        child_can_start = self.Event()
        parent_can_continue = self.Event()

        proc = self.Process(
            target=self._test_put,
            args=(queue, child_can_start, parent_can_continue)
            )
        proc.daemon = True
        proc.start()

        self.assertEqual(queue_empty(queue), True)
        self.assertEqual(queue_full(queue, MAXSIZE), False)

        queue.put(1)
        queue.put(2, True)
        queue.put(3, True, None)
        queue.put(4, False)
        queue.put(5, False, None)
        queue.put_nowait(6)

        # the values may be in buffer but not yet in pipe so sleep a bit
        time.sleep(DELTA)

        self.assertEqual(queue_empty(queue), False)
        self.assertEqual(queue_full(queue, MAXSIZE), True)

        put = TimingWrapper(queue.put)
        put_nowait = TimingWrapper(queue.put_nowait)

        # Non-blocking puts on a full queue fail immediately; blocking puts
        # fail after their timeout.
        self.assertRaises(pyqueue.Full, put, 7, False)
        self.assertTimingAlmostEqual(put.elapsed, 0)

        self.assertRaises(pyqueue.Full, put, 7, False, None)
        self.assertTimingAlmostEqual(put.elapsed, 0)

        self.assertRaises(pyqueue.Full, put_nowait, 7)
        self.assertTimingAlmostEqual(put_nowait.elapsed, 0)

        self.assertRaises(pyqueue.Full, put, 7, True, TIMEOUT1)
        self.assertTimingAlmostEqual(put.elapsed, TIMEOUT1)

        self.assertRaises(pyqueue.Full, put, 7, False, TIMEOUT2)
        self.assertTimingAlmostEqual(put.elapsed, 0)

        self.assertRaises(pyqueue.Full, put, 7, True, timeout=TIMEOUT3)
        self.assertTimingAlmostEqual(put.elapsed, TIMEOUT3)

        # Let the child drain the queue, then verify it is empty again.
        child_can_start.set()
        parent_can_continue.wait()

        self.assertEqual(queue_empty(queue), True)
        self.assertEqual(queue_full(queue, MAXSIZE), False)

        proc.join()

    @classmethod
    def _test_get(cls, queue, child_can_start, parent_can_continue):
        # Child-side target: once released, queues the values 2..5 and
        # signals the parent.
        child_can_start.wait()
        #queue.put(1)
        queue.put(2)
        queue.put(3)
        queue.put(4)
        queue.put(5)
        parent_can_continue.set()

    def test_get(self):
        # Reads the child's items using every get() calling convention, then
        # checks that further gets raise Empty with the expected blocking time.
        queue = self.Queue()
        child_can_start = self.Event()
        parent_can_continue = self.Event()

        proc = self.Process(
            target=self._test_get,
            args=(queue, child_can_start, parent_can_continue)
            )
        proc.daemon = True
        proc.start()

        self.assertEqual(queue_empty(queue), True)

        child_can_start.set()
        parent_can_continue.wait()

        time.sleep(DELTA)
        self.assertEqual(queue_empty(queue), False)

        # Hangs unexpectedly, remove for now
        #self.assertEqual(queue.get(), 1)
        self.assertEqual(queue.get(True, None), 2)
        self.assertEqual(queue.get(True), 3)
        self.assertEqual(queue.get(timeout=1), 4)
        self.assertEqual(queue.get_nowait(), 5)

        self.assertEqual(queue_empty(queue), True)

        get = TimingWrapper(queue.get)
        get_nowait = TimingWrapper(queue.get_nowait)

        # Non-blocking gets on an empty queue fail immediately; blocking
        # gets fail after their timeout.
        self.assertRaises(pyqueue.Empty, get, False)
        self.assertTimingAlmostEqual(get.elapsed, 0)

        self.assertRaises(pyqueue.Empty, get, False, None)
        self.assertTimingAlmostEqual(get.elapsed, 0)

        self.assertRaises(pyqueue.Empty, get_nowait)
        self.assertTimingAlmostEqual(get_nowait.elapsed, 0)

        self.assertRaises(pyqueue.Empty, get, True, TIMEOUT1)
        self.assertTimingAlmostEqual(get.elapsed, TIMEOUT1)

        self.assertRaises(pyqueue.Empty, get, False, TIMEOUT2)
        self.assertTimingAlmostEqual(get.elapsed, 0)

        self.assertRaises(pyqueue.Empty, get, timeout=TIMEOUT3)
        self.assertTimingAlmostEqual(get.elapsed, TIMEOUT3)

        proc.join()

    @classmethod
    def _test_fork(cls, queue):
        # Child-side target: queues the values 10..19.
        for i in range(10, 20):
            queue.put(i)
        # note that at this point the items may only be buffered, so the
        # process cannot shutdown until the feeder thread has finished
        # pushing items onto the pipe.

    def test_fork(self):
        # Old versions of Queue would fail to create a new feeder
        # thread for a forked process if the original process had its
        # own feeder thread.  This test checks that this no longer
        # happens.

        queue = self.Queue()

        # put items on queue so that main process starts a feeder thread
        for i in range(10):
            queue.put(i)

        # wait to make sure thread starts before we fork a new process
        time.sleep(DELTA)

        # fork process
        p = self.Process(target=self._test_fork, args=(queue,))
        p.daemon = True
        p.start()

        # check that all expected items are in the queue
        for i in range(20):
            self.assertEqual(queue.get(), i)
        self.assertRaises(pyqueue.Empty, queue.get, False)

        p.join()

    def test_qsize(self):
        # Checks that qsize() tracks put()/get(); returns early when
        # qsize() raises NotImplementedError.
        q = self.Queue()
        try:
            self.assertEqual(q.qsize(), 0)
        except NotImplementedError:
            return
        q.put(1)
        self.assertEqual(q.qsize(), 1)
        q.put(5)
        self.assertEqual(q.qsize(), 2)
        q.get()
        self.assertEqual(q.qsize(), 1)
        q.get()
        self.assertEqual(q.qsize(), 0)

    @classmethod
    def _test_task_done(cls, q):
        # Worker target: acknowledges every item until a None arrives.
        for obj in iter(q.get, None):
            time.sleep(DELTA)
            q.task_done()

    def test_task_done(self):
        # Checks that JoinableQueue.join() returns once four workers have
        # acknowledged all ten items.
        queue = self.JoinableQueue()

        if sys.version_info < (2, 5) and not hasattr(queue, 'task_done'):
            self.skipTest("requires 'queue.task_done()' method")

        workers = [self.Process(target=self._test_task_done, args=(queue,))
                   for i in range(4)]

        for p in workers:
            p.daemon = True
            p.start()

        for i in range(10):
            queue.put(i)

        queue.join()

        # One None per worker so that every worker loop terminates.
        for p in workers:
            queue.put(None)

        for p in workers:
            p.join()
669
670#
671#
672#
673
# Tests for Lock and RLock acquire/release behaviour.
class _TestLock(BaseTestCase):

    def test_lock(self):
        # A plain lock: the second, non-blocking acquire fails, and
        # releasing an unlocked lock raises.
        lk = self.Lock()
        self.assertEqual(lk.acquire(), True)
        self.assertEqual(lk.acquire(False), False)
        self.assertEqual(lk.release(), None)
        with self.assertRaises((ValueError, threading.ThreadError)):
            lk.release()

    def test_rlock(self):
        # A re-entrant lock can be taken three times and must be released
        # three times; a fourth release raises.
        lk = self.RLock()
        depth = 3
        for _ in range(depth):
            self.assertEqual(lk.acquire(), True)
        for _ in range(depth):
            self.assertEqual(lk.release(), None)
        with self.assertRaises((AssertionError, RuntimeError)):
            lk.release()

    def test_lock_context(self):
        # A lock object works as a context manager.
        lk = self.Lock()
        with lk:
            pass
696
Benjamin Petersone711caf2008-06-11 16:44:04 +0000697
# Tests for Semaphore and BoundedSemaphore counting and timeouts.
class _TestSemaphore(BaseTestCase):

    def _test_semaphore(self, sem):
        # Shared scenario for a semaphore created with value 2: take it down
        # to zero, fail one non-blocking acquire, then release back up to 2.
        check = self.assertReturnsIfImplemented
        check(2, get_value, sem)
        for remaining in (1, 0):
            self.assertEqual(sem.acquire(), True)
            check(remaining, get_value, sem)
        self.assertEqual(sem.acquire(False), False)
        check(0, get_value, sem)
        for restored in (1, 2):
            self.assertEqual(sem.release(), None)
            check(restored, get_value, sem)

    def test_semaphore(self):
        # An unbounded semaphore may be released past its initial value.
        sem = self.Semaphore(2)
        self._test_semaphore(sem)
        for count in (3, 4):
            self.assertEqual(sem.release(), None)
            self.assertReturnsIfImplemented(count, get_value, sem)

    def test_bounded_semaphore(self):
        sem = self.BoundedSemaphore(2)
        self._test_semaphore(sem)
        # Currently fails on OS/X
        #if HAVE_GETVALUE:
        #    self.assertRaises(ValueError, sem.release)
        #    self.assertReturnsIfImplemented(2, get_value, sem)

    def test_timeout(self):
        # Only run for the 'processes' flavour.
        if self.TYPE != 'processes':
            return

        sem = self.Semaphore(0)
        acquire = TimingWrapper(sem.acquire)

        # (positional args, keyword args, expected elapsed time)
        scenarios = [
            ((False,), {}, 0.0),
            ((False, None), {}, 0.0),
            ((False, TIMEOUT1), {}, 0),
            ((True, TIMEOUT2), {}, TIMEOUT2),
            ((), {'timeout': TIMEOUT3}, TIMEOUT3),
        ]
        for args, kwds, expected in scenarios:
            self.assertEqual(acquire(*args, **kwds), False)
            self.assertTimingAlmostEqual(acquire.elapsed, expected)
750
751
# Tests for Condition: notify(), notify_all() and wait() timeouts, using a
# mix of self.Process workers and real threads.
# NOTE(review): self.Condition, self.Semaphore, self.Process and self.TYPE
# are not defined in this class; presumably injected by mixins elsewhere.
class _TestCondition(BaseTestCase):

    @classmethod
    def f(cls, cond, sleeping, woken, timeout=None):
        # Worker target: announce that we are about to sleep, wait on the
        # condition (optionally with a timeout), then announce the wake-up.
        cond.acquire()
        sleeping.release()
        cond.wait(timeout)
        woken.release()
        cond.release()

    def check_invariant(self, cond):
        # this is only supposed to succeed when there are no sleepers
        if self.TYPE == 'processes':
            try:
                sleepers = (cond._sleeping_count.get_value() -
                            cond._woken_count.get_value())
                self.assertEqual(sleepers, 0)
                self.assertEqual(cond._wait_semaphore.get_value(), 0)
            except NotImplementedError:
                pass

    def test_notify(self):
        cond = self.Condition()
        sleeping = self.Semaphore(0)
        woken = self.Semaphore(0)

        # Keep separate references so that both workers can be joined at the
        # end; reusing a single name would leave the first one un-joined.
        proc = self.Process(target=self.f, args=(cond, sleeping, woken))
        proc.daemon = True
        proc.start()

        thread = threading.Thread(target=self.f, args=(cond, sleeping, woken))
        thread.daemon = True
        thread.start()

        # wait for both children to start sleeping
        sleeping.acquire()
        sleeping.acquire()

        # check no process/thread has woken up
        time.sleep(DELTA)
        self.assertReturnsIfImplemented(0, get_value, woken)

        # wake up one process/thread
        cond.acquire()
        cond.notify()
        cond.release()

        # check one process/thread has woken up
        time.sleep(DELTA)
        self.assertReturnsIfImplemented(1, get_value, woken)

        # wake up another
        cond.acquire()
        cond.notify()
        cond.release()

        # check other has woken up
        time.sleep(DELTA)
        self.assertReturnsIfImplemented(2, get_value, woken)

        # check state is not mucked up
        self.check_invariant(cond)

        # Both workers have been notified, so both can be reaped.
        proc.join()
        thread.join()

    def test_notify_all(self):
        cond = self.Condition()
        sleeping = self.Semaphore(0)
        woken = self.Semaphore(0)
        # Every started process/thread, so that all can be joined at the end.
        workers = []

        # start some threads/processes which will timeout
        for i in range(3):
            p = self.Process(target=self.f,
                             args=(cond, sleeping, woken, TIMEOUT1))
            p.daemon = True
            p.start()
            workers.append(p)

            t = threading.Thread(target=self.f,
                                 args=(cond, sleeping, woken, TIMEOUT1))
            t.daemon = True
            t.start()
            workers.append(t)

        # wait for them all to sleep
        for i in range(6):
            sleeping.acquire()

        # check they have all timed out
        for i in range(6):
            woken.acquire()
        self.assertReturnsIfImplemented(0, get_value, woken)

        # check state is not mucked up
        self.check_invariant(cond)

        # start some more threads/processes
        for i in range(3):
            p = self.Process(target=self.f, args=(cond, sleeping, woken))
            p.daemon = True
            p.start()
            workers.append(p)

            t = threading.Thread(target=self.f, args=(cond, sleeping, woken))
            t.daemon = True
            t.start()
            workers.append(t)

        # wait for them to all sleep
        for i in range(6):
            sleeping.acquire()

        # check no process/thread has woken up
        time.sleep(DELTA)
        self.assertReturnsIfImplemented(0, get_value, woken)

        # wake them all up
        cond.acquire()
        cond.notify_all()
        cond.release()

        # check they have all woken; poll for up to 10 * DELTA seconds
        # rather than relying on a single fixed sleep
        for i in range(10):
            try:
                if get_value(woken) == 6:
                    break
            except NotImplementedError:
                break
            time.sleep(DELTA)
        self.assertReturnsIfImplemented(6, get_value, woken)

        # check state is not mucked up
        self.check_invariant(cond)

        # The first batch timed out and the second was notified, so every
        # worker is able to finish; reap them all.
        for worker in workers:
            worker.join()

    def test_timeout(self):
        # wait() with a timeout and no notification returns False after
        # roughly TIMEOUT1 seconds.
        cond = self.Condition()
        wait = TimingWrapper(cond.wait)
        cond.acquire()
        res = wait(TIMEOUT1)
        cond.release()
        self.assertEqual(res, False)
        self.assertTimingAlmostEqual(wait.elapsed, TIMEOUT1)
889
890
# Tests for Event: is_set(), wait() with and without timeout, set()/clear()
# and being set from another process.
class _TestEvent(BaseTestCase):

    @classmethod
    def _test_event(cls, event):
        # Child-side target: sets the event after a short delay.
        time.sleep(TIMEOUT2)
        event.set()

    def test_event(self):
        event = self.Event()
        wait = TimingWrapper(event.wait)

        # A new event starts out cleared.
        self.assertEqual(event.is_set(), False)

        # wait() returns the state of the flag, so waiting on a cleared
        # event returns False once the timeout expires.
        self.assertEqual(wait(0.0), False)
        self.assertTimingAlmostEqual(wait.elapsed, 0.0)
        self.assertEqual(wait(TIMEOUT1), False)
        self.assertTimingAlmostEqual(wait.elapsed, TIMEOUT1)

        event.set()

        # Once set, wait() returns True immediately, with or without timeout.
        self.assertEqual(event.is_set(), True)
        self.assertEqual(wait(), True)
        self.assertTimingAlmostEqual(wait.elapsed, 0.0)
        self.assertEqual(wait(TIMEOUT1), True)
        self.assertTimingAlmostEqual(wait.elapsed, 0.0)
        # Disabled in the original; reason not recorded here:
        # self.assertEqual(event.is_set(), True)

        event.clear()

        # Disabled in the original; reason not recorded here:
        #self.assertEqual(event.is_set(), False)

        p = self.Process(target=self._test_event, args=(event,))
        p.daemon = True
        p.start()
        self.assertEqual(wait(), True)
        # The child has set the event and is about to exit; reap it so it
        # does not outlive the test.
        p.join()
Benjamin Petersone711caf2008-06-11 16:44:04 +0000931
932#
933#
934#
935
class _TestValue(BaseTestCase):
    """Tests for shared scalars created with Value() and RawValue()."""

    ALLOWED_TYPES = ('processes',)

    # Each entry is (typecode, initial value, value stored by the child).
    codes_values = [
        ('i', 4343, 24234),
        ('d', 3.625, -4.25),
        ('h', -232, 234),
        ('c', latin('x'), latin('y'))
        ]

    def setUp(self):
        """Skip every test of this class when HAS_SHAREDCTYPES is false."""
        if not HAS_SHAREDCTYPES:
            self.skipTest("requires multiprocessing.sharedctypes")

    @classmethod
    def _test(cls, values):
        """Child target: store the third table entry in each shared value."""
        for shared, entry in zip(values, cls.codes_values):
            shared.value = entry[2]

    def test_value(self, raw=False):
        """Check that values written by a child are seen by the parent."""
        factory = self.RawValue if raw else self.Value
        values = [factory(code, initial)
                  for code, initial, _ in self.codes_values]

        # Before the child runs, every object holds its initial value.
        for shared, entry in zip(values, self.codes_values):
            self.assertEqual(shared.value, entry[1])

        proc = self.Process(target=self._test, args=(values,))
        proc.daemon = True
        proc.start()
        proc.join()

        # After the child has exited, every object holds the new value.
        for shared, entry in zip(values, self.codes_values):
            self.assertEqual(shared.value, entry[2])

    def test_rawvalue(self):
        """Run test_value() against RawValue objects."""
        self.test_value(raw=True)

    def test_getobj_getlock(self):
        """Check when get_lock()/get_obj() exist and which lock is used."""
        # Default lock: both accessors can be called.
        default_locked = self.Value('i', 5)
        default_locked.get_lock()
        default_locked.get_obj()

        # lock=None: both accessors can be called as well.
        none_locked = self.Value('i', 5, lock=None)
        none_locked.get_lock()
        none_locked.get_obj()

        # A lock supplied by the caller is the one handed back.
        lock = self.Lock()
        user_locked = self.Value('i', 5, lock=lock)
        returned_lock = user_locked.get_lock()
        user_locked.get_obj()
        self.assertEqual(lock, returned_lock)

        # lock=False yields an object without either accessor.
        unlocked = self.Value('i', 5, lock=False)
        self.assertFalse(hasattr(unlocked, 'get_lock'))
        self.assertFalse(hasattr(unlocked, 'get_obj'))

        # Anything that is not a lock is rejected.
        self.assertRaises(AttributeError, self.Value, 'i', 5, lock='navalue')

        # RawValue objects have neither accessor.
        raw = self.RawValue('i', 5)
        self.assertFalse(hasattr(raw, 'get_lock'))
        self.assertFalse(hasattr(raw, 'get_obj'))
1003
Benjamin Petersone711caf2008-06-11 16:44:04 +00001004
1005class _TestArray(BaseTestCase):
1006
Florent Xiclunafd1b0932010-03-28 00:25:02 +00001007 ALLOWED_TYPES = ('processes',)
1008
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +00001009 @classmethod
1010 def f(cls, seq):
Benjamin Petersone711caf2008-06-11 16:44:04 +00001011 for i in range(1, len(seq)):
1012 seq[i] += seq[i-1]
1013
Florent Xiclunafd1b0932010-03-28 00:25:02 +00001014 @unittest.skipIf(c_int is None, "requires _ctypes")
Benjamin Petersone711caf2008-06-11 16:44:04 +00001015 def test_array(self, raw=False):
Benjamin Petersone711caf2008-06-11 16:44:04 +00001016 seq = [680, 626, 934, 821, 150, 233, 548, 982, 714, 831]
1017 if raw:
1018 arr = self.RawArray('i', seq)
1019 else:
1020 arr = self.Array('i', seq)
1021
1022 self.assertEqual(len(arr), len(seq))
1023 self.assertEqual(arr[3], seq[3])
1024 self.assertEqual(list(arr[2:7]), list(seq[2:7]))
1025
1026 arr[4:8] = seq[4:8] = array.array('i', [1, 2, 3, 4])
1027
1028 self.assertEqual(list(arr[:]), seq)
1029
1030 self.f(seq)
1031
1032 p = self.Process(target=self.f, args=(arr,))
Jesus Cea94f964f2011-09-09 20:26:57 +02001033 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +00001034 p.start()
1035 p.join()
1036
1037 self.assertEqual(list(arr[:]), seq)
1038
Florent Xiclunafd1b0932010-03-28 00:25:02 +00001039 @unittest.skipIf(c_int is None, "requires _ctypes")
Mark Dickinson89461ef2011-03-26 10:19:03 +00001040 def test_array_from_size(self):
1041 size = 10
1042 # Test for zeroing (see issue #11675).
1043 # The repetition below strengthens the test by increasing the chances
1044 # of previously allocated non-zero memory being used for the new array
1045 # on the 2nd and 3rd loops.
1046 for _ in range(3):
1047 arr = self.Array('i', size)
1048 self.assertEqual(len(arr), size)
1049 self.assertEqual(list(arr), [0] * size)
1050 arr[:] = range(10)
1051 self.assertEqual(list(arr), list(range(10)))
1052 del arr
1053
1054 @unittest.skipIf(c_int is None, "requires _ctypes")
Benjamin Petersone711caf2008-06-11 16:44:04 +00001055 def test_rawarray(self):
1056 self.test_array(raw=True)
1057
Florent Xiclunafd1b0932010-03-28 00:25:02 +00001058 @unittest.skipIf(c_int is None, "requires _ctypes")
Benjamin Petersone711caf2008-06-11 16:44:04 +00001059 def test_getobj_getlock_obj(self):
Benjamin Petersone711caf2008-06-11 16:44:04 +00001060 arr1 = self.Array('i', list(range(10)))
1061 lock1 = arr1.get_lock()
1062 obj1 = arr1.get_obj()
1063
1064 arr2 = self.Array('i', list(range(10)), lock=None)
1065 lock2 = arr2.get_lock()
1066 obj2 = arr2.get_obj()
1067
1068 lock = self.Lock()
1069 arr3 = self.Array('i', list(range(10)), lock=lock)
1070 lock3 = arr3.get_lock()
1071 obj3 = arr3.get_obj()
1072 self.assertEqual(lock, lock3)
1073
Jesse Nollerb0516a62009-01-18 03:11:38 +00001074 arr4 = self.Array('i', range(10), lock=False)
Benjamin Petersone711caf2008-06-11 16:44:04 +00001075 self.assertFalse(hasattr(arr4, 'get_lock'))
1076 self.assertFalse(hasattr(arr4, 'get_obj'))
Jesse Nollerb0516a62009-01-18 03:11:38 +00001077 self.assertRaises(AttributeError,
1078 self.Array, 'i', range(10), lock='notalock')
1079
1080 arr5 = self.RawArray('i', range(10))
1081 self.assertFalse(hasattr(arr5, 'get_lock'))
1082 self.assertFalse(hasattr(arr5, 'get_obj'))
Benjamin Petersone711caf2008-06-11 16:44:04 +00001083
1084#
1085#
1086#
1087
class _TestContainers(BaseTestCase):
    """Tests for the list, dict and Namespace objects offered by a manager."""

    ALLOWED_TYPES = ('manager',)

    def test_list(self):
        """Check slicing, extension, repetition, addition and nesting."""
        a = self.list(list(range(10)))
        self.assertEqual(a[:], list(range(10)))

        b = self.list()
        self.assertEqual(b[:], [])

        b.extend(list(range(5)))
        self.assertEqual(b[:], list(range(5)))

        self.assertEqual(b[2], 2)
        self.assertEqual(b[2:10], [2,3,4])

        b *= 2
        self.assertEqual(b[:], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4])

        self.assertEqual(b + [5, 6], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4, 5, 6])

        # The operations on ``b`` must not have affected ``a``.
        self.assertEqual(a[:], list(range(10)))

        d = [a, b]
        e = self.list(d)
        self.assertEqual(
            e[:],
            [[0, 1, 2, 3, 4, 5, 6, 7, 8, 9], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4]]
            )

        # A change made to ``a`` after nesting it is visible through ``f``.
        f = self.list([a])
        a.append('hello')
        self.assertEqual(f[:], [[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 'hello']])

    def test_dict(self):
        """Check item assignment, copy(), keys(), values() and items()."""
        d = self.dict()
        indices = list(range(65, 70))
        for i in indices:
            d[i] = chr(i)
        self.assertEqual(d.copy(), {i: chr(i) for i in indices})
        self.assertEqual(sorted(d.keys()), indices)
        self.assertEqual(sorted(d.values()), [chr(i) for i in indices])
        self.assertEqual(sorted(d.items()), [(i, chr(i)) for i in indices])

    def test_namespace(self):
        """Check attribute set/delete and the string form of a Namespace."""
        n = self.Namespace()
        n.name = 'Bob'
        n.job = 'Builder'
        n._hidden = 'hidden'
        self.assertEqual((n.name, n.job), ('Bob', 'Builder'))
        del n.job
        # The expected string lists only ``name``: ``job`` was deleted and
        # ``_hidden`` is not shown.
        self.assertEqual(str(n), "Namespace(name='Bob')")
        self.assertTrue(hasattr(n, 'name'))
        self.assertFalse(hasattr(n, 'job'))
1143
1144#
1145#
1146#
1147
def sqr(x, wait=0.0):
    """Sleep for *wait* seconds, then return the square of *x*."""
    time.sleep(wait)
    squared = x * x
    return squared
Ask Solem2afcbf22010-11-09 20:55:52 +00001151
def mul(x, y):
    """Return the product of *x* and *y*."""
    product = x * y
    return product
1154
class _TestPool(BaseTestCase):
    """Tests for the pool API: apply, map, starmap, imap and friends."""

    def test_apply(self):
        """apply() accepts positional and keyword arguments."""
        papply = self.pool.apply
        self.assertEqual(papply(sqr, (5,)), sqr(5))
        self.assertEqual(papply(sqr, (), {'x':3}), sqr(x=3))

    def test_map(self):
        """map() matches the builtin map(), with and without chunksize."""
        pmap = self.pool.map
        self.assertEqual(pmap(sqr, list(range(10))), list(map(sqr, range(10))))
        self.assertEqual(pmap(sqr, list(range(100)), chunksize=20),
                         list(map(sqr, range(100))))

    def test_starmap(self):
        """starmap() matches itertools.starmap()."""
        psmap = self.pool.starmap
        tuples = list(zip(range(10), range(9,-1, -1)))
        self.assertEqual(psmap(mul, tuples),
                         list(itertools.starmap(mul, tuples)))
        tuples = list(zip(range(100), range(99,-1, -1)))
        self.assertEqual(psmap(mul, tuples, chunksize=20),
                         list(itertools.starmap(mul, tuples)))

    def test_starmap_async(self):
        """starmap_async().get() matches itertools.starmap()."""
        tuples = list(zip(range(100), range(99,-1, -1)))
        self.assertEqual(self.pool.starmap_async(mul, tuples).get(),
                         list(itertools.starmap(mul, tuples)))

    def test_map_chunksize(self):
        """map_async() with a chunksize must not stall on an empty list."""
        try:
            self.pool.map_async(sqr, [], chunksize=1).get(timeout=TIMEOUT1)
        except multiprocessing.TimeoutError:
            self.fail("pool.map_async with chunksize stalled on null list")

    def test_async(self):
        """apply_async().get() blocks until the task has finished."""
        res = self.pool.apply_async(sqr, (7, TIMEOUT1,))
        get = TimingWrapper(res.get)
        self.assertEqual(get(), 49)
        self.assertTimingAlmostEqual(get.elapsed, TIMEOUT1)

    def test_async_timeout(self):
        """get(timeout=...) raises TimeoutError while the task still runs."""
        res = self.pool.apply_async(sqr, (6, TIMEOUT2 + 0.2))
        get = TimingWrapper(res.get)
        self.assertRaises(multiprocessing.TimeoutError, get, timeout=TIMEOUT2)
        self.assertTimingAlmostEqual(get.elapsed, TIMEOUT2)

    def test_imap(self):
        """imap() yields results in order and then raises StopIteration."""
        it = self.pool.imap(sqr, list(range(10)))
        self.assertEqual(list(it), list(map(sqr, range(10))))

        it = self.pool.imap(sqr, list(range(10)))
        for i in range(10):
            self.assertEqual(next(it), i*i)
        self.assertRaises(StopIteration, it.__next__)

        it = self.pool.imap(sqr, list(range(1000)), chunksize=100)
        for i in range(1000):
            self.assertEqual(next(it), i*i)
        self.assertRaises(StopIteration, it.__next__)

    def test_imap_unordered(self):
        """imap_unordered() yields every result, in any order."""
        it = self.pool.imap_unordered(sqr, list(range(1000)))
        self.assertEqual(sorted(it), list(map(sqr, range(1000))))

        it = self.pool.imap_unordered(sqr, list(range(1000)), chunksize=53)
        self.assertEqual(sorted(it), list(map(sqr, range(1000))))

    def test_make_pool(self):
        """Pool() rejects non-positive sizes and honours a valid one."""
        self.assertRaises(ValueError, multiprocessing.Pool, -1)
        self.assertRaises(ValueError, multiprocessing.Pool, 0)

        p = multiprocessing.Pool(3)
        try:
            self.assertEqual(3, len(p._pool))
        finally:
            # Release the workers even when the assertion fails.
            p.close()
            p.join()

    def test_terminate(self):
        """terminate() must not wait for the queued tasks to finish."""
        if self.TYPE == 'manager':
            # On Unix a forked process increfs each shared object to
            # which its parent process held a reference.  If the
            # forked process gets terminated then there is likely to
            # be a reference leak.  So to prevent
            # _TestZZZNumberOfObjects from failing we skip this test
            # when using a manager.
            self.skipTest("terminating a manager pool may leak references")

        self.pool.map_async(
            time.sleep, [0.1 for i in range(10000)], chunksize=1
            )
        self.pool.terminate()
        join = TimingWrapper(self.pool.join)
        join()
        self.assertLess(join.elapsed, 0.5)
Jesse Noller1f0b6582010-01-27 03:36:01 +00001247
def raising():
    """Always raise KeyError("key"); used as a failing pool task."""
    error = KeyError("key")
    raise error
Jesse Noller1f0b6582010-01-27 03:36:01 +00001250
def unpickleable_result():
    """Return a lambda, i.e. a result that cannot be pickled."""
    result = lambda: 42
    return result
1253
class _TestPoolWorkerErrors(BaseTestCase):
    """Tests for how a pool reports errors raised by its tasks."""

    ALLOWED_TYPES = ('processes', )

    def test_async_error_callback(self):
        """error_callback receives the exception raised by the task."""
        p = multiprocessing.Pool(2)
        try:
            scratchpad = [None]
            def errback(exc):
                scratchpad[0] = exc

            res = p.apply_async(raising, error_callback=errback)
            self.assertRaises(KeyError, res.get)
            self.assertTrue(scratchpad[0])
            self.assertIsInstance(scratchpad[0], KeyError)
        finally:
            # Release the workers even when an assertion fails.
            p.close()
            p.join()

    def test_unpickleable_result(self):
        """A result that cannot be pickled surfaces as MaybeEncodingError."""
        from multiprocessing.pool import MaybeEncodingError
        p = multiprocessing.Pool(2)
        try:
            # Make sure we don't lose pool processes because of encoding
            # errors.
            for iteration in range(20):

                scratchpad = [None]
                def errback(exc):
                    scratchpad[0] = exc

                res = p.apply_async(unpickleable_result,
                                    error_callback=errback)
                self.assertRaises(MaybeEncodingError, res.get)
                wrapped = scratchpad[0]
                self.assertTrue(wrapped)
                self.assertIsInstance(scratchpad[0], MaybeEncodingError)
                self.assertIsNotNone(wrapped.exc)
                self.assertIsNotNone(wrapped.value)
        finally:
            # Release the workers even when an assertion fails.
            p.close()
            p.join()
1293
class _TestPoolWorkerLifetime(BaseTestCase):
    """Tests for pools created with maxtasksperchild."""

    ALLOWED_TYPES = ('processes', )

    def test_pool_worker_lifetime(self):
        """Workers are replaced after reaching their task limit."""
        p = multiprocessing.Pool(3, maxtasksperchild=10)
        try:
            self.assertEqual(3, len(p._pool))
            origworkerpids = [w.pid for w in p._pool]
            # Run many tasks so each worker gets replaced (hopefully)
            results = [p.apply_async(sqr, (i, )) for i in range(100)]
            # Fetch the results and verify we got the right answers,
            # also ensuring all the tasks have completed.
            for (j, res) in enumerate(results):
                self.assertEqual(res.get(), sqr(j))
            # Refill the pool
            p._repopulate_pool()
            # Wait until all workers are alive
            # (countdown * DELTA = 5 seconds max startup process time)
            countdown = 50
            while countdown and not all(w.is_alive() for w in p._pool):
                countdown -= 1
                time.sleep(DELTA)
            finalworkerpids = [w.pid for w in p._pool]
            # All pids should be assigned.  See issue #7805.
            self.assertNotIn(None, origworkerpids)
            self.assertNotIn(None, finalworkerpids)
            # Finally, check that the worker pids have changed
            self.assertNotEqual(sorted(origworkerpids),
                                sorted(finalworkerpids))
        finally:
            # Release the workers even when an assertion fails.
            p.close()
            p.join()

    def test_pool_worker_lifetime_early_close(self):
        """close() followed by join() must not hang with pending tasks."""
        # Issue #10332: closing a pool whose workers have limited lifetimes
        # before all the tasks completed would make join() hang.
        p = multiprocessing.Pool(3, maxtasksperchild=1)
        results = [p.apply_async(sqr, (i, 0.3)) for i in range(6)]
        p.close()
        p.join()
        # check the results
        for (j, res) in enumerate(results):
            self.assertEqual(res.get(), sqr(j))
1338
1339
Benjamin Petersone711caf2008-06-11 16:44:04 +00001340#
1341# Test that manager has expected number of shared objects left
1342#
1343
class _TestZZZNumberOfObjects(BaseTestCase):
    """Check the manager is left with the expected number of objects."""

    # Because test cases are sorted alphabetically, this one will get
    # run after all the other tests for the manager.  It tests that
    # there have been no "reference leaks" for the manager's shared
    # objects.  Note the comment in _TestPool.test_terminate().
    ALLOWED_TYPES = ('manager',)

    def test_number_of_objects(self):
        """Compare the manager's object count with the expected count."""
        EXPECTED_NUMBER = 1                # the pool object is still alive
        multiprocessing.active_children()  # discard dead process objs
        gc.collect()                       # do garbage collection
        refs = self.manager._number_of_objects()
        # Captured right after the count so both describe the same state.
        debug_info = self.manager._debug_info()
        if refs != EXPECTED_NUMBER:
            # Print the snapshot once to help diagnose the mismatch.
            print(debug_info)

        self.assertEqual(refs, EXPECTED_NUMBER)
1362
1363#
1364# Test of creating a customized manager class
1365#
1366
1367from multiprocessing.managers import BaseManager, BaseProxy, RemoteError
1368
class FooBar(object):
    """Plain class whose methods are served through MyManager."""

    def f(self):
        """Return the string 'f()'."""
        return 'f()'

    def g(self):
        """Always raise ValueError."""
        raise ValueError

    def _h(self):
        """Return the string '_h()'."""
        return '_h()'
1376
def baz():
    """Generate the squares of 0 through 9."""
    for number in range(10):
        square = number * number
        yield square
1380
class IteratorProxy(BaseProxy):
    """Proxy that can be iterated; each step calls the referent's __next__."""

    _exposed_ = ('__next__',)

    def __iter__(self):
        """Return the proxy itself."""
        return self

    def __next__(self):
        """Return the result of calling '__next__' through _callmethod()."""
        item = self._callmethod('__next__')
        return item
1387
class MyManager(BaseManager):
    '''manager class with the custom typeids registered below'''

# 'Foo' and 'Bar' both create FooBar objects; 'Bar' passes an explicit
# ``exposed`` tuple, and 'baz' uses IteratorProxy as its proxy type.
MyManager.register('Foo', callable=FooBar)
MyManager.register('Bar', callable=FooBar, exposed=('f', '_h'))
MyManager.register('baz', callable=baz, proxytype=IteratorProxy)
1394
1395
class _TestMyManager(BaseTestCase):
    """Tests for the customized manager class MyManager."""

    ALLOWED_TYPES = ('manager',)

    def test_mymanager(self):
        """Check which methods each registered typeid exposes."""
        manager = MyManager()
        manager.start()
        try:
            foo = manager.Foo()
            bar = manager.Bar()
            baz = manager.baz()

            foo_methods = [name for name in ('f', 'g', '_h')
                           if hasattr(foo, name)]
            bar_methods = [name for name in ('f', 'g', '_h')
                           if hasattr(bar, name)]

            self.assertEqual(foo_methods, ['f', 'g'])
            self.assertEqual(bar_methods, ['f', '_h'])

            self.assertEqual(foo.f(), 'f()')
            self.assertRaises(ValueError, foo.g)
            self.assertEqual(foo._callmethod('f'), 'f()')
            # '_h' is not available on Foo proxies, so calling it by name
            # is expected to fail with RemoteError.
            self.assertRaises(RemoteError, foo._callmethod, '_h')

            self.assertEqual(bar.f(), 'f()')
            self.assertEqual(bar._h(), '_h()')
            self.assertEqual(bar._callmethod('f'), 'f()')
            self.assertEqual(bar._callmethod('_h'), '_h()')

            self.assertEqual(list(baz), [i*i for i in range(10)])
        finally:
            # Stop the manager process even when an assertion fails.
            manager.shutdown()
1427
1428#
1429# Test of connecting to a remote server and using xmlrpclib for serialization
1430#
1431
# Single module-level queue handed out by get_queue().
_queue = pyqueue.Queue()

def get_queue():
    """Return the module-level queue (the same object on every call)."""
    return _queue
1435
class QueueManager(BaseManager):
    '''manager class used by server process'''

# Registered with a callable, so this manager can create the queue itself.
QueueManager.register('get_queue', callable=get_queue)
1439
class QueueManager2(BaseManager):
    '''manager class which specifies the same interface as QueueManager'''

# Registered by name only, with no callable.
QueueManager2.register('get_queue')
1443
1444
# Serializer name passed to the QueueManager / QueueManager2 instances created
# by the tests below.
SERIALIZER = 'xmlrpclib'
1446
class _TestRemoteManager(BaseTestCase):
    """Test connecting to a running manager from other manager objects."""

    ALLOWED_TYPES = ('manager',)

    @classmethod
    def _putter(cls, address, authkey):
        """Child target: connect to the server and put one tuple on its queue."""
        client = QueueManager2(
            address=address, authkey=authkey, serializer=SERIALIZER
            )
        client.connect()
        remote_queue = client.get_queue()
        remote_queue.put(('hello world', None, True, 2.25))

    def test_remote(self):
        """Data put by a child is read back through a second client."""
        authkey = os.urandom(32)

        manager = QueueManager(
            address=('localhost', 0), authkey=authkey, serializer=SERIALIZER
            )
        manager.start()

        putter_args = (manager.address, authkey)
        p = self.Process(target=self._putter, args=putter_args)
        p.daemon = True
        p.start()

        manager2 = QueueManager2(
            address=manager.address, authkey=authkey, serializer=SERIALIZER
            )
        manager2.connect()
        queue = manager2.get_queue()

        # Note that xmlrpclib will deserialize object as a list not a tuple
        received = queue.get()
        self.assertEqual(received, ['hello world', None, True, 2.25])

        # Because we are using xmlrpclib for serialization instead of
        # pickle this will cause a serialization error.
        self.assertRaises(Exception, queue.put, time.sleep)

        # Make queue finalizer run before the server is stopped
        del queue
        manager.shutdown()
1488
class _TestManagerRestart(BaseTestCase):
    """Test restarting a manager on the address it just released."""

    @classmethod
    def _putter(cls, address, authkey):
        """Child target: connect to the manager and put one string on its queue."""
        manager = QueueManager(
            address=address, authkey=authkey, serializer=SERIALIZER)
        manager.connect()
        queue = manager.get_queue()
        queue.put('hello world')

    def test_rapid_restart(self):
        """Shut a manager down and start a new one on the same address."""
        authkey = os.urandom(32)
        manager = QueueManager(
            address=('localhost', 0), authkey=authkey, serializer=SERIALIZER)
        srvr = manager.get_server()
        addr = srvr.address
        # Close the connection.Listener socket which gets opened as a part
        # of manager.get_server(). It's not needed for the test.
        srvr.listener.close()
        manager.start()

        p = self.Process(target=self._putter, args=(manager.address, authkey))
        p.daemon = True
        p.start()
        queue = manager.get_queue()
        self.assertEqual(queue.get(), 'hello world')
        # The child has delivered its message; reap it before the manager
        # it is connected to goes away.
        p.join()
        del queue
        manager.shutdown()
        manager = QueueManager(
            address=addr, authkey=authkey, serializer=SERIALIZER)
        try:
            manager.start()
        except IOError as e:
            if e.errno != errno.EADDRINUSE:
                raise
            # Retry after some time, in case the old socket was lingering
            # (sporadic failure on buildbots)
            time.sleep(1.0)
            manager = QueueManager(
                address=addr, authkey=authkey, serializer=SERIALIZER)
            # The replacement manager must be started as well, otherwise
            # the retry tests nothing and there is no running manager for
            # shutdown() below to stop.
            manager.start()
        manager.shutdown()
Jesse Nollerc5d28a02009-03-30 16:37:36 +00001530
Benjamin Petersone711caf2008-06-11 16:44:04 +00001531#
1532#
1533#
1534
# Message that makes _TestConnection._echo stop echoing and close its end.
SENTINEL = latin('')
1536
class _TestConnection(BaseTestCase):
    """Tests for Pipe() connections and for passing handles over them."""

    ALLOWED_TYPES = ('processes', 'threads')

    @classmethod
    def _echo(cls, conn):
        """Child target: echo every message until SENTINEL, then close."""
        for msg in iter(conn.recv_bytes, SENTINEL):
            conn.send_bytes(msg)
        conn.close()

    def test_connection(self):
        """Round-trip objects and bytes through an echoing child."""
        conn, child_conn = self.Pipe()

        p = self.Process(target=self._echo, args=(child_conn,))
        p.daemon = True
        p.start()

        seq = [1, 2.25, None]
        msg = latin('hello world')
        longmsg = msg * 10
        arr = array.array('i', list(range(4)))

        if self.TYPE == 'processes':
            self.assertEqual(type(conn.fileno()), int)

        self.assertEqual(conn.send(seq), None)
        self.assertEqual(conn.recv(), seq)

        self.assertEqual(conn.send_bytes(msg), None)
        self.assertEqual(conn.recv_bytes(), msg)

        if self.TYPE == 'processes':
            # Receive into the start of a zeroed buffer.
            buffer = array.array('i', [0]*10)
            expected = list(arr) + [0] * (10 - len(arr))
            self.assertEqual(conn.send_bytes(arr), None)
            self.assertEqual(conn.recv_bytes_into(buffer),
                             len(arr) * buffer.itemsize)
            self.assertEqual(list(buffer), expected)

            # Receive into the buffer at an offset of three items.
            buffer = array.array('i', [0]*10)
            expected = [0] * 3 + list(arr) + [0] * (10 - 3 - len(arr))
            self.assertEqual(conn.send_bytes(arr), None)
            self.assertEqual(conn.recv_bytes_into(buffer, 3 * buffer.itemsize),
                             len(arr) * buffer.itemsize)
            self.assertEqual(list(buffer), expected)

            # A message longer than the buffer must raise BufferTooShort,
            # carrying the complete message in its args.
            buffer = bytearray(latin(' ' * 40))
            self.assertEqual(conn.send_bytes(longmsg), None)
            try:
                res = conn.recv_bytes_into(buffer)
            except multiprocessing.BufferTooShort as e:
                self.assertEqual(e.args, (longmsg,))
            else:
                self.fail('expected BufferTooShort, got %s' % res)

        poll = TimingWrapper(conn.poll)

        self.assertEqual(poll(), False)
        self.assertTimingAlmostEqual(poll.elapsed, 0)

        self.assertEqual(poll(TIMEOUT1), False)
        self.assertTimingAlmostEqual(poll.elapsed, TIMEOUT1)

        conn.send(None)

        self.assertEqual(poll(TIMEOUT1), True)
        self.assertTimingAlmostEqual(poll.elapsed, 0)

        self.assertEqual(conn.recv(), None)

        really_big_msg = latin('X') * (1024 * 1024 * 16)   # 16Mb
        conn.send_bytes(really_big_msg)
        self.assertEqual(conn.recv_bytes(), really_big_msg)

        conn.send_bytes(SENTINEL)                          # tell child to quit
        child_conn.close()

        if self.TYPE == 'processes':
            self.assertEqual(conn.readable, True)
            self.assertEqual(conn.writable, True)
            self.assertRaises(EOFError, conn.recv)
            self.assertRaises(EOFError, conn.recv_bytes)

        p.join()

    def test_duplex_false(self):
        """A one-way pipe has one read-only and one write-only end."""
        reader, writer = self.Pipe(duplex=False)
        self.assertEqual(writer.send(1), None)
        self.assertEqual(reader.recv(), 1)
        if self.TYPE == 'processes':
            self.assertEqual(reader.readable, True)
            self.assertEqual(reader.writable, False)
            self.assertEqual(writer.readable, False)
            self.assertEqual(writer.writable, True)
            self.assertRaises(IOError, reader.send, 2)
            self.assertRaises(IOError, writer.recv)
            self.assertRaises(IOError, writer.poll)

    def test_spawn_close(self):
        """The parent may close its copy of child_conn right after start()."""
        # We test that a pipe connection can be closed by parent
        # process immediately after child is spawned.  On Windows this
        # would have sometimes failed on old versions because
        # child_conn would be closed before the child got a chance to
        # duplicate it.
        conn, child_conn = self.Pipe()

        p = self.Process(target=self._echo, args=(child_conn,))
        p.daemon = True
        p.start()
        child_conn.close()    # this might complete before child initializes

        msg = latin('hello')
        conn.send_bytes(msg)
        self.assertEqual(conn.recv_bytes(), msg)

        conn.send_bytes(SENTINEL)
        conn.close()
        p.join()

    def test_sendbytes(self):
        """Check the offset and size arguments of send_bytes()."""
        if self.TYPE != 'processes':
            self.skipTest("only makes sense with processes")

        msg = latin('abcdefghijklmnopqrstuvwxyz')
        a, b = self.Pipe()

        a.send_bytes(msg)
        self.assertEqual(b.recv_bytes(), msg)

        a.send_bytes(msg, 5)
        self.assertEqual(b.recv_bytes(), msg[5:])

        a.send_bytes(msg, 7, 8)
        self.assertEqual(b.recv_bytes(), msg[7:7+8])

        # An offset equal to the message length sends an empty message.
        a.send_bytes(msg, 26)
        self.assertEqual(b.recv_bytes(), latin(''))

        a.send_bytes(msg, 26, 0)
        self.assertEqual(b.recv_bytes(), latin(''))

        # Offsets or sizes outside the message are rejected.
        self.assertRaises(ValueError, a.send_bytes, msg, 27)

        self.assertRaises(ValueError, a.send_bytes, msg, 22, 5)

        self.assertRaises(ValueError, a.send_bytes, msg, 26, 1)

        self.assertRaises(ValueError, a.send_bytes, msg, -1)

        self.assertRaises(ValueError, a.send_bytes, msg, 4, -1)

    @classmethod
    def _is_fd_assigned(cls, fd):
        """Return True if os.fstat(fd) succeeds, False if it fails with EBADF."""
        try:
            os.fstat(fd)
        except OSError as e:
            if e.errno == errno.EBADF:
                return False
            raise
        else:
            return True

    @classmethod
    def _writefd(cls, conn, data, create_dummy_fds=False):
        """Child target: receive a handle over *conn* and write *data* to it."""
        if create_dummy_fds:
            # Occupy every unassigned descriptor below 256.
            for i in range(0, 256):
                if not cls._is_fd_assigned(i):
                    os.dup2(conn.fileno(), i)
        fd = reduction.recv_handle(conn)
        if msvcrt:
            fd = msvcrt.open_osfhandle(fd, os.O_WRONLY)
        os.write(fd, data)
        os.close(fd)

    @unittest.skipUnless(HAS_REDUCTION, "test needs multiprocessing.reduction")
    def test_fd_transfer(self):
        """A file handle sent to a child can be written to by that child."""
        if self.TYPE != 'processes':
            self.skipTest("only makes sense with processes")
        conn, child_conn = self.Pipe(duplex=True)

        p = self.Process(target=self._writefd, args=(child_conn, b"foo"))
        p.daemon = True
        p.start()
        self.addCleanup(test.support.unlink, test.support.TESTFN)
        with open(test.support.TESTFN, "wb") as f:
            fd = f.fileno()
            if msvcrt:
                fd = msvcrt.get_osfhandle(fd)
            reduction.send_handle(conn, fd, p.pid)
        p.join()
        with open(test.support.TESTFN, "rb") as f:
            self.assertEqual(f.read(), b"foo")

    @unittest.skipUnless(HAS_REDUCTION, "test needs multiprocessing.reduction")
    @unittest.skipIf(sys.platform == "win32",
                     "test semantics don't make sense on Windows")
    @unittest.skipIf(MAXFD <= 256,
                     "largest assignable fd number is too small")
    @unittest.skipUnless(hasattr(os, "dup2"),
                         "test needs os.dup2()")
    def test_large_fd_transfer(self):
        """Same as test_fd_transfer, with a descriptor number of 256 or more."""
        # With fd > 256 (issue #11657)
        if self.TYPE != 'processes':
            self.skipTest("only makes sense with processes")
        conn, child_conn = self.Pipe(duplex=True)

        p = self.Process(target=self._writefd, args=(child_conn, b"bar", True))
        p.daemon = True
        p.start()
        self.addCleanup(test.support.unlink, test.support.TESTFN)
        with open(test.support.TESTFN, "wb") as f:
            fd = f.fileno()
            # Pick the first unassigned descriptor number in [256, MAXFD).
            for newfd in range(256, MAXFD):
                if not self._is_fd_assigned(newfd):
                    break
            else:
                self.fail("could not find an unassigned large file descriptor")
            os.dup2(fd, newfd)
            try:
                reduction.send_handle(conn, newfd, p.pid)
            finally:
                os.close(newfd)
        p.join()
        with open(test.support.TESTFN, "rb") as f:
            self.assertEqual(f.read(), b"bar")

    @classmethod
    def _send_data_without_fd(cls, conn):
        """Child target: write one zero byte straight to the connection's fd."""
        os.write(conn.fileno(), b"\0")

    @unittest.skipUnless(HAS_REDUCTION, "test needs multiprocessing.reduction")
    @unittest.skipIf(sys.platform == "win32", "doesn't make sense on Windows")
    def test_missing_fd_transfer(self):
        """recv_handle() raises RuntimeError when no descriptor was sent."""
        # Check that exception is raised when received data is not
        # accompanied by a file descriptor in ancillary data.
        if self.TYPE != 'processes':
            self.skipTest("only makes sense with processes")
        conn, child_conn = self.Pipe(duplex=True)

        p = self.Process(target=self._send_data_without_fd, args=(child_conn,))
        p.daemon = True
        p.start()
        self.assertRaises(RuntimeError, reduction.recv_handle, conn)
        p.join()
Antoine Pitroubcb39d42011-08-23 19:46:22 +02001781
class _TestListenerClient(BaseTestCase):
    """Test a Listener accepting a Client for every connection family."""

    ALLOWED_TYPES = ('processes', 'threads')

    @classmethod
    def _test(cls, address):
        """Child target: connect to *address*, send 'hello' and close."""
        client = cls.connection.Client(address)
        client.send('hello')
        client.close()

    def test_listener_client(self):
        """Each family's listener receives 'hello' from its client."""
        for family in self.connection.families:
            listener = self.connection.Listener(family=family)
            child = self.Process(target=self._test, args=(listener.address,))
            child.daemon = True
            child.start()
            accepted = listener.accept()
            self.assertEqual(accepted.recv(), 'hello')
            child.join()
            listener.close()
Benjamin Petersone711caf2008-06-11 16:44:04 +00001802#
1803# Test of sending connection and socket objects between processes
1804#
# NOTE: the test class below is disabled -- it is wrapped in a bare string
# literal, so none of it is executed.
"""
class _TestPicklingConnections(BaseTestCase):

    ALLOWED_TYPES = ('processes',)

    def _listener(self, conn, families):
        for fam in families:
            l = self.connection.Listener(family=fam)
            conn.send(l.address)
            new_conn = l.accept()
            conn.send(new_conn)

        if self.TYPE == 'processes':
            l = socket.socket()
            l.bind(('localhost', 0))
            conn.send(l.getsockname())
            l.listen(1)
            new_conn, addr = l.accept()
            conn.send(new_conn)

        conn.recv()

    def _remote(self, conn):
        for (address, msg) in iter(conn.recv, None):
            client = self.connection.Client(address)
            client.send(msg.upper())
            client.close()

        if self.TYPE == 'processes':
            address, msg = conn.recv()
            client = socket.socket()
            client.connect(address)
            client.sendall(msg.upper())
            client.close()

        conn.close()

    def test_pickling(self):
        try:
            multiprocessing.allow_connection_pickling()
        except ImportError:
            return

        families = self.connection.families

        lconn, lconn0 = self.Pipe()
        lp = self.Process(target=self._listener, args=(lconn0, families))
        lp.daemon = True
        lp.start()
        lconn0.close()

        rconn, rconn0 = self.Pipe()
        rp = self.Process(target=self._remote, args=(rconn0,))
        rp.daemon = True
        rp.start()
        rconn0.close()

        for fam in families:
            msg = ('This connection uses family %s' % fam).encode('ascii')
            address = lconn.recv()
            rconn.send((address, msg))
            new_conn = lconn.recv()
            self.assertEqual(new_conn.recv(), msg.upper())

        rconn.send(None)

        if self.TYPE == 'processes':
            msg = latin('This connection uses a normal socket')
            address = lconn.recv()
            rconn.send((address, msg))
            if hasattr(socket, 'fromfd'):
                new_conn = lconn.recv()
                self.assertEqual(new_conn.recv(100), msg.upper())
            else:
                # XXX On Windows with Py2.6 need to backport fromfd()
                discard = lconn.recv_bytes()

        lconn.send(None)

        rconn.close()
        lconn.close()

        lp.join()
        rp.join()
"""
Benjamin Petersone711caf2008-06-11 16:44:04 +00001890#
1891#
1892#
1893
class _TestHeap(BaseTestCase):
    # Exercises multiprocessing.heap through BufferWrapper allocations.

    ALLOWED_TYPES = ('processes',)

    def test_heap(self):
        iterations = 5000
        maxblocks = 50
        blocks = []

        # create and destroy lots of blocks of different sizes
        for _ in range(iterations):
            size = int(random.lognormvariate(0, 1) * 1000)
            blocks.append(multiprocessing.heap.BufferWrapper(size))
            if len(blocks) > maxblocks:
                # drop a random block; uses its own name instead of
                # clobbering the loop variable
                victim = random.randrange(maxblocks)
                del blocks[victim]

        # get the heap object
        heap = multiprocessing.heap.BufferWrapper._heap

        # verify the state of the heap; ``segments`` replaces a local that
        # used to shadow the builtin all()
        segments = []
        heap._lock.acquire()
        self.addCleanup(heap._lock.release)
        for free_list in list(heap._len_to_seq.values()):
            for arena, start, stop in free_list:
                segments.append((heap._arenas.index(arena), start, stop,
                                 stop-start, 'free'))
        for arena, start, stop in heap._allocated_blocks:
            segments.append((heap._arenas.index(arena), start, stop,
                             stop-start, 'occupied'))

        segments.sort()

        # every segment must either be followed by one that starts where it
        # stops, or by the first segment (offset 0) of another arena
        for current, following in zip(segments, segments[1:]):
            (arena, start, stop) = current[:3]
            (narena, nstart, nstop) = following[:3]
            self.assertTrue((arena != narena and nstart == 0) or
                            (stop == nstart))

    def test_free_from_gc(self):
        # Check that freeing of blocks by the garbage collector doesn't deadlock
        # (issue #12352).
        # Make sure the GC is enabled, and set lower collection thresholds to
        # make collections more frequent (and increase the probability of
        # deadlock).
        if not gc.isenabled():
            gc.enable()
            self.addCleanup(gc.disable)
        thresholds = gc.get_threshold()
        self.addCleanup(gc.set_threshold, *thresholds)
        gc.set_threshold(10)

        # perform numerous block allocations, with cyclic references to make
        # sure objects are collected asynchronously by the gc
        for _ in range(5000):
            a = multiprocessing.heap.BufferWrapper(1)
            b = multiprocessing.heap.BufferWrapper(1)
            # circular references
            a.buddy = b
            b.buddy = a
1958
Benjamin Petersone711caf2008-06-11 16:44:04 +00001959#
1960#
1961#
1962
class _Foo(Structure):
    # Two-field structure: ``x`` is a c_int, ``y`` is a c_double.
    _fields_ = [('x', c_int), ('y', c_double)]
1968
class _TestSharedCTypes(BaseTestCase):
    # Shared ctypes objects must carry modifications made by a child
    # process back to the parent.

    ALLOWED_TYPES = ('processes',)

    def setUp(self):
        if not HAS_SHAREDCTYPES:
            self.skipTest("requires multiprocessing.sharedctypes")

    @classmethod
    def _double(cls, x, y, foo, arr, string):
        # Runs in the child: double every shared object in place.
        x.value *= 2
        y.value *= 2
        foo.x *= 2
        foo.y *= 2
        string.value *= 2
        for index in range(len(arr)):
            arr[index] *= 2

    def test_sharedctypes(self, lock=False):
        x = Value('i', 7, lock=lock)
        y = Value(c_double, 1.0/3.0, lock=lock)
        foo = Value(_Foo, 3, 2, lock=lock)
        arr = self.Array('d', list(range(10)), lock=lock)
        string = self.Array('c', 20, lock=lock)
        string.value = latin('hello')

        shared = (x, y, foo, arr, string)
        worker = self.Process(target=self._double, args=shared)
        worker.daemon = True
        worker.start()
        worker.join()

        # every value must now be twice what the parent stored
        self.assertEqual(x.value, 14)
        self.assertAlmostEqual(y.value, 2.0/3.0)
        self.assertEqual(foo.x, 6)
        self.assertAlmostEqual(foo.y, 4.0)
        for index in range(10):
            self.assertAlmostEqual(arr[index], index*2)
        self.assertEqual(string.value, latin('hellohello'))

    def test_synchronize(self):
        # same scenario, with lock=True
        self.test_sharedctypes(lock=True)

    def test_copy(self):
        # a copy must not follow later changes to the original
        original = _Foo(2, 5.0)
        duplicate = copy(original)
        original.x = 0
        original.y = 0
        self.assertEqual(duplicate.x, 2)
        self.assertAlmostEqual(duplicate.y, 5.0)
2018
2019#
2020#
2021#
2022
class _TestFinalize(BaseTestCase):
    # Checks which util.Finalize callbacks run, and in what order.

    ALLOWED_TYPES = ('processes',)

    @classmethod
    def _test_finalize(cls, conn):
        # Runs in the child.  Every finalizer reports its tag over *conn*;
        # the registration order below is significant.
        class Foo(object):
            pass

        def watch(obj, tag, **options):
            return util.Finalize(obj, conn.send, args=(tag,), **options)

        dropped = Foo()
        watch(dropped, 'a')
        del dropped         # triggers callback for a

        closed = Foo()
        close_b = watch(closed, 'b')
        close_b()           # triggers callback for b
        close_b()           # does nothing because callback has already been called
        del closed          # does nothing because callback has already been called

        no_priority = Foo()
        watch(no_priority, 'c')

        prio_one = Foo()
        watch(prio_one, 'd10', exitpriority=1)

        prio_zero_1 = Foo()
        watch(prio_zero_1, 'd01', exitpriority=0)
        prio_zero_2 = Foo()
        watch(prio_zero_2, 'd02', exitpriority=0)
        prio_zero_3 = Foo()
        watch(prio_zero_3, 'd03', exitpriority=0)

        watch(None, 'e', exitpriority=-10)

        watch(None, 'STOP', exitpriority=-100)

        # call multiprocessing's cleanup function then exit process without
        # garbage collecting locals
        util._exit_function()
        conn.close()
        os._exit(0)

    def test_finalize(self):
        parent_conn, child_conn = self.Pipe()

        worker = self.Process(target=self._test_finalize, args=(child_conn,))
        worker.daemon = True
        worker.start()
        worker.join()

        # drain everything the child reported, up to the 'STOP' sentinel
        received = list(iter(parent_conn.recv, 'STOP'))
        self.assertEqual(received,
                         ['a', 'b', 'd10', 'd03', 'd02', 'd01', 'e'])
2075
2076#
2077# Test that from ... import * works for each module
2078#
2079
class _TestImportStar(BaseTestCase):
    # Every name a multiprocessing module lists in __all__ must exist.

    ALLOWED_TYPES = ('processes',)

    def test_import(self):
        modules = [
            'multiprocessing',
            'multiprocessing.connection',
            'multiprocessing.heap',
            'multiprocessing.managers',
            'multiprocessing.pool',
            'multiprocessing.process',
            'multiprocessing.synchronize',
            'multiprocessing.util',
        ]

        if HAS_REDUCTION:
            modules.append('multiprocessing.reduction')

        if c_int is not None:
            # This module requires _ctypes
            modules.append('multiprocessing.sharedctypes')

        for name in modules:
            __import__(name)
            mod = sys.modules[name]
            exported = getattr(mod, '__all__', ())

            for attr in exported:
                failure = '%r does not have attribute %r' % (mod, attr)
                self.assertTrue(hasattr(mod, attr), failure)
2108
2109#
2110# Quick test that logging works -- does not test logging output
2111#
2112
class _TestLogging(BaseTestCase):
    # Quick checks of multiprocessing's logger; log output is not inspected.

    ALLOWED_TYPES = ('processes',)

    def test_enable_logging(self):
        logger = multiprocessing.get_logger()
        # check the logger before using it, not after
        self.assertIsNotNone(logger)
        # restore the level even if one of the calls below raises
        self.addCleanup(logger.setLevel, LOG_LEVEL)
        logger.setLevel(util.SUBWARNING)
        logger.debug('this will not be printed')
        logger.info('nor will this')

    @classmethod
    def _test_level(cls, conn):
        # Runs in the child: report the logger's effective level.
        logger = multiprocessing.get_logger()
        conn.send(logger.getEffectiveLevel())

    def test_level(self):
        LEVEL1 = 32
        LEVEL2 = 37

        logger = multiprocessing.get_logger()
        root_logger = logging.getLogger()
        root_level = root_logger.level

        # Registered up front so that a failing assertion cannot leave
        # modified levels behind for the remaining tests.
        self.addCleanup(logger.setLevel, level=LOG_LEVEL)
        self.addCleanup(root_logger.setLevel, root_level)

        reader, writer = multiprocessing.Pipe(duplex=False)
        self.addCleanup(reader.close)
        self.addCleanup(writer.close)

        def level_seen_by_child():
            # Start a child, read the level it reports, and always wait
            # for it to finish.
            p = self.Process(target=self._test_level, args=(writer,))
            p.daemon = True
            p.start()
            try:
                return reader.recv()
            finally:
                p.join()

        logger.setLevel(LEVEL1)
        self.assertEqual(LEVEL1, level_seen_by_child())

        logger.setLevel(logging.NOTSET)
        root_logger.setLevel(LEVEL2)
        self.assertEqual(LEVEL2, level_seen_by_child())
2155
Jesse Nollerb9a49b72009-11-21 18:09:38 +00002156
Jesse Nollerf4ae35f2009-11-24 14:22:24 +00002157# class _TestLoggingProcessName(BaseTestCase):
2158#
2159# def handle(self, record):
2160# assert record.processName == multiprocessing.current_process().name
2161# self.__handled = True
2162#
2163# def test_logging(self):
2164# handler = logging.Handler()
2165# handler.handle = self.handle
2166# self.__handled = False
2167# # Bypass getLogger() and side-effects
2168# logger = logging.getLoggerClass()(
2169# 'multiprocessing.test.TestLoggingProcessName')
2170# logger.addHandler(handler)
2171# logger.propagate = False
2172#
2173# logger.warn('foo')
2174# assert self.__handled
Jesse Nollerb9a49b72009-11-21 18:09:38 +00002175
Benjamin Petersone711caf2008-06-11 16:44:04 +00002176#
Jesse Noller6214edd2009-01-19 16:23:53 +00002177# Test to verify handle verification, see issue 3321
2178#
2179
class TestInvalidHandle(unittest.TestCase):
    # Connection objects built on invalid handles must be rejected.

    @unittest.skipIf(WIN32, "skipped on Windows")
    def test_invalid_handles(self):
        bogus = multiprocessing.connection.Connection(44977608)
        try:
            with self.assertRaises((ValueError, IOError)):
                bogus.poll()
        finally:
            # Hack private attribute _handle to avoid printing an error
            # in conn.__del__
            bogus._handle = None
        with self.assertRaises((ValueError, IOError)):
            multiprocessing.connection.Connection(-1)
Florent Xiclunafd1b0932010-03-28 00:25:02 +00002193
Jesse Noller6214edd2009-01-19 16:23:53 +00002194#
Benjamin Petersone711caf2008-06-11 16:44:04 +00002195# Functions used to create test cases from the base ones in this module
2196#
2197
def get_attributes(Source, names):
    """Return a dict mapping each entry of *names* to ``getattr(Source, name)``.

    Plain Python functions are wrapped in ``staticmethod``; every other
    object is stored unchanged.  A missing attribute raises AttributeError.
    """
    function_type = type(get_attributes)
    attributes = {}
    for name in names:
        obj = getattr(Source, name)
        if isinstance(obj, function_type):
            # The mixins splice this dict into a class body; unwrapped, a
            # plain function there would be bound as a method on lookup.
            obj = staticmethod(obj)
        attributes[name] = obj
    return attributes
2206
def create_test_cases(Mixin, type):
    """Build concrete test case classes from the ``_Test*`` templates.

    Every module global whose name starts with ``_Test`` and whose
    ``ALLOWED_TYPES`` contains *type* yields a class deriving from the
    template, ``unittest.TestCase`` and *Mixin*.  It is named
    ``'With' + type.capitalize() + <template name without the leading
    underscore>`` and takes its ``__module__`` from *Mixin*.

    Returns a dict mapping each new class name to the class.
    """
    result = {}
    glob = globals()
    Type = type.capitalize()

    for name in list(glob):
        if not name.startswith('_Test'):
            continue
        base = glob[name]
        # Helper functions may share the ``_Test`` prefix (this file has
        # one, ``_TestProcess``); globals that do not declare
        # ALLOWED_TYPES are skipped instead of raising AttributeError.
        if type not in getattr(base, 'ALLOWED_TYPES', ()):
            continue
        newname = 'With' + Type + name[1:]
        class Temp(base, unittest.TestCase, Mixin):
            pass
        Temp.__name__ = newname
        Temp.__module__ = Mixin.__module__
        result[newname] = Temp
    return result
2223
2224#
2225# Create test cases
2226#
2227
class ProcessesMixin(object):
    # Binds the test templates to the process-based multiprocessing API.
    TYPE = 'processes'
    Process = multiprocessing.Process
    # inject the listed multiprocessing attributes into the class namespace
    locals().update(get_attributes(
        multiprocessing,
        (
            'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
            'Condition', 'Event', 'Value', 'Array', 'RawValue',
            'RawArray', 'current_process', 'active_children', 'Pipe',
            'connection', 'JoinableQueue'
        ),
    ))

# generate the process-flavoured test cases and expose them at module level
testcases_processes = create_test_cases(ProcessesMixin, type='processes')
globals().update(testcases_processes)
2240
2241
class ManagerMixin(object):
    # Binds the test templates to objects served by a SyncManager.
    TYPE = 'manager'
    Process = multiprocessing.Process
    # allocated without running __init__; test_main() initialises and
    # starts it before the suite runs
    manager = object.__new__(multiprocessing.managers.SyncManager)
    # inject the listed manager attributes into the class namespace
    locals().update(get_attributes(
        manager,
        (
            'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
            'Condition', 'Event', 'Value', 'Array', 'list', 'dict',
            'Namespace', 'JoinableQueue'
        ),
    ))

# generate the manager-flavoured test cases and expose them at module level
testcases_manager = create_test_cases(ManagerMixin, type='manager')
globals().update(testcases_manager)
2254
2255
class ThreadsMixin(object):
    # Binds the test templates to the thread-backed multiprocessing.dummy API.
    TYPE = 'threads'
    Process = multiprocessing.dummy.Process
    # inject the listed multiprocessing.dummy attributes into the class
    # namespace
    locals().update(get_attributes(
        multiprocessing.dummy,
        (
            'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
            'Condition', 'Event', 'Value', 'Array', 'current_process',
            'active_children', 'Pipe', 'connection', 'dict', 'list',
            'Namespace', 'JoinableQueue'
        ),
    ))

# generate the thread-flavoured test cases and expose them at module level
testcases_threads = create_test_cases(ThreadsMixin, type='threads')
globals().update(testcases_threads)
2268
class OtherTest(unittest.TestCase):
    # TODO: add more tests for deliver/answer challenge.
    def test_deliver_challenge_auth_failure(self):
        # a peer answering the challenge with garbage must be rejected
        class _FakeConnection(object):
            def recv_bytes(self, size):
                return b'something bogus'
            def send_bytes(self, data):
                pass
        with self.assertRaises(multiprocessing.AuthenticationError):
            multiprocessing.connection.deliver_challenge(
                _FakeConnection(), b'abc')

    def test_answer_challenge_auth_failure(self):
        # first read yields the challenge marker, second read a bogus
        # reply, any later read an empty bytes object
        class _FakeConnection(object):
            def __init__(self):
                self.count = 0
            def recv_bytes(self, size):
                self.count += 1
                if self.count == 1:
                    return multiprocessing.connection.CHALLENGE
                if self.count == 2:
                    return b'something bogus'
                return b''
            def send_bytes(self, data):
                pass
        with self.assertRaises(multiprocessing.AuthenticationError):
            multiprocessing.connection.answer_challenge(
                _FakeConnection(), b'abc')
2297
Benjamin Petersonf47ed4a2009-04-11 20:45:40 +00002298#
2299# Test Manager.start()/Pool.__init__() initializer feature - see issue 5585
2300#
2301
def initializer(ns):
    # Increment the ``test`` counter stored on *ns*.
    ns.test += 1
2304
class TestInitializers(unittest.TestCase):
    # Manager.start() and Pool() must run the initializer they are given
    # and reject a non-callable one.

    def setUp(self):
        # shared namespace whose ``test`` counter the initializer bumps
        self.mgr = multiprocessing.Manager()
        self.ns = self.mgr.Namespace()
        self.ns.test = 0

    def tearDown(self):
        self.mgr.shutdown()

    def test_manager_initializer(self):
        m = multiprocessing.managers.SyncManager()
        self.assertRaises(TypeError, m.start, 1)
        m.start(initializer, (self.ns,))
        try:
            self.assertEqual(self.ns.test, 1)
        finally:
            # stop the second manager even when the assertion fails
            m.shutdown()

    def test_pool_initializer(self):
        self.assertRaises(TypeError, multiprocessing.Pool, initializer=1)
        p = multiprocessing.Pool(1, initializer, (self.ns,))
        p.close()
        p.join()
        self.assertEqual(self.ns.test, 1)
2327
Alexandre Vassalottic57a84f2009-07-17 12:07:01 +00002328#
2329# Issue 5155, 5313, 5331: Test process in processes
# Verifies os.close(sys.stdin.fileno()) vs. sys.stdin.close() behavior
2331#
2332
def _ThisSubProcess(q):
    # Poll *q* once without blocking; an empty queue is not an error and
    # a fetched item is discarded.
    try:
        q.get(block=False)
    except pyqueue.Empty:
        return
2338
def _TestProcess(q):
    # Start a daemonic process that polls a fresh queue and wait for it to
    # finish.  *q* is accepted but not used.
    inner_queue = multiprocessing.Queue()
    child = multiprocessing.Process(target=_ThisSubProcess,
                                    args=(inner_queue,))
    child.daemon = True
    child.start()
    child.join()
2345
def _afunc(x):
    # Return *x* multiplied by itself.
    squared = x * x
    return squared
2348
def pool_in_process():
    # Create a four-worker pool, run one map() through it, then close the
    # pool and wait for its workers.  The pool used to be left open and
    # the result was bound to an unused local.
    pool = multiprocessing.Pool(processes=4)
    try:
        pool.map(_afunc, [1, 2, 3, 4, 5, 6, 7])
    finally:
        pool.close()
        pool.join()
2352
class _file_like(object):
    # Buffers written chunks in a per-process list and hands them to the
    # wrapped *delegate* only on flush().

    def __init__(self, delegate):
        self._delegate = delegate
        self._pid = None

    @property
    def cache(self):
        # The buffer belongs to the process that created it: when the
        # current pid differs from the recorded one, start a new list.
        current_pid = os.getpid()
        # There are no race conditions since fork keeps only the running thread
        if self._pid != current_pid:
            self._pid = current_pid
            self._cache = []
        return self._cache

    def write(self, data):
        self.cache.append(data)

    def flush(self):
        # write the buffered chunks as one string, then reset the buffer
        pending = ''.join(self.cache)
        self._delegate.write(pending)
        self._cache = []
2373
class TestStdinBadfiledescriptor(unittest.TestCase):
    # Issue 5155, 5313, 5331: processes and pools started from inside a
    # child process.

    def test_queue_in_process(self):
        queue = multiprocessing.Queue()
        proc = multiprocessing.Process(target=_TestProcess, args=(queue,))
        proc.start()
        proc.join()

    def test_pool_in_process(self):
        p = multiprocessing.Process(target=pool_in_process)
        p.start()
        p.join()

    def test_flushing(self):
        sio = io.StringIO()
        flike = _file_like(sio)
        flike.write('foo')
        # NOTE(review): this Process is created but never started; kept
        # as-is because the original intent is unclear.
        proc = multiprocessing.Process(target=lambda: flike.flush())
        flike.flush()
        # assertEqual instead of a bare assert, which is stripped under -O
        self.assertEqual(sio.getvalue(), 'foo')
2394
# stand-alone test cases, i.e. the ones not generated from _Test* templates
testcases_other = [
    OtherTest,
    TestInvalidHandle,
    TestInitializers,
    TestStdinBadfiledescriptor,
]
Neal Norwitz5d6415e2008-08-25 01:53:32 +00002397
Benjamin Petersone711caf2008-06-11 16:44:04 +00002398#
2399#
2400#
2401
def test_main(run=None):
    """Build the full multiprocessing suite and hand it to *run*.

    *run* is a callable that takes a ``unittest.TestSuite``; when it is
    None, ``test.support.run_unittest`` is used.  Raises
    ``unittest.SkipTest`` on Linux when an RLock cannot be created.
    """
    if sys.platform.startswith("linux"):
        try:
            multiprocessing.RLock()
        except OSError:
            raise unittest.SkipTest("OSError raises on RLock creation, see issue 3111!")

    check_enough_semaphores()

    if run is None:
        from test.support import run_unittest as run

    util.get_temp_dir()     # creates temp directory for use by all processes

    multiprocessing.get_logger().setLevel(LOG_LEVEL)

    # shared pools and manager used by the generated test cases
    ProcessesMixin.pool = multiprocessing.Pool(4)
    ThreadsMixin.pool = multiprocessing.dummy.Pool(4)
    ManagerMixin.manager.__init__()
    ManagerMixin.manager.start()
    ManagerMixin.pool = ManagerMixin.manager.Pool(4)

    try:
        by_name = lambda tc: tc.__name__
        testcases = (
            sorted(testcases_processes.values(), key=by_name) +
            sorted(testcases_threads.values(), key=by_name) +
            sorted(testcases_manager.values(), key=by_name) +
            testcases_other
            )

        loadTestsFromTestCase = unittest.defaultTestLoader.loadTestsFromTestCase
        suite = unittest.TestSuite(loadTestsFromTestCase(tc) for tc in testcases)
        run(suite)
    finally:
        # tear the shared resources down even when the run raises, so no
        # worker or manager process is left behind
        ThreadsMixin.pool.terminate()
        ProcessesMixin.pool.terminate()
        ManagerMixin.pool.terminate()
        ManagerMixin.manager.shutdown()

        del ProcessesMixin.pool, ThreadsMixin.pool, ManagerMixin.pool
Benjamin Petersone711caf2008-06-11 16:44:04 +00002441
def main():
    # run the whole suite through a verbose text runner
    runner = unittest.TextTestRunner(verbosity=2)
    test_main(runner.run)

if __name__ == '__main__':
    main()