blob: 93cc11d96d416cdc50e104fdfe680105cd918583 [file] [log] [blame]
Benjamin Peterson90f5ba52010-03-11 22:53:45 +00001#!/usr/bin/env python3
Georg Brandl86b2fb92008-07-16 03:43:04 +00002
Benjamin Petersone711caf2008-06-11 16:44:04 +00003#
4# Unit tests for the multiprocessing package
5#
6
7import unittest
Benjamin Petersone711caf2008-06-11 16:44:04 +00008import queue as pyqueue
9import time
Alexandre Vassalottic57a84f2009-07-17 12:07:01 +000010import io
Antoine Pitroude911b22011-12-21 11:03:24 +010011import itertools
Benjamin Petersone711caf2008-06-11 16:44:04 +000012import sys
13import os
14import gc
Antoine Pitrouc824e9a2011-04-05 18:11:33 +020015import errno
Benjamin Petersone711caf2008-06-11 16:44:04 +000016import signal
17import array
Benjamin Petersone711caf2008-06-11 16:44:04 +000018import socket
19import random
20import logging
R. David Murraya21e4ca2009-03-31 23:16:50 +000021import test.support
Benjamin Petersone711caf2008-06-11 16:44:04 +000022
Benjamin Petersone5384b02008-10-04 22:00:42 +000023
R. David Murraya21e4ca2009-03-31 23:16:50 +000024# Skip tests if _multiprocessing wasn't built.
25_multiprocessing = test.support.import_module('_multiprocessing')
26# Skip tests if sem_open implementation is broken.
27test.support.import_module('multiprocessing.synchronize')
# import threading after _multiprocessing to raise a more relevant error
# message: "No module named _multiprocessing". _multiprocessing is not compiled
# without thread support.
31import threading
Benjamin Petersone5384b02008-10-04 22:00:42 +000032
Benjamin Petersone711caf2008-06-11 16:44:04 +000033import multiprocessing.dummy
34import multiprocessing.connection
35import multiprocessing.managers
36import multiprocessing.heap
Benjamin Petersone711caf2008-06-11 16:44:04 +000037import multiprocessing.pool
Benjamin Petersone711caf2008-06-11 16:44:04 +000038
Charles-François Natalibc8f0822011-09-20 20:36:51 +020039from multiprocessing import util
40
41try:
42 from multiprocessing import reduction
43 HAS_REDUCTION = True
44except ImportError:
45 HAS_REDUCTION = False
Benjamin Petersone711caf2008-06-11 16:44:04 +000046
Brian Curtinafa88b52010-10-07 01:12:19 +000047try:
48 from multiprocessing.sharedctypes import Value, copy
49 HAS_SHAREDCTYPES = True
50except ImportError:
51 HAS_SHAREDCTYPES = False
52
Antoine Pitroubcb39d42011-08-23 19:46:22 +020053try:
54 import msvcrt
55except ImportError:
56 msvcrt = None
57
Benjamin Petersone711caf2008-06-11 16:44:04 +000058#
59#
60#
61
def latin(s):
    """Return the str *s* encoded to bytes using the 'latin' codec."""
    encoded = s.encode('latin')
    return encoded
Benjamin Petersone711caf2008-06-11 16:44:04 +000064
Benjamin Petersone711caf2008-06-11 16:44:04 +000065#
66# Constants
67#
68
# Logging level for the tests; swap in the commented-out line below to get
# DEBUG output instead.
LOG_LEVEL = util.SUBWARNING
#LOG_LEVEL = logging.DEBUG

DELTA = 0.1

# Making CHECK_TIMINGS true makes the tests take a lot longer and can
# sometimes cause some non-serious failures, because some calls block a bit
# longer than expected.
CHECK_TIMINGS = False
TIMEOUT1, TIMEOUT2, TIMEOUT3 = (
    (0.82, 0.35, 1.4) if CHECK_TIMINGS else (0.1, 0.1, 0.1)
)

# True unless the extension module flags its semaphore get_value() as broken.
HAVE_GETVALUE = not getattr(_multiprocessing,
                            'HAVE_BROKEN_SEM_GETVALUE', False)
84
WIN32 = (sys.platform == "win32")
if WIN32:
    try:
        from _subprocess import WaitForSingleObject, INFINITE, WAIT_OBJECT_0
    except ImportError:
        # Later Python 3 releases expose the same Win32 primitives from
        # _winapi instead of _subprocess.
        from _winapi import WaitForSingleObject, INFINITE, WAIT_OBJECT_0

    def wait_for_handle(handle, timeout):
        """Return True if *handle* becomes signalled within *timeout* seconds.

        A *timeout* of None or a negative value waits indefinitely.
        """
        if timeout is None or timeout < 0.0:
            timeout = INFINITE
        else:
            # WaitForSingleObject takes its timeout in milliseconds.
            timeout = int(1000 * timeout)
        return WaitForSingleObject(handle, timeout) == WAIT_OBJECT_0
else:
    from select import select
    # Wrap select() with the EINTR-retry helper when multiprocessing.util
    # still provides it.  It is a private helper; interpreters that retry
    # interrupted system calls themselves (PEP 475) no longer ship it, so
    # fall back to plain select() there.
    _select = getattr(util, '_eintr_retry', lambda func: func)(select)

    def wait_for_handle(handle, timeout):
        """Return True if *handle* becomes readable within *timeout* seconds.

        A negative *timeout* is treated like None: wait indefinitely.
        """
        if timeout is not None and timeout < 0.0:
            timeout = None
        return handle in _select([handle], [], [], timeout)[0]
Jesse Noller6214edd2009-01-19 16:23:53 +0000103
# Upper bound on file descriptor numbers; falls back to 256 when the limit
# cannot be queried.
try:
    MAXFD = os.sysconf("SC_OPEN_MAX")
except (AttributeError, ValueError, OSError):
    # AttributeError: os.sysconf is missing on this platform.
    # ValueError: the configuration name is unknown.
    # OSError: the name is known but not supported by the host system.
    MAXFD = 256
108
Benjamin Petersone711caf2008-06-11 16:44:04 +0000109#
Florent Xiclunafd1b0932010-03-28 00:25:02 +0000110# Some tests require ctypes
111#
112
113try:
Florent Xiclunaaa171062010-08-14 15:56:42 +0000114 from ctypes import Structure, c_int, c_double
Florent Xiclunafd1b0932010-03-28 00:25:02 +0000115except ImportError:
116 Structure = object
117 c_int = c_double = None
118
Charles-François Natali221ef672011-11-22 18:55:22 +0100119
def check_enough_semaphores():
    """Check that the system supports enough semaphores to run the test.

    Raises unittest.SkipTest when the OS reports a semaphore limit below the
    required minimum; returns None otherwise, including when the limit
    cannot be queried.
    """
    # minimum number of semaphores available according to POSIX
    required = 256
    try:
        available = os.sysconf("SC_SEM_NSEMS_MAX")
    except (AttributeError, ValueError):
        # sysconf not available or setting not available
        return
    # -1 is treated as "enough" here, exactly like a value >= the minimum.
    if available != -1 and available < required:
        raise unittest.SkipTest("The OS doesn't support enough semaphores "
                                "to run the test (required: %d)." % required)
133
134
Florent Xiclunafd1b0932010-03-28 00:25:02 +0000135#
Benjamin Petersone711caf2008-06-11 16:44:04 +0000136# Creates a wrapper for a function which records the time it takes to finish
137#
138
class TimingWrapper(object):
    """Callable wrapper that records how long each call to *func* takes.

    After every call, including calls that raise, the time spent inside
    *func* is stored in seconds in the ``elapsed`` attribute.  ``elapsed``
    is None until the first call.
    """

    def __init__(self, func):
        self.func = func
        self.elapsed = None

    def __call__(self, *args, **kwds):
        # Use the monotonic clock: time.time() follows the system clock, so
        # an adjustment during the call could give a bogus (even negative)
        # duration.
        start = time.monotonic()
        try:
            return self.func(*args, **kwds)
        finally:
            self.elapsed = time.monotonic() - start
151
152#
153# Base class for test cases
154#
155
class BaseTestCase(object):
    """Mixin with assertion helpers shared by the test cases below.

    It relies on ``assertEqual`` and ``assertAlmostEqual`` being supplied by
    the class it is combined with (presumably unittest.TestCase; that wiring
    is not visible in this part of the file).
    """

    ALLOWED_TYPES = ('processes', 'manager', 'threads')

    def assertTimingAlmostEqual(self, a, b):
        # Timing checks are only enforced when CHECK_TIMINGS is enabled.
        if not CHECK_TIMINGS:
            return
        self.assertAlmostEqual(a, b, 1)

    def assertReturnsIfImplemented(self, value, func, *args):
        # Compare func(*args) with value, unless func reports that it is not
        # implemented, in which case the check is silently skipped.
        try:
            result = func(*args)
        except NotImplementedError:
            return None
        return self.assertEqual(value, result)

    # For the sanity of Windows users, rather than crashing or freezing in
    # multiple ways.
    def __reduce__(self, *args):
        raise NotImplementedError("shouldn't try to pickle a test case")

    __reduce_ex__ = __reduce__
178
Benjamin Petersone711caf2008-06-11 16:44:04 +0000179#
180# Return the value of a semaphore
181#
182
def get_value(self):
    """Return the current value of the semaphore-like object *self*.

    Tries ``self.get_value()`` first, then the ``_Semaphore__value`` and
    ``_value`` attributes in that order.  Raises NotImplementedError when
    none of them is available.
    """
    try:
        return self.get_value()
    except AttributeError:
        pass
    for attr_name in ('_Semaphore__value', '_value'):
        try:
            return getattr(self, attr_name)
        except AttributeError:
            continue
    raise NotImplementedError
194
195#
196# Testcases
197#
198
class _TestProcess(BaseTestCase):
    """Tests for the basic Process API: attributes, start/join, terminate,
    nested children and the ``sentinel`` handle.

    ``TYPE``, ``Process``, ``Queue``, ``Event``, ``Pipe``,
    ``current_process`` and ``active_children`` are not defined here; they
    are expected to be provided by the concrete test class built from this
    mixin (that code is outside this part of the file).
    """

    ALLOWED_TYPES = ('processes', 'threads')

    def test_current(self):
        if self.TYPE == 'threads':
            return

        current = self.current_process()
        authkey = current.authkey

        self.assertTrue(current.is_alive())
        self.assertFalse(current.daemon)
        self.assertIsInstance(authkey, bytes)
        self.assertGreater(len(authkey), 0)
        self.assertEqual(current.ident, os.getpid())
        self.assertEqual(current.exitcode, None)

    def test_daemon_argument(self):
        if self.TYPE == "threads":
            return

        # By default uses the current process's daemon flag.
        proc0 = self.Process(target=self._test)
        self.assertEqual(proc0.daemon, self.current_process().daemon)
        proc1 = self.Process(target=self._test, daemon=True)
        self.assertTrue(proc1.daemon)
        proc2 = self.Process(target=self._test, daemon=False)
        self.assertFalse(proc2.daemon)

    @classmethod
    def _test(cls, q, *args, **kwds):
        # Child body for test_process: report the received arguments and the
        # child's own identity back through the queue, in a fixed order.
        current = cls.current_process()
        q.put(args)
        q.put(kwds)
        q.put(current.name)
        if cls.TYPE != 'threads':
            q.put(bytes(current.authkey))
            q.put(current.pid)

    def test_process(self):
        q = self.Queue(1)
        args = (q, 1, 2)
        kwargs = {'hello':23, 'bye':2.54}
        name = 'SomeProcess'
        p = self.Process(
            target=self._test, args=args, kwargs=kwargs, name=name
            )
        p.daemon = True
        current = self.current_process()

        # State before start().
        if self.TYPE != 'threads':
            self.assertEqual(p.authkey, current.authkey)
        self.assertEqual(p.is_alive(), False)
        self.assertEqual(p.daemon, True)
        self.assertNotIn(p, self.active_children())
        self.assertIs(type(self.active_children()), list)
        self.assertEqual(p.exitcode, None)

        p.start()

        # State while running.
        self.assertEqual(p.exitcode, None)
        self.assertEqual(p.is_alive(), True)
        self.assertIn(p, self.active_children())

        # The child reports back in the order used by _test(); the queue
        # itself (args[0]) is consumed by the q parameter.
        self.assertEqual(q.get(), args[1:])
        self.assertEqual(q.get(), kwargs)
        self.assertEqual(q.get(), p.name)
        if self.TYPE != 'threads':
            self.assertEqual(q.get(), current.authkey)
            self.assertEqual(q.get(), p.pid)

        p.join()

        # State after the child has exited.
        self.assertEqual(p.exitcode, 0)
        self.assertEqual(p.is_alive(), False)
        self.assertNotIn(p, self.active_children())

    @classmethod
    def _test_terminate(cls):
        # Sleep long enough that the parent always gets to terminate us.
        time.sleep(1000)

    def test_terminate(self):
        if self.TYPE == 'threads':
            return

        p = self.Process(target=self._test_terminate)
        p.daemon = True
        p.start()

        self.assertEqual(p.is_alive(), True)
        self.assertIn(p, self.active_children())
        self.assertEqual(p.exitcode, None)

        p.terminate()

        join = TimingWrapper(p.join)
        self.assertEqual(join(), None)
        self.assertTimingAlmostEqual(join.elapsed, 0.0)

        self.assertEqual(p.is_alive(), False)
        self.assertNotIn(p, self.active_children())

        p.join()

        # XXX sometimes get p.exitcode == 0 on Windows ...
        #self.assertEqual(p.exitcode, -signal.SIGTERM)

    def test_cpu_count(self):
        try:
            cpus = multiprocessing.cpu_count()
        except NotImplementedError:
            cpus = 1
        self.assertIs(type(cpus), int)
        self.assertGreaterEqual(cpus, 1)

    def test_active_children(self):
        self.assertEqual(type(self.active_children()), list)

        p = self.Process(target=time.sleep, args=(DELTA,))
        self.assertNotIn(p, self.active_children())

        p.daemon = True
        p.start()
        self.assertIn(p, self.active_children())

        p.join()
        self.assertNotIn(p, self.active_children())

    @classmethod
    def _test_recursion(cls, wconn, id):
        # Send our own id, then (down to depth 2) run two children in turn,
        # each extending the id by its index.  Every child is joined before
        # the next one starts, so ids arrive in depth-first order.
        wconn.send(id)
        if len(id) < 2:
            for i in range(2):
                p = cls.Process(
                    target=cls._test_recursion, args=(wconn, id+[i])
                    )
                p.start()
                p.join()

    def test_recursion(self):
        rconn, wconn = self.Pipe(duplex=False)
        self._test_recursion(wconn, [])

        time.sleep(DELTA)
        result = []
        while rconn.poll():
            result.append(rconn.recv())

        expected = [
            [],
              [0],
                [0, 0],
                [0, 1],
              [1],
                [1, 0],
                [1, 1]
            ]
        self.assertEqual(result, expected)

    @classmethod
    def _test_sentinel(cls, event):
        event.wait(10.0)

    def test_sentinel(self):
        if self.TYPE == "threads":
            return
        event = self.Event()
        p = self.Process(target=self._test_sentinel, args=(event,))
        # The sentinel is not available before the process is started.
        with self.assertRaises(ValueError):
            p.sentinel
        p.start()
        self.addCleanup(p.join)
        sentinel = p.sentinel
        self.assertIsInstance(sentinel, int)
        # Not ready while the child is still waiting on the event ...
        self.assertFalse(wait_for_handle(sentinel, timeout=0.0))
        event.set()
        p.join()
        # ... and ready once the child has exited.
        self.assertTrue(wait_for_handle(sentinel, timeout=DELTA))
380
Benjamin Petersone711caf2008-06-11 16:44:04 +0000381#
382#
383#
384
385class _UpperCaser(multiprocessing.Process):
386
387 def __init__(self):
388 multiprocessing.Process.__init__(self)
389 self.child_conn, self.parent_conn = multiprocessing.Pipe()
390
391 def run(self):
392 self.parent_conn.close()
393 for s in iter(self.child_conn.recv, None):
394 self.child_conn.send(s.upper())
395 self.child_conn.close()
396
397 def submit(self, s):
398 assert type(s) is str
399 self.parent_conn.send(s)
400 return self.parent_conn.recv()
401
402 def stop(self):
403 self.parent_conn.send(None)
404 self.parent_conn.close()
405 self.child_conn.close()
406
class _TestSubclassingProcess(BaseTestCase):
    """Check that a Process subclass (_UpperCaser) can be started and used."""

    ALLOWED_TYPES = ('processes',)

    def test_subclassing(self):
        worker = _UpperCaser()
        worker.daemon = True
        worker.start()
        for text, expected in (('hello', 'HELLO'), ('world', 'WORLD')):
            self.assertEqual(worker.submit(text), expected)
        worker.stop()
        worker.join()
419
420#
421#
422#
423
def queue_empty(q):
    """Return q.empty() if *q* has that method, else whether q.qsize() is 0."""
    if not hasattr(q, 'empty'):
        return q.qsize() == 0
    return q.empty()
429
def queue_full(q, maxsize):
    """Return q.full() if *q* has that method, else whether q.qsize()
    equals *maxsize*."""
    if not hasattr(q, 'full'):
        return q.qsize() == maxsize
    return q.full()
435
436
class _TestQueue(BaseTestCase):
    """Tests for Queue and JoinableQueue: put/get variants, timeouts,
    behaviour across a fork, qsize() and task_done()/join().

    ``Queue``, ``JoinableQueue``, ``Event`` and ``Process`` are expected to
    be provided by the concrete test class built from this mixin (not
    visible in this part of the file).
    """

    @classmethod
    def _test_put(cls, queue, child_can_start, parent_can_continue):
        # Child body for test_put: once released, drain the six items the
        # parent queued (MAXSIZE in test_put), then let the parent proceed.
        child_can_start.wait()
        for i in range(6):
            queue.get()
        parent_can_continue.set()

    def test_put(self):
        MAXSIZE = 6
        queue = self.Queue(maxsize=MAXSIZE)
        child_can_start = self.Event()
        parent_can_continue = self.Event()

        proc = self.Process(
            target=self._test_put,
            args=(queue, child_can_start, parent_can_continue)
            )
        proc.daemon = True
        proc.start()

        self.assertEqual(queue_empty(queue), True)
        self.assertEqual(queue_full(queue, MAXSIZE), False)

        # Fill the queue using every calling convention of put().
        queue.put(1)
        queue.put(2, True)
        queue.put(3, True, None)
        queue.put(4, False)
        queue.put(5, False, None)
        queue.put_nowait(6)

        # the values may be in buffer but not yet in pipe so sleep a bit
        time.sleep(DELTA)

        self.assertEqual(queue_empty(queue), False)
        self.assertEqual(queue_full(queue, MAXSIZE), True)

        put = TimingWrapper(queue.put)
        put_nowait = TimingWrapper(queue.put_nowait)

        # Non-blocking puts on a full queue fail immediately ...
        self.assertRaises(pyqueue.Full, put, 7, False)
        self.assertTimingAlmostEqual(put.elapsed, 0)

        self.assertRaises(pyqueue.Full, put, 7, False, None)
        self.assertTimingAlmostEqual(put.elapsed, 0)

        self.assertRaises(pyqueue.Full, put_nowait, 7)
        self.assertTimingAlmostEqual(put_nowait.elapsed, 0)

        # ... while blocking puts fail only after the timeout.
        self.assertRaises(pyqueue.Full, put, 7, True, TIMEOUT1)
        self.assertTimingAlmostEqual(put.elapsed, TIMEOUT1)

        self.assertRaises(pyqueue.Full, put, 7, False, TIMEOUT2)
        self.assertTimingAlmostEqual(put.elapsed, 0)

        self.assertRaises(pyqueue.Full, put, 7, True, timeout=TIMEOUT3)
        self.assertTimingAlmostEqual(put.elapsed, TIMEOUT3)

        child_can_start.set()
        parent_can_continue.wait()

        self.assertEqual(queue_empty(queue), True)
        self.assertEqual(queue_full(queue, MAXSIZE), False)

        proc.join()

    @classmethod
    def _test_get(cls, queue, child_can_start, parent_can_continue):
        # Child body for test_get: once released, queue the items the parent
        # expects to read.  The put of 1 is disabled together with the
        # matching get in test_get.
        child_can_start.wait()
        #queue.put(1)
        queue.put(2)
        queue.put(3)
        queue.put(4)
        queue.put(5)
        parent_can_continue.set()

    def test_get(self):
        queue = self.Queue()
        child_can_start = self.Event()
        parent_can_continue = self.Event()

        proc = self.Process(
            target=self._test_get,
            args=(queue, child_can_start, parent_can_continue)
            )
        proc.daemon = True
        proc.start()

        self.assertEqual(queue_empty(queue), True)

        child_can_start.set()
        parent_can_continue.wait()

        time.sleep(DELTA)
        self.assertEqual(queue_empty(queue), False)

        # Hangs unexpectedly, remove for now
        #self.assertEqual(queue.get(), 1)
        self.assertEqual(queue.get(True, None), 2)
        self.assertEqual(queue.get(True), 3)
        self.assertEqual(queue.get(timeout=1), 4)
        self.assertEqual(queue.get_nowait(), 5)

        self.assertEqual(queue_empty(queue), True)

        get = TimingWrapper(queue.get)
        get_nowait = TimingWrapper(queue.get_nowait)

        # Non-blocking gets on an empty queue fail immediately ...
        self.assertRaises(pyqueue.Empty, get, False)
        self.assertTimingAlmostEqual(get.elapsed, 0)

        self.assertRaises(pyqueue.Empty, get, False, None)
        self.assertTimingAlmostEqual(get.elapsed, 0)

        self.assertRaises(pyqueue.Empty, get_nowait)
        self.assertTimingAlmostEqual(get_nowait.elapsed, 0)

        # ... while blocking gets fail only after the timeout.
        self.assertRaises(pyqueue.Empty, get, True, TIMEOUT1)
        self.assertTimingAlmostEqual(get.elapsed, TIMEOUT1)

        self.assertRaises(pyqueue.Empty, get, False, TIMEOUT2)
        self.assertTimingAlmostEqual(get.elapsed, 0)

        self.assertRaises(pyqueue.Empty, get, timeout=TIMEOUT3)
        self.assertTimingAlmostEqual(get.elapsed, TIMEOUT3)

        proc.join()

    @classmethod
    def _test_fork(cls, queue):
        for i in range(10, 20):
            queue.put(i)
        # note that at this point the items may only be buffered, so the
        # process cannot shutdown until the feeder thread has finished
        # pushing items onto the pipe.

    def test_fork(self):
        # Old versions of Queue would fail to create a new feeder
        # thread for a forked process if the original process had its
        # own feeder thread.  This test checks that this no longer
        # happens.

        queue = self.Queue()

        # put items on queue so that main process starts a feeder thread
        for i in range(10):
            queue.put(i)

        # wait to make sure thread starts before we fork a new process
        time.sleep(DELTA)

        # fork process
        p = self.Process(target=self._test_fork, args=(queue,))
        p.daemon = True
        p.start()

        # check that all expected items are in the queue
        for i in range(20):
            self.assertEqual(queue.get(), i)
        self.assertRaises(pyqueue.Empty, queue.get, False)

        p.join()

    def test_qsize(self):
        q = self.Queue()
        try:
            self.assertEqual(q.qsize(), 0)
        except NotImplementedError:
            # qsize() is not implemented for this queue; nothing to check.
            return
        q.put(1)
        self.assertEqual(q.qsize(), 1)
        q.put(5)
        self.assertEqual(q.qsize(), 2)
        q.get()
        self.assertEqual(q.qsize(), 1)
        q.get()
        self.assertEqual(q.qsize(), 0)

    @classmethod
    def _test_task_done(cls, q):
        # Worker body: acknowledge every item until the None sentinel.
        for obj in iter(q.get, None):
            time.sleep(DELTA)
            q.task_done()

    def test_task_done(self):
        queue = self.JoinableQueue()

        workers = [self.Process(target=self._test_task_done, args=(queue,))
                   for i in range(4)]

        for p in workers:
            p.daemon = True
            p.start()

        for i in range(10):
            queue.put(i)

        # Returns once every queued item has been acknowledged.
        queue.join()

        # One sentinel per worker so that each of them exits.
        for p in workers:
            queue.put(None)

        for p in workers:
            p.join()
646
647#
648#
649#
650
class _TestLock(BaseTestCase):
    """Tests for Lock and RLock acquire/release semantics."""

    def test_lock(self):
        plain = self.Lock()
        self.assertEqual(plain.acquire(), True)
        # A second, non-blocking acquire must fail while the lock is held.
        self.assertEqual(plain.acquire(False), False)
        self.assertEqual(plain.release(), None)
        # Releasing an unheld lock is an error.
        with self.assertRaises((ValueError, threading.ThreadError)):
            plain.release()

    def test_rlock(self):
        reentrant = self.RLock()
        # Acquire three times, release three times; one release too many
        # must fail.
        for _ in range(3):
            self.assertEqual(reentrant.acquire(), True)
        for _ in range(3):
            self.assertEqual(reentrant.release(), None)
        with self.assertRaises((AssertionError, RuntimeError)):
            reentrant.release()

    def test_lock_context(self):
        # A lock can be used as a context manager.
        guard = self.Lock()
        with guard:
            pass
673
Benjamin Petersone711caf2008-06-11 16:44:04 +0000674
class _TestSemaphore(BaseTestCase):
    """Tests for Semaphore and BoundedSemaphore counting and timeouts."""

    def _test_semaphore(self, sem):
        # Walk a semaphore that starts at 2 down to 0 and back up to 2,
        # checking its value (where it can be read) after every step.
        def expect_value(expected):
            self.assertReturnsIfImplemented(expected, get_value, sem)

        expect_value(2)
        self.assertEqual(sem.acquire(), True)
        expect_value(1)
        self.assertEqual(sem.acquire(), True)
        expect_value(0)
        self.assertEqual(sem.acquire(False), False)
        expect_value(0)
        self.assertEqual(sem.release(), None)
        expect_value(1)
        self.assertEqual(sem.release(), None)
        expect_value(2)

    def test_semaphore(self):
        sem = self.Semaphore(2)
        self._test_semaphore(sem)
        # An unbounded semaphore may be released past its initial value.
        for expected in (3, 4):
            self.assertEqual(sem.release(), None)
            self.assertReturnsIfImplemented(expected, get_value, sem)

    def test_bounded_semaphore(self):
        sem = self.BoundedSemaphore(2)
        self._test_semaphore(sem)
        # Currently fails on OS/X
        #if HAVE_GETVALUE:
        #    self.assertRaises(ValueError, sem.release)
        #    self.assertReturnsIfImplemented(2, get_value, sem)

    def test_timeout(self):
        if self.TYPE != 'processes':
            return

        sem = self.Semaphore(0)
        acquire = TimingWrapper(sem.acquire)

        # (positional args, keyword args, expected elapsed time); every
        # acquire must fail because the semaphore's value is 0.
        cases = [
            ((False,), {}, 0.0),
            ((False, None), {}, 0.0),
            ((False, TIMEOUT1), {}, 0),
            ((True, TIMEOUT2), {}, TIMEOUT2),
            ((), {'timeout': TIMEOUT3}, TIMEOUT3),
            ]
        for args, kwds, expected_elapsed in cases:
            self.assertEqual(acquire(*args, **kwds), False)
            self.assertTimingAlmostEqual(acquire.elapsed, expected_elapsed)
727
728
class _TestCondition(BaseTestCase):
    """Tests for Condition: notify(), notify_all() and wait() timeouts.

    ``Condition``, ``Semaphore`` and ``Process`` are expected to be provided
    by the concrete test class built from this mixin (not visible in this
    part of the file).
    """

    @classmethod
    def f(cls, cond, sleeping, woken, timeout=None):
        # Sleeper body: announce that we are about to wait, wait on the
        # condition (optionally with a timeout), then announce the wake-up.
        cond.acquire()
        sleeping.release()
        cond.wait(timeout)
        woken.release()
        cond.release()

    def check_invariant(self, cond):
        # this is only supposed to succeed when there are no sleepers
        if self.TYPE == 'processes':
            try:
                sleepers = (cond._sleeping_count.get_value() -
                            cond._woken_count.get_value())
                self.assertEqual(sleepers, 0)
                self.assertEqual(cond._wait_semaphore.get_value(), 0)
            except NotImplementedError:
                pass

    def test_notify(self):
        cond = self.Condition()
        sleeping = self.Semaphore(0)
        woken = self.Semaphore(0)

        # Keep the process and the thread under separate names so that both
        # can be joined at the end of the test.
        proc = self.Process(target=self.f, args=(cond, sleeping, woken))
        proc.daemon = True
        proc.start()

        thread = threading.Thread(target=self.f, args=(cond, sleeping, woken))
        thread.daemon = True
        thread.start()

        # wait for both children to start sleeping
        sleeping.acquire()
        sleeping.acquire()

        # check no process/thread has woken up
        time.sleep(DELTA)
        self.assertReturnsIfImplemented(0, get_value, woken)

        # wake up one process/thread
        cond.acquire()
        cond.notify()
        cond.release()

        # check one process/thread has woken up
        time.sleep(DELTA)
        self.assertReturnsIfImplemented(1, get_value, woken)

        # wake up another
        cond.acquire()
        cond.notify()
        cond.release()

        # check other has woken up
        time.sleep(DELTA)
        self.assertReturnsIfImplemented(2, get_value, woken)

        # check state is not mucked up
        self.check_invariant(cond)

        # Both sleepers have been notified by now, so these joins only wait
        # for them to finish running f().
        proc.join()
        thread.join()

    def test_notify_all(self):
        cond = self.Condition()
        sleeping = self.Semaphore(0)
        woken = self.Semaphore(0)
        # Every process/thread started below, joined at the end of the test.
        workers = []

        # start some threads/processes which will timeout
        for i in range(3):
            p = self.Process(target=self.f,
                             args=(cond, sleeping, woken, TIMEOUT1))
            p.daemon = True
            p.start()
            workers.append(p)

            t = threading.Thread(target=self.f,
                                 args=(cond, sleeping, woken, TIMEOUT1))
            t.daemon = True
            t.start()
            workers.append(t)

        # wait for them all to sleep
        for i in range(6):
            sleeping.acquire()

        # check they have all timed out
        for i in range(6):
            woken.acquire()
        self.assertReturnsIfImplemented(0, get_value, woken)

        # check state is not mucked up
        self.check_invariant(cond)

        # start some more threads/processes
        for i in range(3):
            p = self.Process(target=self.f, args=(cond, sleeping, woken))
            p.daemon = True
            p.start()
            workers.append(p)

            t = threading.Thread(target=self.f, args=(cond, sleeping, woken))
            t.daemon = True
            t.start()
            workers.append(t)

        # wait for them to all sleep
        for i in range(6):
            sleeping.acquire()

        # check no process/thread has woken up
        time.sleep(DELTA)
        self.assertReturnsIfImplemented(0, get_value, woken)

        # wake them all up
        cond.acquire()
        cond.notify_all()
        cond.release()

        # check they have all woken; poll for a while, because the wake-ups
        # are not instantaneous
        for i in range(10):
            try:
                if get_value(woken) == 6:
                    break
            except NotImplementedError:
                break
            time.sleep(DELTA)
        self.assertReturnsIfImplemented(6, get_value, woken)

        # check state is not mucked up
        self.check_invariant(cond)

        # The first batch timed out and the second batch was notified, so
        # every worker is finishing (or has finished) f().
        for worker in workers:
            worker.join()

    def test_timeout(self):
        cond = self.Condition()
        wait = TimingWrapper(cond.wait)
        cond.acquire()
        res = wait(TIMEOUT1)
        cond.release()
        # Nobody notifies the condition, so wait() must time out.
        self.assertEqual(res, False)
        self.assertTimingAlmostEqual(wait.elapsed, TIMEOUT1)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000865 self.assertTimingAlmostEqual(wait.elapsed, TIMEOUT1)
866
867
class _TestEvent(BaseTestCase):
    """Tests for Event: is_set(), wait() return values, set() and clear()."""

    @classmethod
    def _test_event(cls, event):
        # Child body: set the event after a short delay.
        time.sleep(TIMEOUT2)
        event.set()

    def test_event(self):
        event = self.Event()
        wait = TimingWrapper(event.wait)

        # NOTE: this check was at one point disabled because of API
        # differences with threading's Event (is_set vs. isSet); it is
        # active here.
        self.assertEqual(event.is_set(), False)

        # wait() is expected to return the state of the flag (False on
        # timeout) rather than None.
        self.assertEqual(wait(0.0), False)
        self.assertTimingAlmostEqual(wait.elapsed, 0.0)
        self.assertEqual(wait(TIMEOUT1), False)
        self.assertTimingAlmostEqual(wait.elapsed, TIMEOUT1)

        event.set()

        # Once the event is set, wait() returns True without blocking.
        self.assertEqual(event.is_set(), True)
        self.assertEqual(wait(), True)
        self.assertTimingAlmostEqual(wait.elapsed, 0.0)
        self.assertEqual(wait(TIMEOUT1), True)
        self.assertTimingAlmostEqual(wait.elapsed, 0.0)
        # self.assertEqual(event.is_set(), True)

        event.clear()

        #self.assertEqual(event.is_set(), False)

        p = self.Process(target=self._test_event, args=(event,))
        p.daemon = True
        p.start()
        self.assertEqual(wait(), True)
        # The child has set the event, so it is about to exit; join it
        # rather than leaving it behind.
        p.join()
Benjamin Petersone711caf2008-06-11 16:44:04 +0000908
909#
910#
911#
912
class _TestValue(BaseTestCase):
    """Tests for the shared ctypes Value and RawValue objects."""

    ALLOWED_TYPES = ('processes',)

    # (typecode, initial value, value written by the child process)
    codes_values = [
        ('i', 4343, 24234),
        ('d', 3.625, -4.25),
        ('h', -232, 234),
        ('c', latin('x'), latin('y'))
        ]

    def setUp(self):
        if not HAS_SHAREDCTYPES:
            self.skipTest("requires multiprocessing.sharedctypes")

    @classmethod
    def _test(cls, values):
        # Child body: overwrite each shared value with its third table entry.
        for shared, (_code, _initial, final) in zip(values, cls.codes_values):
            shared.value = final

    def test_value(self, raw=False):
        make = self.RawValue if raw else self.Value
        values = [make(code, initial)
                  for code, initial, _ in self.codes_values]

        # Every shared value starts out with its initial value ...
        for shared, (_code, initial, _final) in zip(values, self.codes_values):
            self.assertEqual(shared.value, initial)

        child = self.Process(target=self._test, args=(values,))
        child.daemon = True
        child.start()
        child.join()

        # ... and the parent sees what the child wrote.
        for shared, (_code, _initial, final) in zip(values, self.codes_values):
            self.assertEqual(shared.value, final)

    def test_rawvalue(self):
        self.test_value(raw=True)

    def test_getobj_getlock(self):
        # Default lock: get_lock() and get_obj() are available.
        default_val = self.Value('i', 5)
        default_val.get_lock()
        default_val.get_obj()

        # lock=None: get_lock() and get_obj() are available as well.
        none_val = self.Value('i', 5, lock=None)
        none_val.get_lock()
        none_val.get_obj()

        # An explicit lock is handed back by get_lock().
        lock = self.Lock()
        locked_val = self.Value('i', 5, lock=lock)
        returned_lock = locked_val.get_lock()
        locked_val.get_obj()
        self.assertEqual(lock, returned_lock)

        # lock=False gives an object without the accessors.
        unlocked_val = self.Value('i', 5, lock=False)
        self.assertFalse(hasattr(unlocked_val, 'get_lock'))
        self.assertFalse(hasattr(unlocked_val, 'get_obj'))

        # Anything that is not a usable lock is rejected.
        with self.assertRaises(AttributeError):
            self.Value('i', 5, lock='navalue')

        raw_val = self.RawValue('i', 5)
        self.assertFalse(hasattr(raw_val, 'get_lock'))
        self.assertFalse(hasattr(raw_val, 'get_obj'))
980
Benjamin Petersone711caf2008-06-11 16:44:04 +0000981
class _TestArray(BaseTestCase):
    """Tests for the Array / RawArray shared objects."""

    ALLOWED_TYPES = ('processes',)

    @classmethod
    def f(cls, seq):
        # Replace seq, in place, by its running sums.
        for idx in range(1, len(seq)):
            seq[idx] = seq[idx] + seq[idx - 1]

    @unittest.skipIf(c_int is None, "requires _ctypes")
    def test_array(self, raw=False):
        seq = [680, 626, 934, 821, 150, 233, 548, 982, 714, 831]
        make = self.RawArray if raw else self.Array
        arr = make('i', seq)

        self.assertEqual(len(arr), len(seq))
        self.assertEqual(arr[3], seq[3])
        self.assertEqual(list(arr[2:7]), list(seq[2:7]))

        # Apply the same slice assignment to the shared array and the list.
        patch = array.array('i', [1, 2, 3, 4])
        arr[4:8] = patch
        seq[4:8] = patch

        self.assertEqual(list(arr[:]), seq)

        # Expected result is computed locally on the list, the actual one
        # by running f() on the shared array through self.Process.
        self.f(seq)

        child = self.Process(target=self.f, args=(arr,))
        child.daemon = True
        child.start()
        child.join()

        self.assertEqual(list(arr[:]), seq)

    @unittest.skipIf(c_int is None, "requires _ctypes")
    def test_array_from_size(self):
        size = 10
        # Test for zeroing (see issue #11675).
        # Looping raises the odds that the 2nd and 3rd arrays land on
        # memory that previously held the non-zero contents written below.
        for _ in range(3):
            arr = self.Array('i', size)
            self.assertEqual(len(arr), size)
            self.assertEqual(list(arr), [0] * size)
            arr[:] = range(10)
            self.assertEqual(list(arr), list(range(10)))
            del arr

    @unittest.skipIf(c_int is None, "requires _ctypes")
    def test_rawarray(self):
        self.test_array(raw=True)

    @unittest.skipIf(c_int is None, "requires _ctypes")
    def test_getobj_getlock_obj(self):
        # Default lock argument: both accessors must be callable.
        default_locked = self.Array('i', list(range(10)))
        default_locked.get_lock()
        default_locked.get_obj()

        # lock=None: both accessors must be callable as well.
        none_locked = self.Array('i', list(range(10)), lock=None)
        none_locked.get_lock()
        none_locked.get_obj()

        # A caller-supplied lock is what get_lock() hands back.
        lock = self.Lock()
        user_locked = self.Array('i', list(range(10)), lock=lock)
        returned_lock = user_locked.get_lock()
        user_locked.get_obj()
        self.assertEqual(lock, returned_lock)

        # lock=False: the accessors are absent.
        unlocked = self.Array('i', range(10), lock=False)
        for accessor in ('get_lock', 'get_obj'):
            self.assertFalse(hasattr(unlocked, accessor))
        self.assertRaises(AttributeError,
                          self.Array, 'i', range(10), lock='notalock')

        # RawArray never has the accessors.
        raw_array = self.RawArray('i', range(10))
        for accessor in ('get_lock', 'get_obj'):
            self.assertFalse(hasattr(raw_array, accessor))
Benjamin Petersone711caf2008-06-11 16:44:04 +00001060
1061#
1062#
1063#
1064
class _TestContainers(BaseTestCase):
    """Tests for the manager-provided list, dict and Namespace objects."""

    ALLOWED_TYPES = ('manager',)

    def test_list(self):
        digits = list(range(10))
        a = self.list(list(range(10)))
        self.assertEqual(a[:], digits)

        b = self.list()
        self.assertEqual(b[:], [])

        b.extend(list(range(5)))
        self.assertEqual(b[:], list(range(5)))

        self.assertEqual(b[2], 2)
        self.assertEqual(b[2:10], [2,3,4])

        # In-place repetition must keep operating on the same object.
        b *= 2
        doubled = [0, 1, 2, 3, 4, 0, 1, 2, 3, 4]
        self.assertEqual(b[:], doubled)

        self.assertEqual(b + [5, 6], doubled + [5, 6])

        # a was not touched by any of the operations on b.
        self.assertEqual(a[:], digits)

        nested = self.list([a, b])
        self.assertEqual(nested[:], [digits, doubled])

        # Appending to a after it was stored in another list must be
        # visible through that list.
        holder = self.list([a])
        a.append('hello')
        self.assertEqual(holder[:], [digits + ['hello']])

    def test_dict(self):
        d = self.dict()
        indices = list(range(65, 70))
        for code in indices:
            d[code] = chr(code)
        letters = [chr(code) for code in indices]
        pairs = list(zip(indices, letters))
        self.assertEqual(d.copy(), dict(pairs))
        self.assertEqual(sorted(d.keys()), indices)
        self.assertEqual(sorted(d.values()), letters)
        self.assertEqual(sorted(d.items()), pairs)

    def test_namespace(self):
        n = self.Namespace()
        n.name = 'Bob'
        n.job = 'Builder'
        n._hidden = 'hidden'
        self.assertEqual((n.name, n.job), ('Bob', 'Builder'))
        del n.job
        # The expected str() omits both the deleted and the underscore
        # attribute.
        self.assertEqual(str(n), "Namespace(name='Bob')")
        self.assertTrue(hasattr(n, 'name'))
        self.assertFalse(hasattr(n, 'job'))
1120
1121#
1122#
1123#
1124
def sqr(x, wait=0.0):
    """Sleep for *wait* seconds, then return the square of *x*."""
    time.sleep(wait)
    squared = x * x
    return squared
Ask Solem2afcbf22010-11-09 20:55:52 +00001128
def mul(x, y):
    """Return the product of *x* and *y*."""
    product = x * y
    return product
1131
class _TestPool(BaseTestCase):
    """Tests for the Pool API, run against self.pool."""

    def test_apply(self):
        apply_ = self.pool.apply
        self.assertEqual(apply_(sqr, (5,)), sqr(5))
        self.assertEqual(apply_(sqr, (), {'x':3}), sqr(x=3))

    def test_map(self):
        pool_map = self.pool.map
        self.assertEqual(pool_map(sqr, list(range(10))),
                         [sqr(i) for i in range(10)])
        self.assertEqual(pool_map(sqr, list(range(100)), chunksize=20),
                         [sqr(i) for i in range(100)])

    def test_starmap(self):
        pool_starmap = self.pool.starmap
        pairs = list(zip(range(10), range(9,-1, -1)))
        self.assertEqual(pool_starmap(mul, pairs),
                         list(itertools.starmap(mul, pairs)))
        pairs = list(zip(range(100), range(99,-1, -1)))
        self.assertEqual(pool_starmap(mul, pairs, chunksize=20),
                         list(itertools.starmap(mul, pairs)))

    def test_starmap_async(self):
        pairs = list(zip(range(100), range(99,-1, -1)))
        pending = self.pool.starmap_async(mul, pairs)
        self.assertEqual(pending.get(),
                         list(itertools.starmap(mul, pairs)))

    def test_map_chunksize(self):
        # An empty iterable combined with an explicit chunksize must not
        # leave the result pending until the timeout.
        pending = self.pool.map_async(sqr, [], chunksize=1)
        try:
            pending.get(timeout=TIMEOUT1)
        except multiprocessing.TimeoutError:
            self.fail("pool.map_async with chunksize stalled on null list")

    def test_async(self):
        res = self.pool.apply_async(sqr, (7, TIMEOUT1,))
        timed_get = TimingWrapper(res.get)
        self.assertEqual(timed_get(), 49)
        self.assertTimingAlmostEqual(timed_get.elapsed, TIMEOUT1)

    def test_async_timeout(self):
        # The task sleeps longer than the timeout passed to get().
        res = self.pool.apply_async(sqr, (6, TIMEOUT2 + 0.2))
        timed_get = TimingWrapper(res.get)
        self.assertRaises(multiprocessing.TimeoutError,
                          timed_get, timeout=TIMEOUT2)
        self.assertTimingAlmostEqual(timed_get.elapsed, TIMEOUT2)

    def test_imap(self):
        it = self.pool.imap(sqr, list(range(10)))
        self.assertEqual(list(it), [sqr(i) for i in range(10)])

        # Step through the iterator by hand, then expect exhaustion.
        it = self.pool.imap(sqr, list(range(10)))
        for n in range(10):
            self.assertEqual(next(it), n*n)
        self.assertRaises(StopIteration, it.__next__)

        it = self.pool.imap(sqr, list(range(1000)), chunksize=100)
        for n in range(1000):
            self.assertEqual(next(it), n*n)
        self.assertRaises(StopIteration, it.__next__)

    def test_imap_unordered(self):
        expected = [sqr(i) for i in range(1000)]

        # Results are sorted before comparing.
        it = self.pool.imap_unordered(sqr, list(range(1000)))
        self.assertEqual(sorted(it), expected)

        it = self.pool.imap_unordered(sqr, list(range(1000)), chunksize=53)
        self.assertEqual(sorted(it), expected)

    def test_make_pool(self):
        # Non-positive worker counts are rejected.
        for bad_size in (-1, 0):
            self.assertRaises(ValueError, multiprocessing.Pool, bad_size)

        p = multiprocessing.Pool(3)
        self.assertEqual(3, len(p._pool))
        p.close()
        p.join()

    def test_terminate(self):
        if self.TYPE == 'manager':
            # On Unix a forked process increfs each shared object to
            # which its parent process held a reference.  If the
            # forked process gets terminated then there is likely to
            # be a reference leak.  So to prevent
            # _TestZZZNumberOfObjects from failing we skip this test
            # when using a manager.
            return

        # Queue 10000 short sleeps, then terminate straight away.
        result = self.pool.map_async(
            time.sleep, [0.1 for i in range(10000)], chunksize=1
            )
        self.pool.terminate()
        timed_join = TimingWrapper(self.pool.join)
        timed_join()
        self.assertLess(timed_join.elapsed, 0.5)
Jesse Noller1f0b6582010-01-27 03:36:01 +00001224
def raising():
    """Always raise KeyError("key")."""
    error = KeyError("key")
    raise error
Jesse Noller1f0b6582010-01-27 03:36:01 +00001227
def unpickleable_result():
    """Return a lambda that returns 42."""
    # The tests using this helper expect pickling of the result to fail.
    return lambda: 42
1230
class _TestPoolWorkerErrors(BaseTestCase):
    """Tests for error reporting from apply_async() tasks."""

    ALLOWED_TYPES = ('processes', )

    def test_async_error_callback(self):
        p = multiprocessing.Pool(2)

        # One-slot list so the callback can hand the exception back.
        received = [None]
        def errback(exc):
            received[0] = exc

        res = p.apply_async(raising, error_callback=errback)
        self.assertRaises(KeyError, res.get)
        self.assertTrue(received[0])
        self.assertIsInstance(received[0], KeyError)

        p.close()
        p.join()

    def test_unpickleable_result(self):
        from multiprocessing.pool import MaybeEncodingError
        p = multiprocessing.Pool(2)

        # Make sure we don't lose pool processes because of encoding errors.
        for iteration in range(20):

            received = [None]
            def errback(exc):
                received[0] = exc

            res = p.apply_async(unpickleable_result, error_callback=errback)
            self.assertRaises(MaybeEncodingError, res.get)
            wrapped = received[0]
            self.assertTrue(wrapped)
            self.assertIsInstance(wrapped, MaybeEncodingError)
            self.assertIsNotNone(wrapped.exc)
            self.assertIsNotNone(wrapped.value)

        p.close()
        p.join()
1270
class _TestPoolWorkerLifetime(BaseTestCase):
    """Tests for pools created with maxtasksperchild."""

    ALLOWED_TYPES = ('processes', )

    def test_pool_worker_lifetime(self):
        p = multiprocessing.Pool(3, maxtasksperchild=10)
        self.assertEqual(3, len(p._pool))
        origworkerpids = [w.pid for w in p._pool]
        # Run many tasks so each worker gets replaced (hopefully)
        results = [p.apply_async(sqr, (i, )) for i in range(100)]
        # Fetch the results and verify we got the right answers,
        # also ensuring all the tasks have completed.
        for j, res in enumerate(results):
            self.assertEqual(res.get(), sqr(j))
        # Refill the pool
        p._repopulate_pool()
        # Wait until all workers are alive
        # (50 polls * DELTA = 5 seconds max startup process time)
        for _ in range(50):
            if all(w.is_alive() for w in p._pool):
                break
            time.sleep(DELTA)
        finalworkerpids = [w.pid for w in p._pool]
        # All pids should be assigned.  See issue #7805.
        self.assertNotIn(None, origworkerpids)
        self.assertNotIn(None, finalworkerpids)
        # Finally, check that the worker pids have changed
        self.assertNotEqual(sorted(origworkerpids), sorted(finalworkerpids))
        p.close()
        p.join()

    def test_pool_worker_lifetime_early_close(self):
        # Issue #10332: closing a pool whose workers have limited lifetimes
        # before all the tasks completed would make join() hang.
        p = multiprocessing.Pool(3, maxtasksperchild=1)
        results = [p.apply_async(sqr, (i, 0.3)) for i in range(6)]
        p.close()
        p.join()
        # check the results
        for j, res in enumerate(results):
            self.assertEqual(res.get(), sqr(j))
1315
1316
Benjamin Petersone711caf2008-06-11 16:44:04 +00001317#
1318# Test that manager has expected number of shared objects left
1319#
1320
class _TestZZZNumberOfObjects(BaseTestCase):
    # Because test cases are sorted alphabetically, this one will get
    # run after all the other tests for the manager.  It tests that
    # there have been no "reference leaks" for the manager's shared
    # objects.  Note the comment in _TestPool.test_terminate().
    ALLOWED_TYPES = ('manager',)

    def test_number_of_objects(self):
        EXPECTED_NUMBER = 1                # the pool object is still alive
        multiprocessing.active_children()  # discard dead process objs
        gc.collect()                       # do garbage collection
        refs = self.manager._number_of_objects()
        # Fetch the debug info right after the count so that what gets
        # printed is the snapshot taken next to the number being asserted.
        debug_info = self.manager._debug_info()
        if refs != EXPECTED_NUMBER:
            # Print the snapshot once; querying the manager again here
            # would only produce a second, later copy of the same report.
            print(debug_info)

        self.assertEqual(refs, EXPECTED_NUMBER)
1339
1340#
1341# Test of creating a customized manager class
1342#
1343
1344from multiprocessing.managers import BaseManager, BaseProxy, RemoteError
1345
class FooBar(object):
    """Plain class with two public methods and one underscore method."""

    def f(self):
        # Public method with a normal return value.
        return 'f()'

    def g(self):
        # Public method that always fails.
        raise ValueError

    def _h(self):
        # Underscore-prefixed method with a normal return value.
        return '_h()'
1353
def baz():
    """Generate the squares of 0 through 9."""
    for n in range(10):
        square = n * n
        yield square
1357
class IteratorProxy(BaseProxy):
    """Proxy usable as an iterator; __next__ is forwarded via _callmethod."""

    _exposed_ = ('__next__',)

    def __iter__(self):
        # The proxy is its own iterator.
        return self

    def __next__(self):
        # Delegate each step to the referent's __next__.
        return self._callmethod('__next__')
1364
class MyManager(BaseManager):
    """Customized manager used by _TestMyManager."""

# 'Foo' is registered with default settings, 'Bar' with an explicit
# exposed tuple (which includes the underscore method), and 'baz' with a
# custom proxy type.
MyManager.register('Foo', callable=FooBar)
MyManager.register('Bar', callable=FooBar, exposed=('f', '_h'))
MyManager.register('baz', callable=baz, proxytype=IteratorProxy)
1371
1372
class _TestMyManager(BaseTestCase):
    """Tests for the MyManager registrations."""

    ALLOWED_TYPES = ('manager',)

    def test_mymanager(self):
        manager = MyManager()
        manager.start()

        foo = manager.Foo()
        bar = manager.Bar()
        squares = manager.baz()

        # Record which of the three FooBar methods each proxy exposes.
        candidates = ('f', 'g', '_h')
        foo_methods = [name for name in candidates if hasattr(foo, name)]
        bar_methods = [name for name in candidates if hasattr(bar, name)]

        self.assertEqual(foo_methods, ['f', 'g'])
        self.assertEqual(bar_methods, ['f', '_h'])

        # foo: g() raises ValueError; calling '_h' by name raises RemoteError.
        self.assertEqual(foo.f(), 'f()')
        self.assertRaises(ValueError, foo.g)
        self.assertEqual(foo._callmethod('f'), 'f()')
        self.assertRaises(RemoteError, foo._callmethod, '_h')

        # bar: both exposed methods work directly and via _callmethod.
        self.assertEqual(bar.f(), 'f()')
        self.assertEqual(bar._h(), '_h()')
        self.assertEqual(bar._callmethod('f'), 'f()')
        self.assertEqual(bar._callmethod('_h'), '_h()')

        self.assertEqual(list(squares), [i*i for i in range(10)])

        manager.shutdown()
1404
1405#
1406# Test of connecting to a remote server and using xmlrpclib for serialization
1407#
1408
# Queue handed out by get_queue().
_queue = pyqueue.Queue()

def get_queue():
    """Return the module-level queue."""
    return _queue


class QueueManager(BaseManager):
    """Manager used by the server process; 'get_queue' has a callable."""

QueueManager.register('get_queue', callable=get_queue)


class QueueManager2(BaseManager):
    """Manager with the same interface as QueueManager, but no callable."""

QueueManager2.register('get_queue')


# Serializer name passed to the managers in the remote-manager tests.
SERIALIZER = 'xmlrpclib'
1423
class _TestRemoteManager(BaseTestCase):
    """Test connecting to a running manager with the SERIALIZER format."""

    ALLOWED_TYPES = ('manager',)

    @classmethod
    def _putter(cls, address, authkey):
        # Process target: connect to the manager at *address* and put one
        # tuple on its queue.
        client = QueueManager2(
            address=address, authkey=authkey, serializer=SERIALIZER
            )
        client.connect()
        remote_queue = client.get_queue()
        remote_queue.put(('hello world', None, True, 2.25))

    def test_remote(self):
        authkey = os.urandom(32)

        server = QueueManager(
            address=('localhost', 0), authkey=authkey, serializer=SERIALIZER
            )
        server.start()

        p = self.Process(target=self._putter, args=(server.address, authkey))
        p.daemon = True
        p.start()

        client = QueueManager2(
            address=server.address, authkey=authkey, serializer=SERIALIZER
            )
        client.connect()
        queue = client.get_queue()

        # Note that xmlrpclib will deserialize object as a list not a tuple
        self.assertEqual(queue.get(), ['hello world', None, True, 2.25])

        # Because we are using xmlrpclib for serialization instead of
        # pickle this will cause a serialization error.
        self.assertRaises(Exception, queue.put, time.sleep)

        # Make queue finalizer run before the server is stopped
        del queue
        server.shutdown()
1465
class _TestManagerRestart(BaseTestCase):
    """Check that a manager can be restarted on the address it just left."""

    @classmethod
    def _putter(cls, address, authkey):
        # Process target: connect to the manager at *address* and put one
        # string on its queue.
        manager = QueueManager(
            address=address, authkey=authkey, serializer=SERIALIZER)
        manager.connect()
        queue = manager.get_queue()
        queue.put('hello world')

    def test_rapid_restart(self):
        authkey = os.urandom(32)
        manager = QueueManager(
            address=('localhost', 0), authkey=authkey, serializer=SERIALIZER)
        srvr = manager.get_server()
        addr = srvr.address
        # Close the connection.Listener socket which gets opened as a part
        # of manager.get_server(). It's not needed for the test.
        srvr.listener.close()
        manager.start()

        p = self.Process(target=self._putter, args=(manager.address, authkey))
        p.daemon = True
        p.start()
        queue = manager.get_queue()
        self.assertEqual(queue.get(), 'hello world')
        # Drop the proxy before the manager it belongs to is shut down.
        del queue
        manager.shutdown()

        # Second manager, started on the address recorded above.
        manager = QueueManager(
            address=addr, authkey=authkey, serializer=SERIALIZER)
        try:
            manager.start()
        except IOError as e:
            if e.errno != errno.EADDRINUSE:
                raise
            # Retry after some time, in case the old socket was lingering
            # (sporadic failure on buildbots)
            time.sleep(1.0)
            manager = QueueManager(
                address=addr, authkey=authkey, serializer=SERIALIZER)
            # The replacement manager has to be started as well: shutdown
            # is only available on a manager that has been started, so
            # without this the call below fails with AttributeError.
            manager.start()
        manager.shutdown()
Jesse Nollerc5d28a02009-03-30 16:37:36 +00001507
Benjamin Petersone711caf2008-06-11 16:44:04 +00001508#
1509#
1510#
1511
# Empty message; _TestConnection._echo stops when it receives this.
SENTINEL = latin('')
1513
class _TestConnection(BaseTestCase):
    """Tests for the connection objects returned by self.Pipe()."""

    ALLOWED_TYPES = ('processes', 'threads')

    @classmethod
    def _echo(cls, conn):
        # Process target: send back every byte message received until
        # SENTINEL arrives, then close the connection.
        for msg in iter(conn.recv_bytes, SENTINEL):
            conn.send_bytes(msg)
        conn.close()

    def test_connection(self):
        conn, child_conn = self.Pipe()

        p = self.Process(target=self._echo, args=(child_conn,))
        p.daemon = True
        p.start()

        seq = [1, 2.25, None]
        msg = latin('hello world')
        longmsg = msg * 10
        arr = array.array('i', list(range(4)))

        if self.TYPE == 'processes':
            self.assertEqual(type(conn.fileno()), int)

        self.assertEqual(conn.send(seq), None)
        self.assertEqual(conn.recv(), seq)

        self.assertEqual(conn.send_bytes(msg), None)
        self.assertEqual(conn.recv_bytes(), msg)

        if self.TYPE == 'processes':
            # Receive into the start of a zeroed buffer.
            buffer = array.array('i', [0]*10)
            expected = list(arr) + [0] * (10 - len(arr))
            self.assertEqual(conn.send_bytes(arr), None)
            self.assertEqual(conn.recv_bytes_into(buffer),
                             len(arr) * buffer.itemsize)
            self.assertEqual(list(buffer), expected)

            # Receive at an offset of three items into the buffer.
            buffer = array.array('i', [0]*10)
            expected = [0] * 3 + list(arr) + [0] * (10 - 3 - len(arr))
            self.assertEqual(conn.send_bytes(arr), None)
            self.assertEqual(conn.recv_bytes_into(buffer, 3 * buffer.itemsize),
                             len(arr) * buffer.itemsize)
            self.assertEqual(list(buffer), expected)

            # A message longer than the buffer must raise BufferTooShort
            # with the complete message as its only argument.
            buffer = bytearray(latin(' ' * 40))
            self.assertEqual(conn.send_bytes(longmsg), None)
            with self.assertRaises(multiprocessing.BufferTooShort) as cm:
                conn.recv_bytes_into(buffer)
            self.assertEqual(cm.exception.args, (longmsg,))

        poll = TimingWrapper(conn.poll)

        # Nothing is pending: poll() returns False, immediately without a
        # timeout and after the full timeout otherwise.
        self.assertEqual(poll(), False)
        self.assertTimingAlmostEqual(poll.elapsed, 0)

        self.assertEqual(poll(TIMEOUT1), False)
        self.assertTimingAlmostEqual(poll.elapsed, TIMEOUT1)

        conn.send(None)

        self.assertEqual(poll(TIMEOUT1), True)
        self.assertTimingAlmostEqual(poll.elapsed, 0)

        self.assertEqual(conn.recv(), None)

        really_big_msg = latin('X') * (1024 * 1024 * 16)   # 16Mb
        conn.send_bytes(really_big_msg)
        self.assertEqual(conn.recv_bytes(), really_big_msg)

        conn.send_bytes(SENTINEL)                          # tell child to quit
        child_conn.close()

        if self.TYPE == 'processes':
            self.assertEqual(conn.readable, True)
            self.assertEqual(conn.writable, True)
            self.assertRaises(EOFError, conn.recv)
            self.assertRaises(EOFError, conn.recv_bytes)

        p.join()

    def test_duplex_false(self):
        reader, writer = self.Pipe(duplex=False)
        self.assertEqual(writer.send(1), None)
        self.assertEqual(reader.recv(), 1)
        if self.TYPE == 'processes':
            self.assertEqual(reader.readable, True)
            self.assertEqual(reader.writable, False)
            self.assertEqual(writer.readable, False)
            self.assertEqual(writer.writable, True)
            # Using an end in the wrong direction must fail.
            self.assertRaises(IOError, reader.send, 2)
            self.assertRaises(IOError, writer.recv)
            self.assertRaises(IOError, writer.poll)

    def test_spawn_close(self):
        # We test that a pipe connection can be closed by parent
        # process immediately after child is spawned.  On Windows this
        # would have sometimes failed on old versions because
        # child_conn would be closed before the child got a chance to
        # duplicate it.
        conn, child_conn = self.Pipe()

        p = self.Process(target=self._echo, args=(child_conn,))
        p.daemon = True
        p.start()
        child_conn.close()    # this might complete before child initializes

        msg = latin('hello')
        conn.send_bytes(msg)
        self.assertEqual(conn.recv_bytes(), msg)

        conn.send_bytes(SENTINEL)
        conn.close()
        p.join()

    def test_sendbytes(self):
        # Report a skip rather than a silent pass, like the other
        # processes-only tests in this class.
        if self.TYPE != 'processes':
            self.skipTest("test not appropriate for %s" % self.TYPE)

        msg = latin('abcdefghijklmnopqrstuvwxyz')
        a, b = self.Pipe()

        a.send_bytes(msg)
        self.assertEqual(b.recv_bytes(), msg)

        # Offset only.
        a.send_bytes(msg, 5)
        self.assertEqual(b.recv_bytes(), msg[5:])

        # Offset and size.
        a.send_bytes(msg, 7, 8)
        self.assertEqual(b.recv_bytes(), msg[7:7+8])

        # An offset equal to the message length sends an empty message.
        a.send_bytes(msg, 26)
        self.assertEqual(b.recv_bytes(), latin(''))

        a.send_bytes(msg, 26, 0)
        self.assertEqual(b.recv_bytes(), latin(''))

        # Out-of-range or negative offset/size combinations are rejected.
        self.assertRaises(ValueError, a.send_bytes, msg, 27)

        self.assertRaises(ValueError, a.send_bytes, msg, 22, 5)

        self.assertRaises(ValueError, a.send_bytes, msg, 26, 1)

        self.assertRaises(ValueError, a.send_bytes, msg, -1)

        self.assertRaises(ValueError, a.send_bytes, msg, 4, -1)

    @classmethod
    def _is_fd_assigned(cls, fd):
        # Return True if os.fstat() succeeds on fd and False if it fails
        # with EBADF; any other OSError propagates.
        try:
            os.fstat(fd)
        except OSError as e:
            if e.errno == errno.EBADF:
                return False
            raise
        else:
            return True

    @classmethod
    def _writefd(cls, conn, data, create_dummy_fds=False):
        # Process target: receive a handle over conn, write data to it and
        # close it.
        if create_dummy_fds:
            # Occupy every unassigned descriptor below 256 first.
            for i in range(0, 256):
                if not cls._is_fd_assigned(i):
                    os.dup2(conn.fileno(), i)
        fd = reduction.recv_handle(conn)
        if msvcrt:
            fd = msvcrt.open_osfhandle(fd, os.O_WRONLY)
        os.write(fd, data)
        os.close(fd)

    @unittest.skipUnless(HAS_REDUCTION, "test needs multiprocessing.reduction")
    def test_fd_transfer(self):
        if self.TYPE != 'processes':
            self.skipTest("only makes sense with processes")
        conn, child_conn = self.Pipe(duplex=True)

        p = self.Process(target=self._writefd, args=(child_conn, b"foo"))
        p.daemon = True
        p.start()
        self.addCleanup(test.support.unlink, test.support.TESTFN)
        with open(test.support.TESTFN, "wb") as f:
            fd = f.fileno()
            if msvcrt:
                fd = msvcrt.get_osfhandle(fd)
            reduction.send_handle(conn, fd, p.pid)
        p.join()
        # The file must contain what _writefd wrote through the handle.
        with open(test.support.TESTFN, "rb") as f:
            self.assertEqual(f.read(), b"foo")

    @unittest.skipUnless(HAS_REDUCTION, "test needs multiprocessing.reduction")
    @unittest.skipIf(sys.platform == "win32",
                     "test semantics don't make sense on Windows")
    @unittest.skipIf(MAXFD <= 256,
                     "largest assignable fd number is too small")
    @unittest.skipUnless(hasattr(os, "dup2"),
                         "test needs os.dup2()")
    def test_large_fd_transfer(self):
        # With fd > 256 (issue #11657)
        if self.TYPE != 'processes':
            self.skipTest("only makes sense with processes")
        conn, child_conn = self.Pipe(duplex=True)

        p = self.Process(target=self._writefd, args=(child_conn, b"bar", True))
        p.daemon = True
        p.start()
        self.addCleanup(test.support.unlink, test.support.TESTFN)
        with open(test.support.TESTFN, "wb") as f:
            fd = f.fileno()
            # Find the first unassigned descriptor number >= 256.
            for newfd in range(256, MAXFD):
                if not self._is_fd_assigned(newfd):
                    break
            else:
                self.fail("could not find an unassigned large file descriptor")
            os.dup2(fd, newfd)
            try:
                reduction.send_handle(conn, newfd, p.pid)
            finally:
                os.close(newfd)
        p.join()
        with open(test.support.TESTFN, "rb") as f:
            self.assertEqual(f.read(), b"bar")

    @classmethod
    def _send_data_without_fd(cls, conn):
        # Process target: write a single NUL byte straight to the
        # connection's file descriptor.
        os.write(conn.fileno(), b"\0")

    @unittest.skipUnless(HAS_REDUCTION, "test needs multiprocessing.reduction")
    @unittest.skipIf(sys.platform == "win32", "doesn't make sense on Windows")
    def test_missing_fd_transfer(self):
        # Check that exception is raised when received data is not
        # accompanied by a file descriptor in ancillary data.
        if self.TYPE != 'processes':
            self.skipTest("only makes sense with processes")
        conn, child_conn = self.Pipe(duplex=True)

        p = self.Process(target=self._send_data_without_fd, args=(child_conn,))
        p.daemon = True
        p.start()
        self.assertRaises(RuntimeError, reduction.recv_handle, conn)
        p.join()
Antoine Pitroubcb39d42011-08-23 19:46:22 +02001758
class _TestListenerClient(BaseTestCase):
    """Round-trip test for Listener / Client over every address family."""

    ALLOWED_TYPES = ('processes', 'threads')

    @classmethod
    def _test(cls, address):
        # Process target: connect to the listener and send one greeting.
        client = cls.connection.Client(address)
        client.send('hello')
        client.close()

    def test_listener_client(self):
        for family in self.connection.families:
            listener = self.connection.Listener(family=family)
            p = self.Process(target=self._test, args=(listener.address,))
            p.daemon = True
            p.start()
            accepted = listener.accept()
            self.assertEqual(accepted.recv(), 'hello')
            p.join()
            listener.close()
Benjamin Petersone711caf2008-06-11 16:44:04 +00001779#
1780# Test of sending connection and socket objects between processes
1781#
Benjamin Petersonb29cbbc2008-06-16 20:57:14 +00001782"""
Benjamin Petersone711caf2008-06-11 16:44:04 +00001783class _TestPicklingConnections(BaseTestCase):
1784
1785 ALLOWED_TYPES = ('processes',)
1786
1787 def _listener(self, conn, families):
1788 for fam in families:
1789 l = self.connection.Listener(family=fam)
1790 conn.send(l.address)
1791 new_conn = l.accept()
1792 conn.send(new_conn)
1793
1794 if self.TYPE == 'processes':
1795 l = socket.socket()
1796 l.bind(('localhost', 0))
1797 conn.send(l.getsockname())
1798 l.listen(1)
1799 new_conn, addr = l.accept()
1800 conn.send(new_conn)
1801
1802 conn.recv()
1803
1804 def _remote(self, conn):
1805 for (address, msg) in iter(conn.recv, None):
1806 client = self.connection.Client(address)
1807 client.send(msg.upper())
1808 client.close()
1809
1810 if self.TYPE == 'processes':
1811 address, msg = conn.recv()
1812 client = socket.socket()
1813 client.connect(address)
1814 client.sendall(msg.upper())
1815 client.close()
1816
1817 conn.close()
1818
1819 def test_pickling(self):
1820 try:
1821 multiprocessing.allow_connection_pickling()
1822 except ImportError:
1823 return
1824
1825 families = self.connection.families
1826
1827 lconn, lconn0 = self.Pipe()
1828 lp = self.Process(target=self._listener, args=(lconn0, families))
Jesus Cea94f964f2011-09-09 20:26:57 +02001829 lp.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +00001830 lp.start()
1831 lconn0.close()
1832
1833 rconn, rconn0 = self.Pipe()
1834 rp = self.Process(target=self._remote, args=(rconn0,))
Jesus Cea94f964f2011-09-09 20:26:57 +02001835 rp.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +00001836 rp.start()
1837 rconn0.close()
1838
1839 for fam in families:
1840 msg = ('This connection uses family %s' % fam).encode('ascii')
1841 address = lconn.recv()
1842 rconn.send((address, msg))
1843 new_conn = lconn.recv()
1844 self.assertEqual(new_conn.recv(), msg.upper())
1845
1846 rconn.send(None)
1847
1848 if self.TYPE == 'processes':
1849 msg = latin('This connection uses a normal socket')
1850 address = lconn.recv()
1851 rconn.send((address, msg))
1852 if hasattr(socket, 'fromfd'):
1853 new_conn = lconn.recv()
1854 self.assertEqual(new_conn.recv(100), msg.upper())
1855 else:
1856 # XXX On Windows with Py2.6 need to backport fromfd()
1857 discard = lconn.recv_bytes()
1858
1859 lconn.send(None)
1860
1861 rconn.close()
1862 lconn.close()
1863
1864 lp.join()
1865 rp.join()
Benjamin Petersonb29cbbc2008-06-16 20:57:14 +00001866"""
Benjamin Petersone711caf2008-06-11 16:44:04 +00001867#
1868#
1869#
1870
class _TestHeap(BaseTestCase):

    ALLOWED_TYPES = ('processes',)

    def test_heap(self):
        # Churn the heap with many randomly sized allocations, then verify
        # that the free and occupied blocks it tracks sit back to back
        # inside every arena.
        iterations = 5000
        maxblocks = 50
        live_blocks = []

        # create and destroy lots of blocks of different sizes
        for _ in range(iterations):
            size = int(random.lognormvariate(0, 1) * 1000)
            wrapper = multiprocessing.heap.BufferWrapper(size)
            live_blocks.append(wrapper)
            if len(live_blocks) > maxblocks:
                victim = random.randrange(maxblocks)
                del live_blocks[victim]

        # get the heap object
        heap = multiprocessing.heap.BufferWrapper._heap

        # collect (arena index, start, stop, length, state) for every block
        layout = []
        occupied = 0
        heap._lock.acquire()
        self.addCleanup(heap._lock.release)
        for free_blocks in list(heap._len_to_seq.values()):
            for arena, start, stop in free_blocks:
                layout.append((heap._arenas.index(arena), start, stop,
                               stop - start, 'free'))
        for arena, start, stop in heap._allocated_blocks:
            layout.append((heap._arenas.index(arena), start, stop,
                           stop - start, 'occupied'))
            occupied += stop - start

        layout.sort()

        # each block must either end where the next one starts, or the next
        # one must open a different arena at offset 0
        for current, following in zip(layout, layout[1:]):
            arena, start, stop = current[:3]
            narena, nstart, nstop = following[:3]
            self.assertTrue((arena != narena and nstart == 0) or
                            (stop == nstart))

    def test_free_from_gc(self):
        # Check that freeing of blocks by the garbage collector doesn't
        # deadlock (issue #12352).
        # The GC is switched on if necessary and given a low collection
        # threshold so that collections happen often, which raises the
        # odds of hitting the deadlock.
        if not gc.isenabled():
            gc.enable()
            self.addCleanup(gc.disable)
        saved_thresholds = gc.get_threshold()
        self.addCleanup(gc.set_threshold, *saved_thresholds)
        gc.set_threshold(10)

        # allocate many blocks tied together in reference cycles so that
        # only the cyclic collector can release them
        for _ in range(5000):
            first = multiprocessing.heap.BufferWrapper(1)
            second = multiprocessing.heap.BufferWrapper(1)
            # circular references
            first.buddy = second
            second.buddy = first
1935
Benjamin Petersone711caf2008-06-11 16:44:04 +00001936#
1937#
1938#
1939
Benjamin Petersone711caf2008-06-11 16:44:04 +00001940class _Foo(Structure):
1941 _fields_ = [
1942 ('x', c_int),
1943 ('y', c_double)
1944 ]
1945
class _TestSharedCTypes(BaseTestCase):

    ALLOWED_TYPES = ('processes',)

    def setUp(self):
        if not HAS_SHAREDCTYPES:
            self.skipTest("requires multiprocessing.sharedctypes")

    @classmethod
    def _double(cls, x, y, foo, arr, string):
        # Child side: double every shared object in place.
        x.value *= 2
        y.value *= 2
        foo.x *= 2
        foo.y *= 2
        string.value *= 2
        for index in range(len(arr)):
            arr[index] *= 2

    def test_sharedctypes(self, lock=False):
        # Build one shared object of each flavour, let a child double them
        # all, then check the parent sees the doubled values.
        x = Value('i', 7, lock=lock)
        y = Value(c_double, 1.0/3.0, lock=lock)
        foo = Value(_Foo, 3, 2, lock=lock)
        arr = self.Array('d', list(range(10)), lock=lock)
        string = self.Array('c', 20, lock=lock)
        string.value = latin('hello')

        shared_objects = (x, y, foo, arr, string)
        child = self.Process(target=self._double, args=shared_objects)
        child.daemon = True
        child.start()
        child.join()

        self.assertEqual(x.value, 14)
        self.assertAlmostEqual(y.value, 2.0/3.0)
        self.assertEqual(foo.x, 6)
        self.assertAlmostEqual(foo.y, 4.0)
        for index in range(10):
            self.assertAlmostEqual(arr[index], index*2)
        self.assertEqual(string.value, latin('hellohello'))

    def test_synchronize(self):
        # Same scenario as test_sharedctypes, with lock=True.
        self.test_sharedctypes(lock=True)

    def test_copy(self):
        # Changes made to the original after copy() must not show up in
        # the copy.
        original = _Foo(2, 5.0)
        duplicate = copy(original)
        original.x = 0
        original.y = 0
        self.assertEqual(duplicate.x, 2)
        self.assertAlmostEqual(duplicate.y, 5.0)
1995
1996#
1997#
1998#
1999
class _TestFinalize(BaseTestCase):

    ALLOWED_TYPES = ('processes',)

    @classmethod
    def _test_finalize(cls, conn):
        # Child side.  Every finalizer sends its tag over `conn`; the
        # parent compares the order of arrival with the expected one.
        # The statement order below is part of the test.
        class Foo(object):
            pass

        a = Foo()
        util.Finalize(a, conn.send, args=('a',))
        del a           # dropping the last reference fires a's callback

        b = Foo()
        close_b = util.Finalize(b, conn.send, args=('b',))
        close_b()       # fires b's callback
        close_b()       # no-op: the callback has already run
        del b           # no-op: the callback has already run

        # registered without an exitpriority; the parent expects no 'c'
        c = Foo()
        util.Finalize(c, conn.send, args=('c',))

        d10 = Foo()
        util.Finalize(d10, conn.send, args=('d10',), exitpriority=1)

        d01 = Foo()
        util.Finalize(d01, conn.send, args=('d01',), exitpriority=0)
        d02 = Foo()
        util.Finalize(d02, conn.send, args=('d02',), exitpriority=0)
        d03 = Foo()
        util.Finalize(d03, conn.send, args=('d03',), exitpriority=0)

        util.Finalize(None, conn.send, args=('e',), exitpriority=-10)

        # lowest priority: the sentinel the parent stops reading at
        util.Finalize(None, conn.send, args=('STOP',), exitpriority=-100)

        # call multiprocessing's cleanup function then exit process without
        # garbage collecting locals
        util._exit_function()
        conn.close()
        os._exit(0)

    def test_finalize(self):
        parent_end, child_end = self.Pipe()

        child = self.Process(target=self._test_finalize, args=(child_end,))
        child.daemon = True
        child.start()
        child.join()

        # read tags until the 'STOP' sentinel shows up
        received = list(iter(parent_end.recv, 'STOP'))
        self.assertEqual(received,
                         ['a', 'b', 'd10', 'd03', 'd02', 'd01', 'e'])
2052
2053#
2054# Test that from ... import * works for each module
2055#
2056
class _TestImportStar(BaseTestCase):

    ALLOWED_TYPES = ('processes',)

    def test_import(self):
        # Every name listed in a module's __all__ must exist on the module.
        modules = [
            'multiprocessing', 'multiprocessing.connection',
            'multiprocessing.heap', 'multiprocessing.managers',
            'multiprocessing.pool', 'multiprocessing.process',
            'multiprocessing.synchronize', 'multiprocessing.util'
            ]

        if HAS_REDUCTION:
            modules.append('multiprocessing.reduction')

        if c_int is not None:
            # This module requires _ctypes
            modules.append('multiprocessing.sharedctypes')

        for module_name in modules:
            __import__(module_name)
            module = sys.modules[module_name]
            exported = getattr(module, '__all__', ())

            for attr in exported:
                message = '%r does not have attribute %r' % (module, attr)
                self.assertTrue(hasattr(module, attr), message)
2085
2086#
2087# Quick test that logging works -- does not test logging output
2088#
2089
class _TestLogging(BaseTestCase):

    ALLOWED_TYPES = ('processes',)

    def test_enable_logging(self):
        logger = multiprocessing.get_logger()
        # Put the suite-wide level back even if a call below raises.
        self.addCleanup(logger.setLevel, LOG_LEVEL)
        logger.setLevel(util.SUBWARNING)
        self.assertIsNotNone(logger)
        logger.debug('this will not be printed')
        logger.info('nor will this')

    @classmethod
    def _test_level(cls, conn):
        # Child side: report the effective level of multiprocessing's logger.
        logger = multiprocessing.get_logger()
        conn.send(logger.getEffectiveLevel())

    def _check_child_level(self, expected, reader, writer):
        # Start a child that reports its logger level through `writer` and
        # compare the value read from `reader` with `expected`.  The child
        # is always joined so that no process outlives the test.
        child = self.Process(target=self._test_level, args=(writer,))
        child.daemon = True
        child.start()
        try:
            self.assertEqual(expected, reader.recv())
        finally:
            child.join()

    def test_level(self):
        # The child must see the level set on multiprocessing's logger and,
        # once that is reset to NOTSET, the level of the root logger.
        LEVEL1 = 32
        LEVEL2 = 37

        logger = multiprocessing.get_logger()
        root_logger = logging.getLogger()
        root_level = root_logger.level

        # Cleanups run last-in first-out: the root logger is restored
        # first, then multiprocessing's logger, also when an assertion
        # fails half-way through.
        self.addCleanup(logger.setLevel, level=LOG_LEVEL)
        self.addCleanup(root_logger.setLevel, root_level)

        reader, writer = multiprocessing.Pipe(duplex=False)
        self.addCleanup(reader.close)
        self.addCleanup(writer.close)

        logger.setLevel(LEVEL1)
        self._check_child_level(LEVEL1, reader, writer)

        logger.setLevel(logging.NOTSET)
        root_logger.setLevel(LEVEL2)
        self._check_child_level(LEVEL2, reader, writer)
2132
Jesse Nollerb9a49b72009-11-21 18:09:38 +00002133
Jesse Nollerf4ae35f2009-11-24 14:22:24 +00002134# class _TestLoggingProcessName(BaseTestCase):
2135#
2136# def handle(self, record):
2137# assert record.processName == multiprocessing.current_process().name
2138# self.__handled = True
2139#
2140# def test_logging(self):
2141# handler = logging.Handler()
2142# handler.handle = self.handle
2143# self.__handled = False
2144# # Bypass getLogger() and side-effects
2145# logger = logging.getLoggerClass()(
2146# 'multiprocessing.test.TestLoggingProcessName')
2147# logger.addHandler(handler)
2148# logger.propagate = False
2149#
2150# logger.warn('foo')
2151# assert self.__handled
Jesse Nollerb9a49b72009-11-21 18:09:38 +00002152
Benjamin Petersone711caf2008-06-11 16:44:04 +00002153#
Jesse Noller6214edd2009-01-19 16:23:53 +00002154# Test to verify handle verification, see issue 3321
2155#
2156
class TestInvalidHandle(unittest.TestCase):

    @unittest.skipIf(WIN32, "skipped on Windows")
    def test_invalid_handles(self):
        # Polling a Connection built on a bogus handle, and building one on
        # a negative handle, must both raise ValueError or IOError.
        expected_errors = (ValueError, IOError)
        conn = multiprocessing.connection.Connection(44977608)
        try:
            with self.assertRaises(expected_errors):
                conn.poll()
        finally:
            # Hack private attribute _handle to avoid printing an error
            # in conn.__del__
            conn._handle = None
        with self.assertRaises(expected_errors):
            multiprocessing.connection.Connection(-1)
Florent Xiclunafd1b0932010-03-28 00:25:02 +00002170
Jesse Noller6214edd2009-01-19 16:23:53 +00002171#
Benjamin Petersone711caf2008-06-11 16:44:04 +00002172# Functions used to create test cases from the base ones in this module
2173#
2174
def get_attributes(Source, names):
    """Return a dict mapping each name in *names* to ``Source.<name>``.

    Plain Python functions are wrapped in ``staticmethod`` so that they can
    be placed in a class namespace without being bound to instances; every
    other object (bound methods, builtins, classes, values) is returned
    unchanged.

    Raises AttributeError if *Source* lacks one of the names.
    """
    # The type of a plain "def" function, obtained without an extra import.
    function_type = type(get_attributes)

    def wrap(obj):
        return staticmethod(obj) if isinstance(obj, function_type) else obj

    return {name: wrap(getattr(Source, name)) for name in names}
2183
def create_test_cases(Mixin, type):
    """Build concrete TestCase classes for one backend.

    Every module-level name starting with '_Test' whose ALLOWED_TYPES
    contains *type* is combined with unittest.TestCase and *Mixin* into a
    new class named 'With<Type><name without the leading underscore>'.
    Returns a dict mapping the new names to the new classes.
    """
    result = {}
    glob = globals()
    Type = type.capitalize()

    for name in list(glob):
        if not name.startswith('_Test'):
            continue
        base = glob[name]
        if type not in base.ALLOWED_TYPES:
            continue
        newname = 'With' + Type + name[1:]

        class Temp(base, unittest.TestCase, Mixin):
            pass

        # make the generated class report itself under its public name and
        # under the module of the mixin
        Temp.__name__ = newname
        Temp.__module__ = Mixin.__module__
        result[newname] = Temp
    return result
2200
2201#
2202# Create test cases
2203#
2204
class ProcessesMixin(object):
    # Binds the generic test classes to the process-based API.
    TYPE = 'processes'
    Process = multiprocessing.Process
    # Copy the listed multiprocessing attributes into the class namespace;
    # get_attributes() wraps plain functions in staticmethod.
    locals().update(get_attributes(multiprocessing, (
        'Queue',
        'Lock', 'RLock',
        'Semaphore', 'BoundedSemaphore',
        'Condition', 'Event',
        'Value', 'Array', 'RawValue', 'RawArray',
        'current_process', 'active_children',
        'Pipe', 'connection',
        'JoinableQueue'
        )))

# Generate the WithProcesses* test classes and publish them as module
# globals.
testcases_processes = create_test_cases(ProcessesMixin, type='processes')
globals().update(testcases_processes)
2217
2218
class ManagerMixin(object):
    # Binds the generic test classes to proxies served by a SyncManager.
    TYPE = 'manager'
    Process = multiprocessing.Process
    # The manager is allocated without running __init__; test_main() calls
    # __init__() and start() on it before the tests run.
    manager = object.__new__(multiprocessing.managers.SyncManager)
    # Copy the listed manager attributes into the class namespace.
    locals().update(get_attributes(manager, (
        'Queue',
        'Lock', 'RLock',
        'Semaphore', 'BoundedSemaphore',
        'Condition', 'Event',
        'Value', 'Array',
        'list', 'dict', 'Namespace',
        'JoinableQueue'
        )))

# Generate the WithManager* test classes and publish them as module
# globals.
testcases_manager = create_test_cases(ManagerMixin, type='manager')
globals().update(testcases_manager)
2231
2232
class ThreadsMixin(object):
    # Binds the generic test classes to the multiprocessing.dummy API.
    TYPE = 'threads'
    Process = multiprocessing.dummy.Process
    # Copy the listed multiprocessing.dummy attributes into the class
    # namespace; get_attributes() wraps plain functions in staticmethod.
    locals().update(get_attributes(multiprocessing.dummy, (
        'Queue',
        'Lock', 'RLock',
        'Semaphore', 'BoundedSemaphore',
        'Condition', 'Event',
        'Value', 'Array',
        'current_process', 'active_children',
        'Pipe', 'connection',
        'dict', 'list', 'Namespace',
        'JoinableQueue'
        )))

# Generate the WithThreads* test classes and publish them as module
# globals.
testcases_threads = create_test_cases(ThreadsMixin, type='threads')
globals().update(testcases_threads)
2245
class OtherTest(unittest.TestCase):
    # TODO: add more tests for deliver/answer challenge.
    def test_deliver_challenge_auth_failure(self):
        # A peer answering the challenge with garbage must be rejected.
        class _FakeConnection(object):
            def recv_bytes(self, size):
                return b'something bogus'

            def send_bytes(self, data):
                pass

        with self.assertRaises(multiprocessing.AuthenticationError):
            multiprocessing.connection.deliver_challenge(
                _FakeConnection(), b'abc')

    def test_answer_challenge_auth_failure(self):
        # The fake peer sends the challenge marker first, garbage second
        # and empty bytes on any later read.
        class _FakeConnection(object):
            def __init__(self):
                self.count = 0

            def recv_bytes(self, size):
                self.count += 1
                if self.count == 1:
                    return multiprocessing.connection.CHALLENGE
                if self.count == 2:
                    return b'something bogus'
                return b''

            def send_bytes(self, data):
                pass

        with self.assertRaises(multiprocessing.AuthenticationError):
            multiprocessing.connection.answer_challenge(
                _FakeConnection(), b'abc')
2274
Benjamin Petersonf47ed4a2009-04-11 20:45:40 +00002275#
2276# Test Manager.start()/Pool.__init__() initializer feature - see issue 5585
2277#
2278
def initializer(ns):
    # Initializer hook used by TestInitializers: bump the counter kept in
    # the `test` attribute of `ns` to show that the hook ran.
    ns.test = ns.test + 1
2281
class TestInitializers(unittest.TestCase):
    # Checks the initializer argument of Manager.start() and Pool().  A
    # namespace served by a helper manager carries the counter that
    # initializer() increments.

    def setUp(self):
        self.mgr = multiprocessing.Manager()
        self.ns = self.mgr.Namespace()
        self.ns.test = 0

    def tearDown(self):
        self.mgr.shutdown()

    def test_manager_initializer(self):
        m = multiprocessing.managers.SyncManager()
        # a non-callable initializer is rejected
        self.assertRaises(TypeError, m.start, 1)
        m.start(initializer, (self.ns,))
        try:
            self.assertEqual(self.ns.test, 1)
        finally:
            # shut the second manager down even when the assertion fails,
            # so that its server process is not left behind
            m.shutdown()

    def test_pool_initializer(self):
        # a non-callable initializer is rejected
        self.assertRaises(TypeError, multiprocessing.Pool, initializer=1)
        p = multiprocessing.Pool(1, initializer, (self.ns,))
        p.close()
        p.join()
        self.assertEqual(self.ns.test, 1)
2304
Alexandre Vassalottic57a84f2009-07-17 12:07:01 +00002305#
2306# Issue 5155, 5313, 5331: Test process in processes
2307# Verifies os.close(sys.stdin.fileno) vs. sys.stdin.close() behavior
2308#
2309
2310def _ThisSubProcess(q):
2311 try:
2312 item = q.get(block=False)
2313 except pyqueue.Empty:
2314 pass
2315
def _TestProcess(q):
    # Child body: start a daemonic grandchild working on a fresh queue and
    # wait for it.  The queue passed in as `q` is not used.
    inner_queue = multiprocessing.Queue()
    grandchild = multiprocessing.Process(target=_ThisSubProcess,
                                         args=(inner_queue,))
    grandchild.daemon = True
    grandchild.start()
    grandchild.join()
2322
2323def _afunc(x):
2324 return x*x
2325
def pool_in_process():
    # Child body for test_pool_in_process: run a small map() on a pool
    # created inside a child process.
    pool = multiprocessing.Pool(processes=4)
    try:
        pool.map(_afunc, [1, 2, 3, 4, 5, 6, 7])
    finally:
        # release the worker processes instead of leaving them to the
        # interpreter's exit handlers
        pool.close()
        pool.join()
2329
2330class _file_like(object):
2331 def __init__(self, delegate):
2332 self._delegate = delegate
2333 self._pid = None
2334
2335 @property
2336 def cache(self):
2337 pid = os.getpid()
2338 # There are no race conditions since fork keeps only the running thread
2339 if pid != self._pid:
2340 self._pid = pid
2341 self._cache = []
2342 return self._cache
2343
2344 def write(self, data):
2345 self.cache.append(data)
2346
2347 def flush(self):
2348 self._delegate.write(''.join(self.cache))
2349 self._cache = []
2350
class TestStdinBadfiledescriptor(unittest.TestCase):
    # Issues 5155, 5313 and 5331: processes and pools started from inside
    # a child process.

    def test_queue_in_process(self):
        queue = multiprocessing.Queue()
        proc = multiprocessing.Process(target=_TestProcess, args=(queue,))
        proc.start()
        proc.join()

    def test_pool_in_process(self):
        p = multiprocessing.Process(target=pool_in_process)
        p.start()
        p.join()

    def test_flushing(self):
        sio = io.StringIO()
        flike = _file_like(sio)
        flike.write('foo')
        # NOTE(review): this process is created but never started, so the
        # child-side flush is not exercised -- confirm whether that is
        # intended.
        proc = multiprocessing.Process(target=lambda: flike.flush())
        flike.flush()
        # assertEqual survives "python -O" and reports both values on
        # failure, unlike a bare assert statement
        self.assertEqual(sio.getvalue(), 'foo')
2371
# Test classes that are not generated from the '_Test*' bases above.
testcases_other = [
    OtherTest,
    TestInvalidHandle,
    TestInitializers,
    TestStdinBadfiledescriptor,
    ]
Neal Norwitz5d6415e2008-08-25 01:53:32 +00002374
Benjamin Petersone711caf2008-06-11 16:44:04 +00002375#
2376#
2377#
2378
def test_main(run=None):
    """Build the complete multiprocessing test suite and run it.

    *run* is a callable taking the assembled TestSuite; when omitted,
    test.support.run_unittest is used.  Raises unittest.SkipTest on Linux
    when an RLock cannot be created (issue 3111).
    """
    if sys.platform.startswith("linux"):
        try:
            multiprocessing.RLock()
        except OSError:
            raise unittest.SkipTest("OSError raises on RLock creation, see issue 3111!")

    check_enough_semaphores()

    if run is None:
        from test.support import run_unittest as run

    util.get_temp_dir()     # creates temp directory for use by all processes

    multiprocessing.get_logger().setLevel(LOG_LEVEL)

    # Shared pools, and the manager that ManagerMixin allocated without
    # initialising, are set up once for the whole run.
    ProcessesMixin.pool = multiprocessing.Pool(4)
    ThreadsMixin.pool = multiprocessing.dummy.Pool(4)
    ManagerMixin.manager.__init__()
    ManagerMixin.manager.start()
    ManagerMixin.pool = ManagerMixin.manager.Pool(4)

    testcases = (
        sorted(testcases_processes.values(), key=lambda tc:tc.__name__) +
        sorted(testcases_threads.values(), key=lambda tc:tc.__name__) +
        sorted(testcases_manager.values(), key=lambda tc:tc.__name__) +
        testcases_other
        )

    loadTestsFromTestCase = unittest.defaultTestLoader.loadTestsFromTestCase
    suite = unittest.TestSuite(loadTestsFromTestCase(tc) for tc in testcases)
    try:
        run(suite)
    finally:
        # tear the shared resources down even if the runner raises, so no
        # worker or manager process is left behind
        ThreadsMixin.pool.terminate()
        ProcessesMixin.pool.terminate()
        ManagerMixin.pool.terminate()
        ManagerMixin.manager.shutdown()

        del ProcessesMixin.pool, ThreadsMixin.pool, ManagerMixin.pool
Benjamin Peterson41181742008-07-02 20:22:54 +00002417 del ProcessesMixin.pool, ThreadsMixin.pool, ManagerMixin.pool
Benjamin Petersone711caf2008-06-11 16:44:04 +00002418
def main():
    # Run the whole suite through a verbose text runner.
    verbose_runner = unittest.TextTestRunner(verbosity=2)
    test_main(verbose_runner.run)

if __name__ == '__main__':
    main()