blob: b99201b2809de815aac8d46252aad70250c43cad [file] [log] [blame]
Benjamin Peterson90f5ba52010-03-11 22:53:45 +00001#!/usr/bin/env python3
Georg Brandl86b2fb92008-07-16 03:43:04 +00002
Benjamin Petersone711caf2008-06-11 16:44:04 +00003#
4# Unit tests for the multiprocessing package
5#
6
7import unittest
Benjamin Petersone711caf2008-06-11 16:44:04 +00008import queue as pyqueue
9import time
Alexandre Vassalottic57a84f2009-07-17 12:07:01 +000010import io
Benjamin Petersone711caf2008-06-11 16:44:04 +000011import sys
12import os
13import gc
Antoine Pitrouc824e9a2011-04-05 18:11:33 +020014import errno
Benjamin Petersone711caf2008-06-11 16:44:04 +000015import signal
16import array
Benjamin Petersone711caf2008-06-11 16:44:04 +000017import socket
18import random
19import logging
R. David Murraya21e4ca2009-03-31 23:16:50 +000020import test.support
Benjamin Petersone711caf2008-06-11 16:44:04 +000021
Benjamin Petersone5384b02008-10-04 22:00:42 +000022
R. David Murraya21e4ca2009-03-31 23:16:50 +000023# Skip tests if _multiprocessing wasn't built.
24_multiprocessing = test.support.import_module('_multiprocessing')
25# Skip tests if sem_open implementation is broken.
26test.support.import_module('multiprocessing.synchronize')
# import threading after _multiprocessing to raise a more relevant error
# message: "No module named _multiprocessing". _multiprocessing is not compiled
# without thread support.
30import threading
Benjamin Petersone5384b02008-10-04 22:00:42 +000031
Benjamin Petersone711caf2008-06-11 16:44:04 +000032import multiprocessing.dummy
33import multiprocessing.connection
34import multiprocessing.managers
35import multiprocessing.heap
Benjamin Petersone711caf2008-06-11 16:44:04 +000036import multiprocessing.pool
Benjamin Petersone711caf2008-06-11 16:44:04 +000037
Charles-François Natalibc8f0822011-09-20 20:36:51 +020038from multiprocessing import util
39
40try:
41 from multiprocessing import reduction
42 HAS_REDUCTION = True
43except ImportError:
44 HAS_REDUCTION = False
Benjamin Petersone711caf2008-06-11 16:44:04 +000045
Brian Curtinafa88b52010-10-07 01:12:19 +000046try:
47 from multiprocessing.sharedctypes import Value, copy
48 HAS_SHAREDCTYPES = True
49except ImportError:
50 HAS_SHAREDCTYPES = False
51
Antoine Pitroubcb39d42011-08-23 19:46:22 +020052try:
53 import msvcrt
54except ImportError:
55 msvcrt = None
56
Benjamin Petersone711caf2008-06-11 16:44:04 +000057#
58#
59#
60
def latin(s):
    """Return the text *s* encoded to bytes using the 'latin' codec."""
    encoded = s.encode('latin')
    return encoded
Benjamin Petersone711caf2008-06-11 16:44:04 +000063
Benjamin Petersone711caf2008-06-11 16:44:04 +000064#
65# Constants
66#
67
# Logging level used by the tests; swap in the commented line for more detail.
LOG_LEVEL = util.SUBWARNING
#LOG_LEVEL = logging.DEBUG

# Basic time slice (in seconds) used for short sleeps in the tests.
DELTA = 0.1

# Making this true makes tests take a lot longer and can sometimes cause
# some non-serious failures because some calls block a bit longer than
# expected.
CHECK_TIMINGS = False

# Longer, distinct timeouts are only worth paying for when timings are checked.
TIMEOUT1, TIMEOUT2, TIMEOUT3 = (
    (0.82, 0.35, 1.4) if CHECK_TIMINGS else (0.1, 0.1, 0.1)
)

# False when the extension module flags its semaphore get_value as broken.
_BROKEN_GETVALUE = getattr(_multiprocessing, 'HAVE_BROKEN_SEM_GETVALUE', False)
HAVE_GETVALUE = not _BROKEN_GETVALUE
83
WIN32 = (sys.platform == "win32")
if WIN32:
    from _subprocess import WaitForSingleObject, INFINITE, WAIT_OBJECT_0

    def wait_for_handle(handle, timeout):
        """Wait on *handle*; return True if the wait ended with WAIT_OBJECT_0.

        *timeout* is in seconds; ``None`` or a negative value waits with
        INFINITE.
        """
        no_limit = timeout is None or timeout < 0.0
        # WaitForSingleObject is given the timeout in milliseconds
        millis = INFINITE if no_limit else int(1000 * timeout)
        status = WaitForSingleObject(handle, millis)
        return status == WAIT_OBJECT_0
else:
    from select import select
    _select = util._eintr_retry(select)

    def wait_for_handle(handle, timeout):
        """Return True if select() reports *handle* readable within *timeout*.

        *timeout* is in seconds; a negative value is replaced by ``None``
        before being handed to select().
        """
        if timeout is not None and timeout < 0.0:
            timeout = None
        readable = _select([handle], [], [], timeout)[0]
        return handle in readable
Jesse Noller6214edd2009-01-19 16:23:53 +0000102
# Ask the OS for SC_OPEN_MAX; fall back to 256 when it cannot be queried
# (for example os.sysconf is missing or does not know the name).
# Catch Exception rather than using a bare except so that KeyboardInterrupt
# and SystemExit are not swallowed at import time.
try:
    MAXFD = os.sysconf("SC_OPEN_MAX")
except Exception:
    MAXFD = 256
107
Benjamin Petersone711caf2008-06-11 16:44:04 +0000108#
Florent Xiclunafd1b0932010-03-28 00:25:02 +0000109# Some tests require ctypes
110#
111
112try:
Florent Xiclunaaa171062010-08-14 15:56:42 +0000113 from ctypes import Structure, c_int, c_double
Florent Xiclunafd1b0932010-03-28 00:25:02 +0000114except ImportError:
115 Structure = object
116 c_int = c_double = None
117
Charles-François Natali221ef672011-11-22 18:55:22 +0100118
def check_enough_semaphores():
    """Check that the system supports enough semaphores to run the test.

    Returns None when the limit cannot be queried, is reported as -1, or is
    large enough; raises unittest.SkipTest otherwise.
    """
    # minimum number of semaphores available according to POSIX
    required = 256
    try:
        available = os.sysconf("SC_SEM_NSEMS_MAX")
    except (AttributeError, ValueError):
        # sysconf not available or setting not available
        return
    if available != -1 and available < required:
        raise unittest.SkipTest("The OS doesn't support enough semaphores "
                                "to run the test (required: %d)." % required)
132
133
Florent Xiclunafd1b0932010-03-28 00:25:02 +0000134#
Benjamin Petersone711caf2008-06-11 16:44:04 +0000135# Creates a wrapper for a function which records the time it takes to finish
136#
137
class TimingWrapper(object):
    """Callable wrapper that records how long the last call to *func* took.

    ``elapsed`` is None until the wrapper has been called once.  After each
    call it holds the duration of that call in seconds, whether the wrapped
    function returned normally or raised.
    """

    def __init__(self, func):
        self.func = func
        self.elapsed = None

    def __call__(self, *args, **kwds):
        """Call the wrapped function, passing through its result or exception."""
        # A monotonic clock cannot jump backwards when the system time is
        # adjusted, so the measured duration is never negative.
        start = time.monotonic()
        try:
            return self.func(*args, **kwds)
        finally:
            self.elapsed = time.monotonic() - start
150
151#
152# Base class for test cases
153#
154
class BaseTestCase(object):
    """Shared assertion helpers for the test case classes in this file.

    The assertion methods used here (assertEqual, assertAlmostEqual) are not
    defined by this class and must be supplied by whatever it is combined
    with.
    """

    ALLOWED_TYPES = ('processes', 'manager', 'threads')

    def assertTimingAlmostEqual(self, a, b):
        """Compare two durations to one decimal place, if CHECK_TIMINGS is set."""
        if not CHECK_TIMINGS:
            return
        self.assertAlmostEqual(a, b, 1)

    def assertReturnsIfImplemented(self, value, func, *args):
        """Assert func(*args) == value unless it raises NotImplementedError."""
        try:
            result = func(*args)
        except NotImplementedError:
            return None
        return self.assertEqual(value, result)

    # For the sanity of Windows users, rather than crashing or freezing in
    # multiple ways.
    def __reduce__(self, *args):
        raise NotImplementedError("shouldn't try to pickle a test case")

    __reduce_ex__ = __reduce__
177
Benjamin Petersone711caf2008-06-11 16:44:04 +0000178#
179# Return the value of a semaphore
180#
181
def get_value(self):
    """Return the current value of the semaphore-like object *self*.

    Tries ``self.get_value()`` first, then the ``_Semaphore__value`` and
    ``_value`` attributes in that order.  Raises NotImplementedError when
    none of them is available.
    """
    try:
        return self.get_value()
    except AttributeError:
        pass
    for attr_name in ('_Semaphore__value', '_value'):
        try:
            return getattr(self, attr_name)
        except AttributeError:
            continue
    raise NotImplementedError
193
194#
195# Testcases
196#
197
class _TestProcess(BaseTestCase):
    """Tests for the Process API: attributes, lifecycle and bookkeeping.

    Several tests return early when ``self.TYPE`` is 'threads'.
    """

    ALLOWED_TYPES = ('processes', 'threads')

    def test_current(self):
        """Check the attributes of the object returned by current_process()."""
        if self.TYPE == 'threads':
            return

        current = self.current_process()
        authkey = current.authkey

        self.assertTrue(current.is_alive())
        self.assertTrue(not current.daemon)
        self.assertIsInstance(authkey, bytes)
        self.assertTrue(len(authkey) > 0)
        self.assertEqual(current.ident, os.getpid())
        self.assertEqual(current.exitcode, None)

    def test_daemon_argument(self):
        """Check the ``daemon`` constructor argument and its default."""
        if self.TYPE == "threads":
            return

        # By default uses the current process's daemon flag.
        proc0 = self.Process(target=self._test)
        self.assertEqual(proc0.daemon, self.current_process().daemon)
        proc1 = self.Process(target=self._test, daemon=True)
        self.assertTrue(proc1.daemon)
        proc2 = self.Process(target=self._test, daemon=False)
        self.assertFalse(proc2.daemon)

    @classmethod
    def _test(cls, q, *args, **kwds):
        """Child target: report arguments and process identity through *q*."""
        current = cls.current_process()
        q.put(args)
        q.put(kwds)
        q.put(current.name)
        if cls.TYPE != 'threads':
            q.put(bytes(current.authkey))
            q.put(current.pid)

    def test_process(self):
        """Follow a process from creation through start() to join()."""
        q = self.Queue(1)
        args = (q, 1, 2)
        kwargs = {'hello':23, 'bye':2.54}
        name = 'SomeProcess'
        p = self.Process(
            target=self._test, args=args, kwargs=kwargs, name=name
            )
        p.daemon = True
        current = self.current_process()

        # state before the process has been started
        if self.TYPE != 'threads':
            self.assertEqual(p.authkey, current.authkey)
        self.assertEqual(p.is_alive(), False)
        self.assertEqual(p.daemon, True)
        self.assertNotIn(p, self.active_children())
        self.assertTrue(type(self.active_children()) is list)
        self.assertEqual(p.exitcode, None)

        p.start()

        # state while running
        self.assertEqual(p.exitcode, None)
        self.assertEqual(p.is_alive(), True)
        self.assertIn(p, self.active_children())

        # values come back in the order _test() put them on the queue
        self.assertEqual(q.get(), args[1:])
        self.assertEqual(q.get(), kwargs)
        self.assertEqual(q.get(), p.name)
        if self.TYPE != 'threads':
            self.assertEqual(q.get(), current.authkey)
            self.assertEqual(q.get(), p.pid)

        p.join()

        # state after the process has finished
        self.assertEqual(p.exitcode, 0)
        self.assertEqual(p.is_alive(), False)
        self.assertNotIn(p, self.active_children())

    @classmethod
    def _test_terminate(cls):
        """Child target: sleep long enough that the parent must terminate it."""
        time.sleep(1000)

    def test_terminate(self):
        """Check that terminate() stops a sleeping child and join() returns."""
        if self.TYPE == 'threads':
            return

        p = self.Process(target=self._test_terminate)
        p.daemon = True
        p.start()

        self.assertEqual(p.is_alive(), True)
        self.assertIn(p, self.active_children())
        self.assertEqual(p.exitcode, None)

        p.terminate()

        join = TimingWrapper(p.join)
        self.assertEqual(join(), None)
        self.assertTimingAlmostEqual(join.elapsed, 0.0)

        self.assertEqual(p.is_alive(), False)
        self.assertNotIn(p, self.active_children())

        p.join()

        # XXX sometimes get p.exitcode == 0 on Windows ...
        #self.assertEqual(p.exitcode, -signal.SIGTERM)

    def test_cpu_count(self):
        """cpu_count() returns an int >= 1 (NotImplementedError is tolerated)."""
        try:
            cpus = multiprocessing.cpu_count()
        except NotImplementedError:
            cpus = 1
        self.assertTrue(type(cpus) is int)
        self.assertTrue(cpus >= 1)

    def test_active_children(self):
        """A process is listed by active_children() only between start and join."""
        self.assertEqual(type(self.active_children()), list)

        p = self.Process(target=time.sleep, args=(DELTA,))
        self.assertNotIn(p, self.active_children())

        p.daemon = True
        p.start()
        self.assertIn(p, self.active_children())

        p.join()
        self.assertNotIn(p, self.active_children())

    @classmethod
    def _test_recursion(cls, wconn, id):
        """Send *id* on *wconn*, then recurse in two children until depth 2.

        Each child is joined before the next one is started.
        """
        wconn.send(id)
        if len(id) < 2:
            for i in range(2):
                p = cls.Process(
                    target=cls._test_recursion, args=(wconn, id+[i])
                    )
                p.start()
                p.join()

    def test_recursion(self):
        """Processes started from within processes report in depth-first order."""
        rconn, wconn = self.Pipe(duplex=False)
        self._test_recursion(wconn, [])

        time.sleep(DELTA)
        result = []
        while rconn.poll():
            result.append(rconn.recv())

        expected = [
            [],
              [0],
                [0, 0],
                [0, 1],
              [1],
                [1, 0],
                [1, 1]
            ]
        self.assertEqual(result, expected)

    @classmethod
    def _test_sentinel(cls, event):
        """Child target: wait up to 10 seconds for *event*."""
        event.wait(10.0)

    def test_sentinel(self):
        """Check that Process.sentinel becomes ready once the process has ended."""
        if self.TYPE == "threads":
            return
        event = self.Event()
        p = self.Process(target=self._test_sentinel, args=(event,))
        # the sentinel is not available before start()
        with self.assertRaises(ValueError):
            p.sentinel
        p.start()
        self.addCleanup(p.join)
        sentinel = p.sentinel
        self.assertIsInstance(sentinel, int)
        self.assertFalse(wait_for_handle(sentinel, timeout=0.0))
        event.set()
        p.join()
        self.assertTrue(wait_for_handle(sentinel, timeout=DELTA))
379
Benjamin Petersone711caf2008-06-11 16:44:04 +0000380#
381#
382#
383
384class _UpperCaser(multiprocessing.Process):
385
386 def __init__(self):
387 multiprocessing.Process.__init__(self)
388 self.child_conn, self.parent_conn = multiprocessing.Pipe()
389
390 def run(self):
391 self.parent_conn.close()
392 for s in iter(self.child_conn.recv, None):
393 self.child_conn.send(s.upper())
394 self.child_conn.close()
395
396 def submit(self, s):
397 assert type(s) is str
398 self.parent_conn.send(s)
399 return self.parent_conn.recv()
400
401 def stop(self):
402 self.parent_conn.send(None)
403 self.parent_conn.close()
404 self.child_conn.close()
405
class _TestSubclassingProcess(BaseTestCase):
    """Check that a Process subclass (_UpperCaser) can be started and used."""

    ALLOWED_TYPES = ('processes',)

    def test_subclassing(self):
        """Round-trip two strings through a running _UpperCaser."""
        worker = _UpperCaser()
        worker.daemon = True
        worker.start()
        for text, expected in (('hello', 'HELLO'), ('world', 'WORLD')):
            self.assertEqual(worker.submit(text), expected)
        worker.stop()
        worker.join()
418
419#
420#
421#
422
def queue_empty(q):
    """Return whether *q* is empty, via q.empty() or else q.qsize() == 0."""
    if not hasattr(q, 'empty'):
        return q.qsize() == 0
    return q.empty()
428
def queue_full(q, maxsize):
    """Return whether *q* is full, via q.full() or else q.qsize() == maxsize."""
    if not hasattr(q, 'full'):
        return q.qsize() == maxsize
    return q.full()
434
435
class _TestQueue(BaseTestCase):
    """Tests for Queue and JoinableQueue: put/get semantics, fork, qsize."""

    @classmethod
    def _test_put(cls, queue, child_can_start, parent_can_continue):
        """Child target: once released, drain the six items put by test_put."""
        child_can_start.wait()
        for i in range(6):
            queue.get()
        parent_can_continue.set()

    def test_put(self):
        """Fill a bounded queue and check every way put() can report Full."""
        MAXSIZE = 6
        queue = self.Queue(maxsize=MAXSIZE)
        child_can_start = self.Event()
        parent_can_continue = self.Event()

        proc = self.Process(
            target=self._test_put,
            args=(queue, child_can_start, parent_can_continue)
            )
        proc.daemon = True
        proc.start()

        self.assertEqual(queue_empty(queue), True)
        self.assertEqual(queue_full(queue, MAXSIZE), False)

        # exercise the different calling conventions while filling the queue
        queue.put(1)
        queue.put(2, True)
        queue.put(3, True, None)
        queue.put(4, False)
        queue.put(5, False, None)
        queue.put_nowait(6)

        # the values may be in buffer but not yet in pipe so sleep a bit
        time.sleep(DELTA)

        self.assertEqual(queue_empty(queue), False)
        self.assertEqual(queue_full(queue, MAXSIZE), True)

        put = TimingWrapper(queue.put)
        put_nowait = TimingWrapper(queue.put_nowait)

        # non-blocking puts are expected to fail immediately
        self.assertRaises(pyqueue.Full, put, 7, False)
        self.assertTimingAlmostEqual(put.elapsed, 0)

        self.assertRaises(pyqueue.Full, put, 7, False, None)
        self.assertTimingAlmostEqual(put.elapsed, 0)

        self.assertRaises(pyqueue.Full, put_nowait, 7)
        self.assertTimingAlmostEqual(put_nowait.elapsed, 0)

        # blocking puts are expected to fail after the timeout
        self.assertRaises(pyqueue.Full, put, 7, True, TIMEOUT1)
        self.assertTimingAlmostEqual(put.elapsed, TIMEOUT1)

        self.assertRaises(pyqueue.Full, put, 7, False, TIMEOUT2)
        self.assertTimingAlmostEqual(put.elapsed, 0)

        self.assertRaises(pyqueue.Full, put, 7, True, timeout=TIMEOUT3)
        self.assertTimingAlmostEqual(put.elapsed, TIMEOUT3)

        # let the child drain the queue and wait until it has finished
        child_can_start.set()
        parent_can_continue.wait()

        self.assertEqual(queue_empty(queue), True)
        self.assertEqual(queue_full(queue, MAXSIZE), False)

        proc.join()

    @classmethod
    def _test_get(cls, queue, child_can_start, parent_can_continue):
        """Child target: once released, put the items consumed by test_get."""
        child_can_start.wait()
        #queue.put(1)
        queue.put(2)
        queue.put(3)
        queue.put(4)
        queue.put(5)
        parent_can_continue.set()

    def test_get(self):
        """Read items put by a child and check every way get() reports Empty."""
        queue = self.Queue()
        child_can_start = self.Event()
        parent_can_continue = self.Event()

        proc = self.Process(
            target=self._test_get,
            args=(queue, child_can_start, parent_can_continue)
            )
        proc.daemon = True
        proc.start()

        self.assertEqual(queue_empty(queue), True)

        child_can_start.set()
        parent_can_continue.wait()

        time.sleep(DELTA)
        self.assertEqual(queue_empty(queue), False)

        # Hangs unexpectedly, remove for now
        #self.assertEqual(queue.get(), 1)
        self.assertEqual(queue.get(True, None), 2)
        self.assertEqual(queue.get(True), 3)
        self.assertEqual(queue.get(timeout=1), 4)
        self.assertEqual(queue.get_nowait(), 5)

        self.assertEqual(queue_empty(queue), True)

        get = TimingWrapper(queue.get)
        get_nowait = TimingWrapper(queue.get_nowait)

        # non-blocking gets are expected to fail immediately
        self.assertRaises(pyqueue.Empty, get, False)
        self.assertTimingAlmostEqual(get.elapsed, 0)

        self.assertRaises(pyqueue.Empty, get, False, None)
        self.assertTimingAlmostEqual(get.elapsed, 0)

        self.assertRaises(pyqueue.Empty, get_nowait)
        self.assertTimingAlmostEqual(get_nowait.elapsed, 0)

        # blocking gets are expected to fail after the timeout
        self.assertRaises(pyqueue.Empty, get, True, TIMEOUT1)
        self.assertTimingAlmostEqual(get.elapsed, TIMEOUT1)

        self.assertRaises(pyqueue.Empty, get, False, TIMEOUT2)
        self.assertTimingAlmostEqual(get.elapsed, 0)

        self.assertRaises(pyqueue.Empty, get, timeout=TIMEOUT3)
        self.assertTimingAlmostEqual(get.elapsed, TIMEOUT3)

        proc.join()

    @classmethod
    def _test_fork(cls, queue):
        """Child target: put the integers 10..19 on *queue*."""
        for i in range(10, 20):
            queue.put(i)
        # note that at this point the items may only be buffered, so the
        # process cannot shutdown until the feeder thread has finished
        # pushing items onto the pipe.

    def test_fork(self):
        """Items put by parent and child arrive in order on one queue."""
        # Old versions of Queue would fail to create a new feeder
        # thread for a forked process if the original process had its
        # own feeder thread.  This test checks that this no longer
        # happens.

        queue = self.Queue()

        # put items on queue so that main process starts a feeder thread
        for i in range(10):
            queue.put(i)

        # wait to make sure thread starts before we fork a new process
        time.sleep(DELTA)

        # fork process
        p = self.Process(target=self._test_fork, args=(queue,))
        p.daemon = True
        p.start()

        # check that all expected items are in the queue
        for i in range(20):
            self.assertEqual(queue.get(), i)
        self.assertRaises(pyqueue.Empty, queue.get, False)

        p.join()

    def test_qsize(self):
        """qsize() tracks puts and gets; returns early if it is not implemented."""
        q = self.Queue()
        try:
            self.assertEqual(q.qsize(), 0)
        except NotImplementedError:
            return
        q.put(1)
        self.assertEqual(q.qsize(), 1)
        q.put(5)
        self.assertEqual(q.qsize(), 2)
        q.get()
        self.assertEqual(q.qsize(), 1)
        q.get()
        self.assertEqual(q.qsize(), 0)

    @classmethod
    def _test_task_done(cls, q):
        """Worker target: acknowledge each item until a None arrives."""
        for obj in iter(q.get, None):
            time.sleep(DELTA)
            q.task_done()

    def test_task_done(self):
        """JoinableQueue.join() returns once workers acknowledged every item."""
        queue = self.JoinableQueue()

        workers = [self.Process(target=self._test_task_done, args=(queue,))
                   for i in range(4)]

        for p in workers:
            p.daemon = True
            p.start()

        for i in range(10):
            queue.put(i)

        queue.join()

        # one None per worker ends each worker's loop
        for p in workers:
            queue.put(None)

        for p in workers:
            p.join()
645
646#
647#
648#
649
class _TestLock(BaseTestCase):
    """Tests for Lock and RLock acquire/release behaviour."""

    def test_lock(self):
        """A Lock cannot be acquired twice nor released when not held."""
        lock = self.Lock()
        self.assertEqual(lock.acquire(), True)
        self.assertEqual(lock.acquire(False), False)
        self.assertEqual(lock.release(), None)
        with self.assertRaises((ValueError, threading.ThreadError)):
            lock.release()

    def test_rlock(self):
        """An RLock can be acquired repeatedly and released as many times."""
        lock = self.RLock()
        for _ in range(3):
            self.assertEqual(lock.acquire(), True)
        for _ in range(3):
            self.assertEqual(lock.release(), None)
        # one release too many
        with self.assertRaises((AssertionError, RuntimeError)):
            lock.release()

    def test_lock_context(self):
        """A Lock can be used as a context manager."""
        lock = self.Lock()
        with lock:
            pass
672
Benjamin Petersone711caf2008-06-11 16:44:04 +0000673
class _TestSemaphore(BaseTestCase):
    """Tests for Semaphore and BoundedSemaphore counting and timeouts."""

    def _test_semaphore(self, sem):
        """Take *sem* from 2 down to 0 and back up, checking its value each step."""
        check_value = self.assertReturnsIfImplemented
        check_value(2, get_value, sem)
        self.assertEqual(sem.acquire(), True)
        check_value(1, get_value, sem)
        self.assertEqual(sem.acquire(), True)
        check_value(0, get_value, sem)
        # exhausted: a non-blocking acquire fails and leaves the value alone
        self.assertEqual(sem.acquire(False), False)
        check_value(0, get_value, sem)
        self.assertEqual(sem.release(), None)
        check_value(1, get_value, sem)
        self.assertEqual(sem.release(), None)
        check_value(2, get_value, sem)

    def test_semaphore(self):
        """A plain Semaphore can be released beyond its initial value."""
        sem = self.Semaphore(2)
        self._test_semaphore(sem)
        for expected in (3, 4):
            self.assertEqual(sem.release(), None)
            self.assertReturnsIfImplemented(expected, get_value, sem)

    def test_bounded_semaphore(self):
        """A BoundedSemaphore passes the basic counting checks."""
        sem = self.BoundedSemaphore(2)
        self._test_semaphore(sem)
        # Currently fails on OS/X
        #if HAVE_GETVALUE:
        #    self.assertRaises(ValueError, sem.release)
        #    self.assertReturnsIfImplemented(2, get_value, sem)

    def test_timeout(self):
        """acquire() on an empty semaphore fails after the expected delay."""
        if self.TYPE != 'processes':
            return

        sem = self.Semaphore(0)
        acquire = TimingWrapper(sem.acquire)

        # (positional args, keyword args, expected elapsed time)
        cases = [
            ((False,), {}, 0.0),
            ((False, None), {}, 0.0),
            ((False, TIMEOUT1), {}, 0),
            ((True, TIMEOUT2), {}, TIMEOUT2),
            ((), {'timeout': TIMEOUT3}, TIMEOUT3),
        ]
        for args, kwds, expected_elapsed in cases:
            self.assertEqual(acquire(*args, **kwds), False)
            self.assertTimingAlmostEqual(acquire.elapsed, expected_elapsed)
726
727
class _TestCondition(BaseTestCase):
    """Tests for Condition: notify(), notify_all() and wait() timeouts.

    The tests use two semaphores as counters: ``sleeping`` is released by
    each waiter just before it waits, and ``woken`` just after its wait ends.
    """

    @classmethod
    def f(cls, cond, sleeping, woken, timeout=None):
        """Waiter target: wait on *cond*, signalling before and after."""
        cond.acquire()
        sleeping.release()
        cond.wait(timeout)
        woken.release()
        cond.release()

    def check_invariant(self, cond):
        """Check the internal counters of *cond* (only for TYPE 'processes')."""
        # this is only supposed to succeed when there are no sleepers
        if self.TYPE == 'processes':
            try:
                sleepers = (cond._sleeping_count.get_value() -
                            cond._woken_count.get_value())
                self.assertEqual(sleepers, 0)
                self.assertEqual(cond._wait_semaphore.get_value(), 0)
            except NotImplementedError:
                pass

    def test_notify(self):
        """Each notify() call wakes exactly one of two waiters."""
        cond = self.Condition()
        sleeping = self.Semaphore(0)
        woken = self.Semaphore(0)

        p = self.Process(target=self.f, args=(cond, sleeping, woken))
        p.daemon = True
        p.start()

        # NOTE: p is rebound here, so only the thread is joined at the end
        p = threading.Thread(target=self.f, args=(cond, sleeping, woken))
        p.daemon = True
        p.start()

        # wait for both children to start sleeping
        sleeping.acquire()
        sleeping.acquire()

        # check no process/thread has woken up
        time.sleep(DELTA)
        self.assertReturnsIfImplemented(0, get_value, woken)

        # wake up one process/thread
        cond.acquire()
        cond.notify()
        cond.release()

        # check one process/thread has woken up
        time.sleep(DELTA)
        self.assertReturnsIfImplemented(1, get_value, woken)

        # wake up another
        cond.acquire()
        cond.notify()
        cond.release()

        # check other has woken up
        time.sleep(DELTA)
        self.assertReturnsIfImplemented(2, get_value, woken)

        # check state is not mucked up
        self.check_invariant(cond)
        p.join()

    def test_notify_all(self):
        """Waiters time out on their own, then notify_all() wakes a second batch."""
        cond = self.Condition()
        sleeping = self.Semaphore(0)
        woken = self.Semaphore(0)

        # start some threads/processes which will timeout
        for i in range(3):
            p = self.Process(target=self.f,
                             args=(cond, sleeping, woken, TIMEOUT1))
            p.daemon = True
            p.start()

            t = threading.Thread(target=self.f,
                                 args=(cond, sleeping, woken, TIMEOUT1))
            t.daemon = True
            t.start()

        # wait for them all to sleep
        for i in range(6):
            sleeping.acquire()

        # check they have all timed out
        for i in range(6):
            woken.acquire()
        self.assertReturnsIfImplemented(0, get_value, woken)

        # check state is not mucked up
        self.check_invariant(cond)

        # start some more threads/processes
        for i in range(3):
            p = self.Process(target=self.f, args=(cond, sleeping, woken))
            p.daemon = True
            p.start()

            t = threading.Thread(target=self.f, args=(cond, sleeping, woken))
            t.daemon = True
            t.start()

        # wait for them to all sleep
        for i in range(6):
            sleeping.acquire()

        # check no process/thread has woken up
        time.sleep(DELTA)
        self.assertReturnsIfImplemented(0, get_value, woken)

        # wake them all up
        cond.acquire()
        cond.notify_all()
        cond.release()

        # check they have all woken
        # (poll up to 10 times; stop early if the value cannot be read)
        for i in range(10):
            try:
                if get_value(woken) == 6:
                    break
            except NotImplementedError:
                break
            time.sleep(DELTA)
        self.assertReturnsIfImplemented(6, get_value, woken)

        # check state is not mucked up
        self.check_invariant(cond)

    def test_timeout(self):
        """wait() with a timeout and no notification returns False."""
        cond = self.Condition()
        wait = TimingWrapper(cond.wait)
        cond.acquire()
        res = wait(TIMEOUT1)
        cond.release()
        self.assertEqual(res, False)
        self.assertTimingAlmostEqual(wait.elapsed, TIMEOUT1)
865
866
class _TestEvent(BaseTestCase):
    """Tests for Event: is_set(), wait(), set() and clear()."""

    @classmethod
    def _test_event(cls, event):
        """Child target: set *event* after sleeping for TIMEOUT2."""
        time.sleep(TIMEOUT2)
        event.set()

    def test_event(self):
        """wait() returns False on timeout and True once the event is set."""
        event = self.Event()
        wait = TimingWrapper(event.wait)

        self.assertEqual(event.is_set(), False)

        # While the event is unset, wait() is expected to return False:
        # immediately for a zero timeout, after the timeout otherwise.
        self.assertEqual(wait(0.0), False)
        self.assertTimingAlmostEqual(wait.elapsed, 0.0)
        self.assertEqual(wait(TIMEOUT1), False)
        self.assertTimingAlmostEqual(wait.elapsed, TIMEOUT1)

        event.set()

        # Once set, wait() is expected to return True without blocking.
        self.assertEqual(event.is_set(), True)
        self.assertEqual(wait(), True)
        self.assertTimingAlmostEqual(wait.elapsed, 0.0)
        self.assertEqual(wait(TIMEOUT1), True)
        self.assertTimingAlmostEqual(wait.elapsed, 0.0)
        # NOTE(review): the two is_set() checks around clear() were already
        # disabled in the original; the reason is not recorded here.
        # self.assertEqual(event.is_set(), True)

        event.clear()

        #self.assertEqual(event.is_set(), False)

        p = self.Process(target=self._test_event, args=(event,))
        p.daemon = True
        p.start()
        self.assertEqual(wait(), True)
        # The child exits right after setting the event; join it so it is
        # not left running when the test returns.
        p.join()
Benjamin Petersone711caf2008-06-11 16:44:04 +0000907
908#
909#
910#
911
class _TestValue(BaseTestCase):
    """Tests for Value and RawValue shared objects."""

    ALLOWED_TYPES = ('processes',)

    # (typecode, initial value, value assigned by the child process)
    codes_values = [
        ('i', 4343, 24234),
        ('d', 3.625, -4.25),
        ('h', -232, 234),
        ('c', latin('x'), latin('y'))
        ]

    def setUp(self):
        if HAS_SHAREDCTYPES:
            return
        self.skipTest("requires multiprocessing.sharedctypes")

    @classmethod
    def _test(cls, values):
        """Child target: overwrite each shared value with its third table entry."""
        for shared, spec in zip(values, cls.codes_values):
            shared.value = spec[2]

    def test_value(self, raw=False):
        """Values assigned in a child process are visible in the parent."""
        make_value = self.RawValue if raw else self.Value
        values = [make_value(code, initial)
                  for code, initial, _ in self.codes_values]

        for shared, spec in zip(values, self.codes_values):
            self.assertEqual(shared.value, spec[1])

        child = self.Process(target=self._test, args=(values,))
        child.daemon = True
        child.start()
        child.join()

        for shared, spec in zip(values, self.codes_values):
            self.assertEqual(shared.value, spec[2])

    def test_rawvalue(self):
        """Run test_value against RawValue instead of Value."""
        self.test_value(raw=True)

    def test_getobj_getlock(self):
        """Check get_lock()/get_obj() for each form of the ``lock`` argument."""
        # default lock
        default_locked = self.Value('i', 5)
        default_locked.get_lock()
        default_locked.get_obj()

        # lock=None
        none_locked = self.Value('i', 5, lock=None)
        none_locked.get_lock()
        none_locked.get_obj()

        # caller-supplied lock is the one handed back
        lock = self.Lock()
        custom_locked = self.Value('i', 5, lock=lock)
        returned_lock = custom_locked.get_lock()
        custom_locked.get_obj()
        self.assertEqual(lock, returned_lock)

        # lock=False: the result has neither accessor
        unlocked = self.Value('i', 5, lock=False)
        for accessor in ('get_lock', 'get_obj'):
            self.assertFalse(hasattr(unlocked, accessor))

        self.assertRaises(AttributeError, self.Value, 'i', 5, lock='navalue')

        # RawValue: the result has neither accessor
        raw = self.RawValue('i', 5)
        for accessor in ('get_lock', 'get_obj'):
            self.assertFalse(hasattr(raw, accessor))
979
Benjamin Petersone711caf2008-06-11 16:44:04 +0000980
class _TestArray(BaseTestCase):
    """Exercise Array / RawArray objects that are modified by a child process."""

    ALLOWED_TYPES = ('processes',)

    @classmethod
    def f(cls, seq):
        """Replace every element of seq, in place, by the running total."""
        for pos in range(1, len(seq)):
            seq[pos] += seq[pos - 1]

    @unittest.skipIf(c_int is None, "requires _ctypes")
    def test_array(self, raw=False):
        seq = [680, 626, 934, 821, 150, 233, 548, 982, 714, 831]
        # `raw` selects RawArray instead of Array as the factory.
        make = self.RawArray if raw else self.Array
        arr = make('i', seq)

        self.assertEqual(len(arr), len(seq))
        self.assertEqual(arr[3], seq[3])
        self.assertEqual(list(arr[2:7]), list(seq[2:7]))

        # Apply the same slice assignment to the shared array and to the
        # plain list (shared array first, as a chained assignment would).
        patch = array.array('i', [1, 2, 3, 4])
        arr[4:8] = patch
        seq[4:8] = patch

        self.assertEqual(list(arr[:]), seq)

        # Compute the expected result locally, then have a child process
        # apply the same transformation to the shared array.
        self.f(seq)

        child = self.Process(target=self.f, args=(arr,))
        child.daemon = True
        child.start()
        child.join()

        self.assertEqual(list(arr[:]), seq)

    @unittest.skipIf(c_int is None, "requires _ctypes")
    def test_array_from_size(self):
        size = 10
        # Test for zeroing (see issue #11675).
        # Repeating the allocation makes it more likely that the 2nd and
        # 3rd arrays are carved out of previously used, non-zero memory.
        for _ in range(3):
            arr = self.Array('i', size)
            self.assertEqual(len(arr), size)
            self.assertEqual(list(arr), [0] * size)
            # Dirty the memory before releasing it.
            arr[:] = range(size)
            self.assertEqual(list(arr), list(range(size)))
            del arr

    @unittest.skipIf(c_int is None, "requires _ctypes")
    def test_rawarray(self):
        self.test_array(raw=True)

    @unittest.skipIf(c_int is None, "requires _ctypes")
    def test_getobj_getlock_obj(self):
        # Default locking and lock=None: both accessors must be callable.
        default_locked = self.Array('i', list(range(10)))
        default_locked.get_lock()
        default_locked.get_obj()

        none_locked = self.Array('i', list(range(10)), lock=None)
        none_locked.get_lock()
        none_locked.get_obj()

        # A caller-supplied lock is the one handed back by get_lock().
        own_lock = self.Lock()
        explicit = self.Array('i', list(range(10)), lock=own_lock)
        returned_lock = explicit.get_lock()
        explicit.get_obj()
        self.assertEqual(own_lock, returned_lock)

        # lock=False yields an object without either accessor.
        unlocked = self.Array('i', range(10), lock=False)
        for accessor in ('get_lock', 'get_obj'):
            self.assertFalse(hasattr(unlocked, accessor))

        # Anything that is not a usable lock is rejected.
        self.assertRaises(AttributeError,
                          self.Array, 'i', range(10), lock='notalock')

        raw = self.RawArray('i', range(10))
        for accessor in ('get_lock', 'get_obj'):
            self.assertFalse(hasattr(raw, accessor))
Benjamin Petersone711caf2008-06-11 16:44:04 +00001059
1060#
1061#
1062#
1063
class _TestContainers(BaseTestCase):
    """Exercise the list, dict and Namespace factories of the test mixin."""

    ALLOWED_TYPES = ('manager',)

    def test_list(self):
        first = self.list(list(range(10)))
        self.assertEqual(first[:], list(range(10)))

        second = self.list()
        self.assertEqual(second[:], [])

        second.extend(list(range(5)))
        self.assertEqual(second[:], list(range(5)))

        # Indexing, and slicing past the end.
        self.assertEqual(second[2], 2)
        self.assertEqual(second[2:10], [2, 3, 4])

        # In-place repetition.
        second *= 2
        self.assertEqual(second[:], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4])

        # Concatenation with a plain list.
        self.assertEqual(second + [5, 6],
                         [0, 1, 2, 3, 4, 0, 1, 2, 3, 4, 5, 6])

        # The first list was not touched by any of the above.
        self.assertEqual(first[:], list(range(10)))

        # A list built from a list of the two lists above.
        nested = self.list([first, second])
        self.assertEqual(
            nested[:],
            [[0, 1, 2, 3, 4, 5, 6, 7, 8, 9], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4]]
            )

        # Appending to `first` after it was nested must show through.
        wrapper = self.list([first])
        first.append('hello')
        self.assertEqual(wrapper[:],
                         [[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 'hello']])

    def test_dict(self):
        shared = self.dict()
        indices = list(range(65, 70))
        for code in indices:
            shared[code] = chr(code)
        self.assertEqual(shared.copy(), {code: chr(code) for code in indices})
        self.assertEqual(sorted(shared.keys()), indices)
        self.assertEqual(sorted(shared.values()),
                         [chr(code) for code in indices])
        self.assertEqual(sorted(shared.items()),
                         [(code, chr(code)) for code in indices])

    def test_namespace(self):
        ns = self.Namespace()
        ns.name = 'Bob'
        ns.job = 'Builder'
        ns._hidden = 'hidden'
        self.assertEqual((ns.name, ns.job), ('Bob', 'Builder'))
        del ns.job
        # Neither the deleted attribute nor the underscore-prefixed one
        # appears in the string form.
        self.assertEqual(str(ns), "Namespace(name='Bob')")
        self.assertTrue(hasattr(ns, 'name'))
        self.assertFalse(hasattr(ns, 'job'))
1119
1120#
1121#
1122#
1123
def sqr(x, wait=0.0):
    """Sleep for `wait` seconds, then return x multiplied by itself."""
    time.sleep(wait)
    product = x * x
    return product
Ask Solem2afcbf22010-11-09 20:55:52 +00001127
class _TestPool(BaseTestCase):
    """Exercise the apply/map/imap family of methods of self.pool."""

    def test_apply(self):
        # Positional and keyword argument forms.
        self.assertEqual(self.pool.apply(sqr, (5,)), sqr(5))
        self.assertEqual(self.pool.apply(sqr, (), {'x': 3}), sqr(x=3))

    def test_map(self):
        self.assertEqual(self.pool.map(sqr, list(range(10))),
                         [sqr(n) for n in range(10)])
        self.assertEqual(self.pool.map(sqr, list(range(100)), chunksize=20),
                         [sqr(n) for n in range(100)])

    def test_map_chunksize(self):
        # Mapping over an empty list with an explicit chunksize must
        # produce a result before the timeout expires.
        pending = self.pool.map_async(sqr, [], chunksize=1)
        try:
            pending.get(timeout=TIMEOUT1)
        except multiprocessing.TimeoutError:
            self.fail("pool.map_async with chunksize stalled on null list")

    def test_async(self):
        # The task sleeps for TIMEOUT1, so get() should block about as long.
        pending = self.pool.apply_async(sqr, (7, TIMEOUT1,))
        timed_get = TimingWrapper(pending.get)
        self.assertEqual(timed_get(), 49)
        self.assertTimingAlmostEqual(timed_get.elapsed, TIMEOUT1)

    def test_async_timeout(self):
        # The task outlasts the timeout given to get().
        pending = self.pool.apply_async(sqr, (6, TIMEOUT2 + 0.2))
        timed_get = TimingWrapper(pending.get)
        self.assertRaises(multiprocessing.TimeoutError,
                          timed_get, timeout=TIMEOUT2)
        self.assertTimingAlmostEqual(timed_get.elapsed, TIMEOUT2)

    def test_imap(self):
        squares = self.pool.imap(sqr, list(range(10)))
        self.assertEqual(list(squares), [sqr(n) for n in range(10)])

        # Step through by hand and check exhaustion.
        squares = self.pool.imap(sqr, list(range(10)))
        for n in range(10):
            self.assertEqual(next(squares), n * n)
        self.assertRaises(StopIteration, squares.__next__)

        # Same again with an explicit chunksize.
        squares = self.pool.imap(sqr, list(range(1000)), chunksize=100)
        for n in range(1000):
            self.assertEqual(next(squares), n * n)
        self.assertRaises(StopIteration, squares.__next__)

    def test_imap_unordered(self):
        expected = [sqr(n) for n in range(1000)]

        # Results are sorted before comparing.
        squares = self.pool.imap_unordered(sqr, list(range(1000)))
        self.assertEqual(sorted(squares), expected)

        squares = self.pool.imap_unordered(sqr, list(range(1000)),
                                           chunksize=53)
        self.assertEqual(sorted(squares), expected)

    def test_make_pool(self):
        # Non-positive worker counts are rejected.
        for bad_size in (-1, 0):
            self.assertRaises(ValueError, multiprocessing.Pool, bad_size)

        pool = multiprocessing.Pool(3)
        self.assertEqual(3, len(pool._pool))
        pool.close()
        pool.join()

    def test_terminate(self):
        if self.TYPE == 'manager':
            # On Unix a forked process increfs each shared object to
            # which its parent process held a reference.  If the
            # forked process gets terminated then there is likely to
            # be a reference leak.  So to prevent
            # _TestZZZNumberOfObjects from failing we skip this test
            # when using a manager.
            return

        # Queue far more sleeping tasks than can finish quickly; join()
        # after terminate() must still come back promptly.
        pending = self.pool.map_async(time.sleep, [0.1] * 10000, chunksize=1)
        self.pool.terminate()
        timed_join = TimingWrapper(self.pool.join)
        timed_join()
        self.assertLess(timed_join.elapsed, 0.5)
Jesse Noller1f0b6582010-01-27 03:36:01 +00001206
def raising():
    """Always raise KeyError("key")."""
    error = KeyError("key")
    raise error
Jesse Noller1f0b6582010-01-27 03:36:01 +00001209
def unpickleable_result():
    """Return a new lambda that evaluates to 42 when called."""
    # Deliberately a lambda: the pool error tests need a return value
    # that cannot be pickled.
    answer = lambda: 42
    return answer
1212
class _TestPoolWorkerErrors(BaseTestCase):
    """Check how task failures are reported through apply_async()."""

    ALLOWED_TYPES = ('processes', )

    def test_async_error_callback(self):
        pool = multiprocessing.Pool(2)

        # One-slot list that the callback fills with the exception it gets.
        captured = [None]

        def errback(exc):
            captured[0] = exc

        pending = pool.apply_async(raising, error_callback=errback)
        # get() re-raises the task's exception and the callback saw it too.
        self.assertRaises(KeyError, pending.get)
        self.assertTrue(captured[0])
        self.assertIsInstance(captured[0], KeyError)

        pool.close()
        pool.join()

    def test_unpickleable_result(self):
        from multiprocessing.pool import MaybeEncodingError
        pool = multiprocessing.Pool(2)

        # Make sure we don't lose pool processes because of encoding errors.
        for iteration in range(20):

            captured = [None]

            def errback(exc):
                captured[0] = exc

            pending = pool.apply_async(unpickleable_result,
                                       error_callback=errback)
            self.assertRaises(MaybeEncodingError, pending.get)
            wrapped = captured[0]
            self.assertTrue(wrapped)
            self.assertIsInstance(wrapped, MaybeEncodingError)
            # The wrapper carries both the encoding error and the value.
            self.assertIsNotNone(wrapped.exc)
            self.assertIsNotNone(wrapped.value)

        pool.close()
        pool.join()
1252
class _TestPoolWorkerLifetime(BaseTestCase):
    """Check pools created with a maxtasksperchild limit."""

    ALLOWED_TYPES = ('processes', )

    def test_pool_worker_lifetime(self):
        pool = multiprocessing.Pool(3, maxtasksperchild=10)
        self.assertEqual(3, len(pool._pool))
        pids_before = [worker.pid for worker in pool._pool]
        # Run many tasks so each worker gets replaced (hopefully)
        pending = [pool.apply_async(sqr, (n, )) for n in range(100)]
        # Fetch the results and verify we got the right answers,
        # also ensuring all the tasks have completed.
        for n, result in enumerate(pending):
            self.assertEqual(result.get(), sqr(n))
        # Refill the pool
        pool._repopulate_pool()
        # Wait until all workers are alive
        # (50 polls * DELTA = 5 seconds max startup process time)
        for _ in range(50):
            if all(worker.is_alive() for worker in pool._pool):
                break
            time.sleep(DELTA)
        pids_after = [worker.pid for worker in pool._pool]
        # All pids should be assigned.  See issue #7805.
        self.assertNotIn(None, pids_before)
        self.assertNotIn(None, pids_after)
        # Finally, check that the worker pids have changed
        self.assertNotEqual(sorted(pids_before), sorted(pids_after))
        pool.close()
        pool.join()

    def test_pool_worker_lifetime_early_close(self):
        # Issue #10332: closing a pool whose workers have limited lifetimes
        # before all the tasks completed would make join() hang.
        pool = multiprocessing.Pool(3, maxtasksperchild=1)
        pending = [pool.apply_async(sqr, (n, 0.3)) for n in range(6)]
        pool.close()
        pool.join()
        # check the results
        for n, result in enumerate(pending):
            self.assertEqual(result.get(), sqr(n))
1297
1298
Benjamin Petersone711caf2008-06-11 16:44:04 +00001299#
1300# Test that manager has expected number of shared objects left
1301#
1302
class _TestZZZNumberOfObjects(BaseTestCase):
    # Because test cases are sorted alphabetically, this one will get
    # run after all the other tests for the manager.  It tests that
    # there have been no "reference leaks" for the manager's shared
    # objects.  Note the comment in _TestPool.test_terminate().
    ALLOWED_TYPES = ('manager',)

    def test_number_of_objects(self):
        EXPECTED_NUMBER = 1                # the pool object is still alive
        multiprocessing.active_children()  # discard dead process objs
        gc.collect()                       # do garbage collection
        live_count = self.manager._number_of_objects()
        # Snapshot taken right after counting, whether or not it is needed.
        snapshot = self.manager._debug_info()
        if live_count != EXPECTED_NUMBER:
            # Print a freshly fetched report first, then the earlier
            # snapshot.  NOTE(review): presumably both are shown so that a
            # change between the two calls is visible -- confirm.
            print(self.manager._debug_info())
            print(snapshot)

        self.assertEqual(live_count, EXPECTED_NUMBER)
1321
1322#
1323# Test of creating a customized manager class
1324#
1325
1326from multiprocessing.managers import BaseManager, BaseProxy, RemoteError
1327
class FooBar(object):
    """Plain class with two public methods and one underscore-prefixed one."""

    def f(self):
        """Return the string 'f()'."""
        return 'f()'

    def g(self):
        """Always raise ValueError."""
        raise ValueError()

    def _h(self):
        """Return the string '_h()'."""
        return '_h()'
1335
def baz():
    """Generate the squares of 0 through 9, in order."""
    n = 0
    while n < 10:
        yield n * n
        n += 1
1339
class IteratorProxy(BaseProxy):
    """Proxy usable as an iterator: each step is forwarded via _callmethod."""

    # Only __next__ is listed as exposed.
    _exposed_ = ('__next__',)

    def __next__(self):
        # Forward the step to the referent.
        return self._callmethod('__next__')

    def __iter__(self):
        # The proxy is its own iterator.
        return self
1346
class MyManager(BaseManager):
    """Manager subclass that only carries the registrations made below."""


# 'Foo' uses the default set of exposed methods, 'Bar' lists its exposed
# methods explicitly, and 'baz' is served through IteratorProxy.
MyManager.register(typeid='Foo', callable=FooBar)
MyManager.register(typeid='Bar', callable=FooBar, exposed=('f', '_h'))
MyManager.register(typeid='baz', callable=baz, proxytype=IteratorProxy)
1353
1354
class _TestMyManager(BaseTestCase):
    """Check the types registered on MyManager through a started manager."""

    ALLOWED_TYPES = ('manager',)

    def test_mymanager(self):
        manager = MyManager()
        manager.start()
        # Shut the manager down even when an assertion below fails, so a
        # failing run does not leave the manager's server process behind.
        try:
            foo = manager.Foo()
            bar = manager.Bar()
            baz = manager.baz()

            # Which of the three FooBar methods each proxy offers.
            foo_methods = [name for name in ('f', 'g', '_h')
                           if hasattr(foo, name)]
            bar_methods = [name for name in ('f', 'g', '_h')
                           if hasattr(bar, name)]

            # Foo was registered without `exposed`; Bar with ('f', '_h').
            self.assertEqual(foo_methods, ['f', 'g'])
            self.assertEqual(bar_methods, ['f', '_h'])

            self.assertEqual(foo.f(), 'f()')
            self.assertRaises(ValueError, foo.g)
            self.assertEqual(foo._callmethod('f'), 'f()')
            # '_h' is not exposed on Foo, so calling it by name fails.
            self.assertRaises(RemoteError, foo._callmethod, '_h')

            self.assertEqual(bar.f(), 'f()')
            self.assertEqual(bar._h(), '_h()')
            self.assertEqual(bar._callmethod('f'), 'f()')
            self.assertEqual(bar._callmethod('_h'), '_h()')

            # The IteratorProxy yields the generator's values in order.
            self.assertEqual(list(baz), [i*i for i in range(10)])
        finally:
            manager.shutdown()
1386
1387#
1388# Test of connecting to a remote server and using xmlrpclib for serialization
1389#
1390
# Single module-level queue; get_queue() always hands out this instance.
_queue = pyqueue.Queue()


def get_queue():
    """Return the module-level queue object."""
    return _queue
1394
class QueueManager(BaseManager):
    """Manager class used by the server process."""


# Registered with a callable: 'get_queue' is backed by get_queue().
QueueManager.register(typeid='get_queue', callable=get_queue)
1398
class QueueManager2(BaseManager):
    """Manager class which specifies the same interface as QueueManager."""


# Registered by name only, with no callable behind it.
QueueManager2.register(typeid='get_queue')
1402
1403
# Serializer name handed to every QueueManager / QueueManager2 below.
SERIALIZER = 'xmlrpclib'


class _TestRemoteManager(BaseTestCase):
    """Talk to a started QueueManager through separate QueueManager2 clients."""

    ALLOWED_TYPES = ('manager',)

    @classmethod
    def _putter(cls, address, authkey):
        """Connect to the manager at `address` and put one tuple on its queue."""
        client = QueueManager2(
            address=address, authkey=authkey, serializer=SERIALIZER
            )
        client.connect()
        remote_queue = client.get_queue()
        remote_queue.put(('hello world', None, True, 2.25))

    def test_remote(self):
        authkey = os.urandom(32)

        server = QueueManager(
            address=('localhost', 0), authkey=authkey, serializer=SERIALIZER
            )
        server.start()

        # The child connects on its own and puts an item on the queue.
        child = self.Process(target=self._putter,
                             args=(server.address, authkey))
        child.daemon = True
        child.start()

        client = QueueManager2(
            address=server.address, authkey=authkey, serializer=SERIALIZER
            )
        client.connect()
        remote_queue = client.get_queue()

        # Note that xmlrpclib will deserialize object as a list not a tuple
        self.assertEqual(remote_queue.get(),
                         ['hello world', None, True, 2.25])

        # Because we are using xmlrpclib for serialization instead of
        # pickle this will cause a serialization error.
        self.assertRaises(Exception, remote_queue.put, time.sleep)

        # Make queue finalizer run before the server is stopped
        del remote_queue
        server.shutdown()
1447
class _TestManagerRestart(BaseTestCase):
    """Check that a manager can be restarted on the address it just released."""

    @classmethod
    def _putter(cls, address, authkey):
        """Connect to the manager at `address` and put 'hello world' on its queue."""
        manager = QueueManager(
            address=address, authkey=authkey, serializer=SERIALIZER)
        manager.connect()
        queue = manager.get_queue()
        queue.put('hello world')

    def test_rapid_restart(self):
        authkey = os.urandom(32)
        manager = QueueManager(
            address=('localhost', 0), authkey=authkey, serializer=SERIALIZER)
        srvr = manager.get_server()
        addr = srvr.address
        # Close the connection.Listener socket which gets opened as a part
        # of manager.get_server().  It's not needed for the test.
        srvr.listener.close()
        manager.start()

        p = self.Process(target=self._putter, args=(manager.address, authkey))
        p.daemon = True
        p.start()
        # Wait for the child to finish before the manager it talks to is
        # shut down; previously the child was never joined.
        p.join()
        queue = manager.get_queue()
        self.assertEqual(queue.get(), 'hello world')
        del queue
        manager.shutdown()

        # Start a second manager on the address the first one just used.
        manager = QueueManager(
            address=addr, authkey=authkey, serializer=SERIALIZER)
        try:
            manager.start()
        except IOError as e:
            if e.errno != errno.EADDRINUSE:
                raise
            # Retry after some time, in case the old socket was lingering
            # (sporadic failure on buildbots)
            time.sleep(1.0)
            manager = QueueManager(
                address=addr, authkey=authkey, serializer=SERIALIZER)
            # The retry has to start the new manager as well: shutdown()
            # below is only usable on a manager that has been started.
            manager.start()
        manager.shutdown()
Jesse Nollerc5d28a02009-03-30 16:37:36 +00001489
Benjamin Petersone711caf2008-06-11 16:44:04 +00001490#
1491#
1492#
1493
# Empty message that tells the _echo() child to stop.
SENTINEL = latin('')


class _TestConnection(BaseTestCase):
    """Exercise connection objects returned by self.Pipe()."""

    ALLOWED_TYPES = ('processes', 'threads')

    @classmethod
    def _echo(cls, conn):
        """Send back every message received on conn until SENTINEL arrives."""
        for msg in iter(conn.recv_bytes, SENTINEL):
            conn.send_bytes(msg)
        conn.close()

    def test_connection(self):
        conn, child_conn = self.Pipe()

        p = self.Process(target=self._echo, args=(child_conn,))
        p.daemon = True
        p.start()

        seq = [1, 2.25, None]
        msg = latin('hello world')
        longmsg = msg * 10
        arr = array.array('i', list(range(4)))

        if self.TYPE == 'processes':
            self.assertEqual(type(conn.fileno()), int)

        # Object and byte-string round trips through the echo child.
        self.assertEqual(conn.send(seq), None)
        self.assertEqual(conn.recv(), seq)

        self.assertEqual(conn.send_bytes(msg), None)
        self.assertEqual(conn.recv_bytes(), msg)

        if self.TYPE == 'processes':
            # Receive into the start of a larger, zero-filled buffer.
            buffer = array.array('i', [0]*10)
            expected = list(arr) + [0] * (10 - len(arr))
            self.assertEqual(conn.send_bytes(arr), None)
            self.assertEqual(conn.recv_bytes_into(buffer),
                             len(arr) * buffer.itemsize)
            self.assertEqual(list(buffer), expected)

            # Receive at an offset of three items into the buffer.
            buffer = array.array('i', [0]*10)
            expected = [0] * 3 + list(arr) + [0] * (10 - 3 - len(arr))
            self.assertEqual(conn.send_bytes(arr), None)
            self.assertEqual(conn.recv_bytes_into(buffer, 3 * buffer.itemsize),
                             len(arr) * buffer.itemsize)
            self.assertEqual(list(buffer), expected)

            # A message longer than the buffer raises BufferTooShort,
            # which carries the complete message in its args.
            buffer = bytearray(latin(' ' * 40))
            self.assertEqual(conn.send_bytes(longmsg), None)
            try:
                res = conn.recv_bytes_into(buffer)
            except multiprocessing.BufferTooShort as e:
                self.assertEqual(e.args, (longmsg,))
            else:
                self.fail('expected BufferTooShort, got %s' % res)

        poll = TimingWrapper(conn.poll)

        # Nothing is pending: poll() is immediate, poll(timeout) waits.
        self.assertEqual(poll(), False)
        self.assertTimingAlmostEqual(poll.elapsed, 0)

        self.assertEqual(poll(TIMEOUT1), False)
        self.assertTimingAlmostEqual(poll.elapsed, TIMEOUT1)

        conn.send(None)

        # Once the echo is available poll() reports it without waiting.
        self.assertEqual(poll(TIMEOUT1), True)
        self.assertTimingAlmostEqual(poll.elapsed, 0)

        self.assertEqual(conn.recv(), None)

        really_big_msg = latin('X') * (1024 * 1024 * 16)   # 16 MiB
        conn.send_bytes(really_big_msg)
        self.assertEqual(conn.recv_bytes(), really_big_msg)

        conn.send_bytes(SENTINEL)                          # tell child to quit
        child_conn.close()

        if self.TYPE == 'processes':
            self.assertEqual(conn.readable, True)
            self.assertEqual(conn.writable, True)
            self.assertRaises(EOFError, conn.recv)
            self.assertRaises(EOFError, conn.recv_bytes)

        p.join()

    def test_duplex_false(self):
        reader, writer = self.Pipe(duplex=False)
        self.assertEqual(writer.send(1), None)
        self.assertEqual(reader.recv(), 1)
        if self.TYPE == 'processes':
            # Each end works in one direction only.
            self.assertEqual(reader.readable, True)
            self.assertEqual(reader.writable, False)
            self.assertEqual(writer.readable, False)
            self.assertEqual(writer.writable, True)
            self.assertRaises(IOError, reader.send, 2)
            self.assertRaises(IOError, writer.recv)
            self.assertRaises(IOError, writer.poll)

    def test_spawn_close(self):
        # We test that a pipe connection can be closed by parent
        # process immediately after child is spawned.  On Windows this
        # would have sometimes failed on old versions because
        # child_conn would be closed before the child got a chance to
        # duplicate it.
        conn, child_conn = self.Pipe()

        p = self.Process(target=self._echo, args=(child_conn,))
        p.daemon = True
        p.start()
        child_conn.close()    # this might complete before child initializes

        msg = latin('hello')
        conn.send_bytes(msg)
        self.assertEqual(conn.recv_bytes(), msg)

        conn.send_bytes(SENTINEL)
        conn.close()
        p.join()

    def test_sendbytes(self):
        # Report a skip, like the fd-transfer tests below, instead of
        # silently passing for flavours this test does not apply to.
        if self.TYPE != 'processes':
            self.skipTest("only makes sense with processes")

        msg = latin('abcdefghijklmnopqrstuvwxyz')
        a, b = self.Pipe()

        a.send_bytes(msg)
        self.assertEqual(b.recv_bytes(), msg)

        # send_bytes(buffer, offset)
        a.send_bytes(msg, 5)
        self.assertEqual(b.recv_bytes(), msg[5:])

        # send_bytes(buffer, offset, size)
        a.send_bytes(msg, 7, 8)
        self.assertEqual(b.recv_bytes(), msg[7:7+8])

        # An offset equal to the length sends an empty message.
        a.send_bytes(msg, 26)
        self.assertEqual(b.recv_bytes(), latin(''))

        a.send_bytes(msg, 26, 0)
        self.assertEqual(b.recv_bytes(), latin(''))

        # Out-of-range or negative offset/size combinations are rejected.
        self.assertRaises(ValueError, a.send_bytes, msg, 27)

        self.assertRaises(ValueError, a.send_bytes, msg, 22, 5)

        self.assertRaises(ValueError, a.send_bytes, msg, 26, 1)

        self.assertRaises(ValueError, a.send_bytes, msg, -1)

        self.assertRaises(ValueError, a.send_bytes, msg, 4, -1)

    @classmethod
    def _is_fd_assigned(cls, fd):
        """Return True if os.fstat(fd) succeeds, False if it fails with EBADF.

        Any other OSError is propagated.
        """
        try:
            os.fstat(fd)
        except OSError as e:
            if e.errno == errno.EBADF:
                return False
            raise
        else:
            return True

    @classmethod
    def _writefd(cls, conn, data, create_dummy_fds=False):
        """Receive a handle over conn, write data to it and close it.

        With create_dummy_fds true, every unassigned descriptor below 256
        is first filled with a duplicate of conn's descriptor.
        """
        if create_dummy_fds:
            for i in range(0, 256):
                if not cls._is_fd_assigned(i):
                    os.dup2(conn.fileno(), i)
        fd = reduction.recv_handle(conn)
        if msvcrt:
            # Turn the received OS handle into a C runtime descriptor.
            fd = msvcrt.open_osfhandle(fd, os.O_WRONLY)
        os.write(fd, data)
        os.close(fd)

    @unittest.skipUnless(HAS_REDUCTION, "test needs multiprocessing.reduction")
    def test_fd_transfer(self):
        if self.TYPE != 'processes':
            self.skipTest("only makes sense with processes")
        conn, child_conn = self.Pipe(duplex=True)

        p = self.Process(target=self._writefd, args=(child_conn, b"foo"))
        p.daemon = True
        p.start()
        self.addCleanup(test.support.unlink, test.support.TESTFN)
        with open(test.support.TESTFN, "wb") as f:
            fd = f.fileno()
            if msvcrt:
                fd = msvcrt.get_osfhandle(fd)
            reduction.send_handle(conn, fd, p.pid)
        p.join()
        # The child wrote through the handle it received.
        with open(test.support.TESTFN, "rb") as f:
            self.assertEqual(f.read(), b"foo")

    @unittest.skipUnless(HAS_REDUCTION, "test needs multiprocessing.reduction")
    @unittest.skipIf(sys.platform == "win32",
                     "test semantics don't make sense on Windows")
    @unittest.skipIf(MAXFD <= 256,
                     "largest assignable fd number is too small")
    @unittest.skipUnless(hasattr(os, "dup2"),
                         "test needs os.dup2()")
    def test_large_fd_transfer(self):
        # With fd > 256 (issue #11657)
        if self.TYPE != 'processes':
            self.skipTest("only makes sense with processes")
        conn, child_conn = self.Pipe(duplex=True)

        p = self.Process(target=self._writefd, args=(child_conn, b"bar", True))
        p.daemon = True
        p.start()
        self.addCleanup(test.support.unlink, test.support.TESTFN)
        with open(test.support.TESTFN, "wb") as f:
            fd = f.fileno()
            # Find a free descriptor number of at least 256 to send.
            for newfd in range(256, MAXFD):
                if not self._is_fd_assigned(newfd):
                    break
            else:
                self.fail("could not find an unassigned large file descriptor")
            os.dup2(fd, newfd)
            try:
                reduction.send_handle(conn, newfd, p.pid)
            finally:
                os.close(newfd)
        p.join()
        with open(test.support.TESTFN, "rb") as f:
            self.assertEqual(f.read(), b"bar")

    @classmethod
    def _send_data_without_fd(cls, conn):
        """Write a single NUL byte straight to conn's file descriptor."""
        os.write(conn.fileno(), b"\0")

    @unittest.skipUnless(HAS_REDUCTION, "test needs multiprocessing.reduction")
    @unittest.skipIf(sys.platform == "win32", "doesn't make sense on Windows")
    def test_missing_fd_transfer(self):
        # Check that exception is raised when received data is not
        # accompanied by a file descriptor in ancillary data.
        if self.TYPE != 'processes':
            self.skipTest("only makes sense with processes")
        conn, child_conn = self.Pipe(duplex=True)

        p = self.Process(target=self._send_data_without_fd, args=(child_conn,))
        p.daemon = True
        p.start()
        self.assertRaises(RuntimeError, reduction.recv_handle, conn)
        p.join()
Antoine Pitroubcb39d42011-08-23 19:46:22 +02001740
class _TestListenerClient(BaseTestCase):
    """Connect a Client to a Listener once for every available family."""

    ALLOWED_TYPES = ('processes', 'threads')

    @classmethod
    def _test(cls, address):
        """Connect to `address`, send 'hello' and close the connection."""
        client = cls.connection.Client(address)
        client.send('hello')
        client.close()

    def test_listener_client(self):
        for family in self.connection.families:
            listener = self.connection.Listener(family=family)
            # The child connects to the address the listener was given.
            child = self.Process(target=self._test,
                                 args=(listener.address,))
            child.daemon = True
            child.start()
            accepted = listener.accept()
            self.assertEqual(accepted.recv(), 'hello')
            child.join()
            listener.close()
Benjamin Petersone711caf2008-06-11 16:44:04 +00001761#
1762# Test of sending connection and socket objects between processes
1763#
Benjamin Petersonb29cbbc2008-06-16 20:57:14 +00001764"""
Benjamin Petersone711caf2008-06-11 16:44:04 +00001765class _TestPicklingConnections(BaseTestCase):
1766
1767 ALLOWED_TYPES = ('processes',)
1768
1769 def _listener(self, conn, families):
1770 for fam in families:
1771 l = self.connection.Listener(family=fam)
1772 conn.send(l.address)
1773 new_conn = l.accept()
1774 conn.send(new_conn)
1775
1776 if self.TYPE == 'processes':
1777 l = socket.socket()
1778 l.bind(('localhost', 0))
1779 conn.send(l.getsockname())
1780 l.listen(1)
1781 new_conn, addr = l.accept()
1782 conn.send(new_conn)
1783
1784 conn.recv()
1785
1786 def _remote(self, conn):
1787 for (address, msg) in iter(conn.recv, None):
1788 client = self.connection.Client(address)
1789 client.send(msg.upper())
1790 client.close()
1791
1792 if self.TYPE == 'processes':
1793 address, msg = conn.recv()
1794 client = socket.socket()
1795 client.connect(address)
1796 client.sendall(msg.upper())
1797 client.close()
1798
1799 conn.close()
1800
1801 def test_pickling(self):
1802 try:
1803 multiprocessing.allow_connection_pickling()
1804 except ImportError:
1805 return
1806
1807 families = self.connection.families
1808
1809 lconn, lconn0 = self.Pipe()
1810 lp = self.Process(target=self._listener, args=(lconn0, families))
Jesus Cea94f964f2011-09-09 20:26:57 +02001811 lp.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +00001812 lp.start()
1813 lconn0.close()
1814
1815 rconn, rconn0 = self.Pipe()
1816 rp = self.Process(target=self._remote, args=(rconn0,))
Jesus Cea94f964f2011-09-09 20:26:57 +02001817 rp.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +00001818 rp.start()
1819 rconn0.close()
1820
1821 for fam in families:
1822 msg = ('This connection uses family %s' % fam).encode('ascii')
1823 address = lconn.recv()
1824 rconn.send((address, msg))
1825 new_conn = lconn.recv()
1826 self.assertEqual(new_conn.recv(), msg.upper())
1827
1828 rconn.send(None)
1829
1830 if self.TYPE == 'processes':
1831 msg = latin('This connection uses a normal socket')
1832 address = lconn.recv()
1833 rconn.send((address, msg))
1834 if hasattr(socket, 'fromfd'):
1835 new_conn = lconn.recv()
1836 self.assertEqual(new_conn.recv(100), msg.upper())
1837 else:
1838 # XXX On Windows with Py2.6 need to backport fromfd()
1839 discard = lconn.recv_bytes()
1840
1841 lconn.send(None)
1842
1843 rconn.close()
1844 lconn.close()
1845
1846 lp.join()
1847 rp.join()
Benjamin Petersonb29cbbc2008-06-16 20:57:14 +00001848"""
Benjamin Petersone711caf2008-06-11 16:44:04 +00001849#
1850#
1851#
1852
class _TestHeap(BaseTestCase):
    # Exercises the shared-memory allocator behind
    # multiprocessing.heap.BufferWrapper.

    ALLOWED_TYPES = ('processes',)

    def test_heap(self):
        # Allocate and free many randomly sized blocks, then verify that the
        # free and occupied segments recorded by the heap tile each arena
        # without gaps or overlaps.
        iterations = 5000
        maxblocks = 50
        blocks = []

        # create and destroy lots of blocks of different sizes
        for _ in range(iterations):
            size = int(random.lognormvariate(0, 1) * 1000)
            blocks.append(multiprocessing.heap.BufferWrapper(size))
            if len(blocks) > maxblocks:
                del blocks[random.randrange(maxblocks)]

        # get the heap object
        heap = multiprocessing.heap.BufferWrapper._heap

        # verify the state of the heap; the lock stays held until the test's
        # cleanup phase releases it
        segments = []
        occupied = 0
        heap._lock.acquire()
        self.addCleanup(heap._lock.release)
        for free_list in list(heap._len_to_seq.values()):
            for arena, start, stop in free_list:
                arena_index = heap._arenas.index(arena)
                segments.append(
                    (arena_index, start, stop, stop - start, 'free'))
        for arena, start, stop in heap._allocated_blocks:
            arena_index = heap._arenas.index(arena)
            segments.append(
                (arena_index, start, stop, stop - start, 'occupied'))
            occupied += (stop - start)

        segments.sort()

        # each segment must either end where the next one starts, or the next
        # one must open a different arena at offset 0
        for current, following in zip(segments, segments[1:]):
            (arena, start, stop) = current[:3]
            (narena, nstart, nstop) = following[:3]
            self.assertTrue((arena != narena and nstart == 0) or
                            (stop == nstart))

    def test_free_from_gc(self):
        # Check that freeing of blocks by the garbage collector doesn't deadlock
        # (issue #12352).
        # Make sure the GC is enabled, and set lower collection thresholds to
        # make collections more frequent (and increase the probability of
        # deadlock).
        if not gc.isenabled():
            gc.enable()
            self.addCleanup(gc.disable)
        saved_thresholds = gc.get_threshold()
        self.addCleanup(gc.set_threshold, *saved_thresholds)
        gc.set_threshold(10)

        # perform numerous block allocations, with cyclic references to make
        # sure objects are collected asynchronously by the gc
        for _ in range(5000):
            first = multiprocessing.heap.BufferWrapper(1)
            second = multiprocessing.heap.BufferWrapper(1)
            # circular references
            first.buddy = second
            second.buddy = first
1917
Benjamin Petersone711caf2008-06-11 16:44:04 +00001918#
1919#
1920#
1921
class _Foo(Structure):
    # Two-field ctypes structure (an int and a double) used by the shared
    # ctypes tests.
    _fields_ = [('x', c_int), ('y', c_double)]
1927
class _TestSharedCTypes(BaseTestCase):
    # Checks that ctypes-backed shared objects modified in a child process
    # are seen as modified by the parent.

    ALLOWED_TYPES = ('processes',)

    def setUp(self):
        if HAS_SHAREDCTYPES:
            return
        self.skipTest("requires multiprocessing.sharedctypes")

    @classmethod
    def _double(cls, x, y, foo, arr, string):
        # Runs in the child: double every shared value in place.
        x.value *= 2
        y.value *= 2
        foo.x *= 2
        foo.y *= 2
        string.value *= 2
        for index in range(len(arr)):
            arr[index] *= 2

    def test_sharedctypes(self, lock=False):
        # Build one shared object of each flavour, let a child double them
        # all, then verify the doubled values from the parent.
        x = Value('i', 7, lock=lock)
        y = Value(c_double, 1.0/3.0, lock=lock)
        foo = Value(_Foo, 3, 2, lock=lock)
        arr = self.Array('d', list(range(10)), lock=lock)
        string = self.Array('c', 20, lock=lock)
        string.value = latin('hello')

        shared = (x, y, foo, arr, string)
        p = self.Process(target=self._double, args=shared)
        p.daemon = True
        p.start()
        p.join()

        self.assertEqual(x.value, 14)
        self.assertAlmostEqual(y.value, 2.0/3.0)
        self.assertEqual(foo.x, 6)
        self.assertAlmostEqual(foo.y, 4.0)
        for index in range(10):
            self.assertAlmostEqual(arr[index], index*2)
        self.assertEqual(string.value, latin('hellohello'))

    def test_synchronize(self):
        # Same scenario, but with the lock-protected wrappers.
        self.test_sharedctypes(lock=True)

    def test_copy(self):
        # A copy must not be affected by later changes to the original.
        original = _Foo(2, 5.0)
        duplicate = copy(original)
        original.x = 0
        original.y = 0
        self.assertEqual(duplicate.x, 2)
        self.assertAlmostEqual(duplicate.y, 5.0)
1977
1978#
1979#
1980#
1981
class _TestFinalize(BaseTestCase):
    # Checks which util.Finalize callbacks fire, and in which order, by having
    # a child process report a tag for each callback through a pipe.

    ALLOWED_TYPES = ('processes',)

    @classmethod
    def _test_finalize(cls, conn):
        # Runs in the child process.  Every finalizer registered here calls
        # conn.send() with a distinct tag.  Statement order and the lifetime
        # of the local objects matter: do not reorder or fold into loops.
        class Foo(object):
            pass

        a = Foo()
        util.Finalize(a, conn.send, args=('a',))
        del a           # triggers callback for a

        b = Foo()
        close_b = util.Finalize(b, conn.send, args=('b',))
        close_b()       # triggers callback for b
        close_b()       # does nothing because callback has already been called
        del b           # does nothing because callback has already been called

        # No exitpriority given; test_finalize() does not expect 'c' in the
        # received messages.
        c = Foo()
        util.Finalize(c, conn.send, args=('c',))

        d10 = Foo()
        util.Finalize(d10, conn.send, args=('d10',), exitpriority=1)

        # Three finalizers sharing exitpriority=0; test_finalize() expects
        # them in reverse registration order (d03, d02, d01).
        d01 = Foo()
        util.Finalize(d01, conn.send, args=('d01',), exitpriority=0)
        d02 = Foo()
        util.Finalize(d02, conn.send, args=('d02',), exitpriority=0)
        d03 = Foo()
        util.Finalize(d03, conn.send, args=('d03',), exitpriority=0)

        util.Finalize(None, conn.send, args=('e',), exitpriority=-10)

        # Sentinel with the lowest priority used here; the parent stops
        # reading when it receives it.
        util.Finalize(None, conn.send, args=('STOP',), exitpriority=-100)

        # call multiprocessing's cleanup function then exit process without
        # garbage collecting locals
        util._exit_function()
        conn.close()
        os._exit(0)

    def test_finalize(self):
        # Run _test_finalize() in a child, then compare the tags it sent with
        # the expected firing order.
        conn, child_conn = self.Pipe()

        p = self.Process(target=self._test_finalize, args=(child_conn,))
        p.daemon = True
        p.start()
        p.join()

        # Drain the pipe up to (and excluding) the 'STOP' sentinel.
        result = [obj for obj in iter(conn.recv, 'STOP')]
        self.assertEqual(result, ['a', 'b', 'd10', 'd03', 'd02', 'd01', 'e'])
2034
2035#
2036# Test that from ... import * works for each module
2037#
2038
class _TestImportStar(BaseTestCase):
    # Checks that every name a multiprocessing module lists in __all__ is
    # really an attribute of that module.

    ALLOWED_TYPES = ('processes',)

    def test_import(self):
        module_names = [
            'multiprocessing', 'multiprocessing.connection',
            'multiprocessing.heap', 'multiprocessing.managers',
            'multiprocessing.pool', 'multiprocessing.process',
            'multiprocessing.synchronize', 'multiprocessing.util'
            ]

        # Optional modules are only checked when they are usable here.
        if HAS_REDUCTION:
            module_names.append('multiprocessing.reduction')

        if c_int is not None:
            # This module requires _ctypes
            module_names.append('multiprocessing.sharedctypes')

        for module_name in module_names:
            __import__(module_name)
            module = sys.modules[module_name]
            exported = getattr(module, '__all__', ())

            for attr in exported:
                message = '%r does not have attribute %r' % (module, attr)
                self.assertTrue(hasattr(module, attr), message)
2067
2068#
2069# Quick test that logging works -- does not test logging output
2070#
2071
class _TestLogging(BaseTestCase):
    # Quick checks that multiprocessing's logger can be used and that its
    # effective level is what a child process observes.

    ALLOWED_TYPES = ('processes',)

    def test_enable_logging(self):
        logger = multiprocessing.get_logger()
        # Check the logger before using it, and restore the suite-wide level
        # even if one of the calls below raises.
        self.assertIsNotNone(logger)
        self.addCleanup(logger.setLevel, LOG_LEVEL)
        logger.setLevel(util.SUBWARNING)
        logger.debug('this will not be printed')
        logger.info('nor will this')

    @classmethod
    def _test_level(cls, conn):
        # Runs in the child: report the effective level of the
        # multiprocessing logger back to the parent.
        logger = multiprocessing.get_logger()
        conn.send(logger.getEffectiveLevel())

    def test_level(self):
        LEVEL1 = 32
        LEVEL2 = 37

        logger = multiprocessing.get_logger()
        root_logger = logging.getLogger()
        root_level = root_logger.level
        # Registered up front so a failing assertion cannot leak the
        # temporary levels into the tests that run afterwards.
        self.addCleanup(logger.setLevel, level=LOG_LEVEL)
        self.addCleanup(root_logger.setLevel, root_level)

        reader, writer = multiprocessing.Pipe(duplex=False)
        self.addCleanup(reader.close)
        self.addCleanup(writer.close)

        # A level set directly on the multiprocessing logger is what the
        # child sees.
        logger.setLevel(LEVEL1)
        p = self.Process(target=self._test_level, args=(writer,))
        p.daemon = True
        p.start()
        # Cleanups run last-in first-out, so the child is reaped before the
        # pipe ends registered above are closed.
        self.addCleanup(p.join)
        self.assertEqual(LEVEL1, reader.recv())

        # With the logger's own level unset, the root logger's level is the
        # effective one.
        logger.setLevel(logging.NOTSET)
        root_logger.setLevel(LEVEL2)
        p = self.Process(target=self._test_level, args=(writer,))
        p.daemon = True
        p.start()
        self.addCleanup(p.join)
        self.assertEqual(LEVEL2, reader.recv())
2114
Jesse Nollerb9a49b72009-11-21 18:09:38 +00002115
Jesse Nollerf4ae35f2009-11-24 14:22:24 +00002116# class _TestLoggingProcessName(BaseTestCase):
2117#
2118# def handle(self, record):
2119# assert record.processName == multiprocessing.current_process().name
2120# self.__handled = True
2121#
2122# def test_logging(self):
2123# handler = logging.Handler()
2124# handler.handle = self.handle
2125# self.__handled = False
2126# # Bypass getLogger() and side-effects
2127# logger = logging.getLoggerClass()(
2128# 'multiprocessing.test.TestLoggingProcessName')
2129# logger.addHandler(handler)
2130# logger.propagate = False
2131#
2132# logger.warn('foo')
2133# assert self.__handled
Jesse Nollerb9a49b72009-11-21 18:09:38 +00002134
Benjamin Petersone711caf2008-06-11 16:44:04 +00002135#
Jesse Noller6214edd2009-01-19 16:23:53 +00002136# Test to verify handle verification, see issue 3321
2137#
2138
class TestInvalidHandle(unittest.TestCase):
    # Handle verification for Connection objects, see issue 3321.

    @unittest.skipIf(WIN32, "skipped on Windows")
    def test_invalid_handles(self):
        expected_errors = (ValueError, IOError)

        # A connection built on an arbitrary handle number must fail to poll.
        conn = multiprocessing.connection.Connection(44977608)
        try:
            with self.assertRaises(expected_errors):
                conn.poll()
        finally:
            # Hack private attribute _handle to avoid printing an error
            # in conn.__del__
            conn._handle = None

        # A negative handle must be rejected when constructing.
        with self.assertRaises(expected_errors):
            multiprocessing.connection.Connection(-1)
Florent Xiclunafd1b0932010-03-28 00:25:02 +00002152
Jesse Noller6214edd2009-01-19 16:23:53 +00002153#
Benjamin Petersone711caf2008-06-11 16:44:04 +00002154# Functions used to create test cases from the base ones in this module
2155#
2156
def get_attributes(Source, names):
    """Return a dict mapping each name in *names* to ``getattr(Source, name)``.

    Plain Python functions are wrapped in ``staticmethod`` so that, once the
    dict is merged into a class namespace, they are not turned into bound
    methods on attribute access.  Every other object is stored unchanged.

    Raises AttributeError if *Source* lacks one of the names.
    """
    function_type = type(get_attributes)
    attributes = {}
    for name in names:
        obj = getattr(Source, name)
        # isinstance() instead of comparing type objects with ==.
        if isinstance(obj, function_type):
            obj = staticmethod(obj)
        attributes[name] = obj
    return attributes
2165
def create_test_cases(Mixin, type):
    """Build concrete TestCase classes from the module's ``_Test*`` bases.

    Every module global whose name starts with ``_Test`` and whose
    ``ALLOWED_TYPES`` contains *type* yields a class deriving from that base,
    ``unittest.TestCase`` and *Mixin*, named ``With<Type><name minus '_'>``.

    *type* shadows the builtin of the same name; callers pass it by keyword,
    so the parameter name has to stay.

    Returns a dict mapping the generated class names to the classes.
    """
    result = {}
    Type = type.capitalize()

    for name, base in list(globals().items()):
        if not name.startswith('_Test'):
            continue
        # Objects without ALLOWED_TYPES (e.g. a helper function that merely
        # happens to be called _Test...) are skipped instead of raising
        # AttributeError.
        if type not in getattr(base, 'ALLOWED_TYPES', ()):
            continue
        newname = 'With' + Type + name[1:]
        class Temp(base, unittest.TestCase, Mixin):
            pass
        # Also fix up __qualname__; otherwise the class keeps
        # 'create_test_cases.<locals>.Temp' as its qualified name.
        Temp.__name__ = Temp.__qualname__ = newname
        Temp.__module__ = Mixin.__module__
        result[newname] = Temp
    return result
2182
2183#
2184# Create test cases
2185#
2186
class ProcessesMixin(object):
    # Mixin that points the generated test cases at the process-based
    # multiprocessing API.
    TYPE = 'processes'
    Process = multiprocessing.Process
    # Inject the listed multiprocessing attributes into the class namespace
    # while the class body executes; get_attributes() wraps plain functions
    # in staticmethod so they are not bound as methods.
    locals().update(get_attributes(multiprocessing, (
        'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
        'Condition', 'Event', 'Value', 'Array', 'RawValue',
        'RawArray', 'current_process', 'active_children', 'Pipe',
        'connection', 'JoinableQueue'
        )))

# Generate one concrete TestCase per _Test* base that allows 'processes' and
# publish the classes as module globals.
testcases_processes = create_test_cases(ProcessesMixin, type='processes')
globals().update(testcases_processes)
2199
2200
class ManagerMixin(object):
    # Mixin that points the generated test cases at objects served by a
    # SyncManager.
    TYPE = 'manager'
    Process = multiprocessing.Process
    # Allocated without running __init__; test_main() calls __init__() and
    # start() on this same instance before the tests run.
    manager = object.__new__(multiprocessing.managers.SyncManager)
    # Inject the manager's factory attributes into the class namespace while
    # the class body executes.
    locals().update(get_attributes(manager, (
        'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
        'Condition', 'Event', 'Value', 'Array', 'list', 'dict',
        'Namespace', 'JoinableQueue'
        )))

# Generate one concrete TestCase per _Test* base that allows 'manager' and
# publish the classes as module globals.
testcases_manager = create_test_cases(ManagerMixin, type='manager')
globals().update(testcases_manager)
2213
2214
class ThreadsMixin(object):
    # Mixin that points the generated test cases at the thread-based
    # multiprocessing.dummy API.
    TYPE = 'threads'
    Process = multiprocessing.dummy.Process
    # Inject the listed multiprocessing.dummy attributes into the class
    # namespace while the class body executes; get_attributes() wraps plain
    # functions in staticmethod so they are not bound as methods.
    locals().update(get_attributes(multiprocessing.dummy, (
        'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
        'Condition', 'Event', 'Value', 'Array', 'current_process',
        'active_children', 'Pipe', 'connection', 'dict', 'list',
        'Namespace', 'JoinableQueue'
        )))

# Generate one concrete TestCase per _Test* base that allows 'threads' and
# publish the classes as module globals.
testcases_threads = create_test_cases(ThreadsMixin, type='threads')
globals().update(testcases_threads)
2227
class OtherTest(unittest.TestCase):
    # Authentication failures in the connection challenge handshake.
    # TODO: add more tests for deliver/answer challenge.
    def test_deliver_challenge_auth_failure(self):
        # The peer answers the challenge with garbage.
        class _FakeConnection(object):
            def recv_bytes(self, size):
                return b'something bogus'
            def send_bytes(self, data):
                pass

        with self.assertRaises(multiprocessing.AuthenticationError):
            multiprocessing.connection.deliver_challenge(
                _FakeConnection(), b'abc')

    def test_answer_challenge_auth_failure(self):
        # The peer sends the challenge marker first, then garbage, then
        # nothing but empty messages.
        class _FakeConnection(object):
            def __init__(self):
                self.count = 0
            def recv_bytes(self, size):
                self.count += 1
                if self.count > 2:
                    return b''
                if self.count == 2:
                    return b'something bogus'
                return multiprocessing.connection.CHALLENGE
            def send_bytes(self, data):
                pass

        with self.assertRaises(multiprocessing.AuthenticationError):
            multiprocessing.connection.answer_challenge(
                _FakeConnection(), b'abc')
2256
Benjamin Petersonf47ed4a2009-04-11 20:45:40 +00002257#
2258# Test Manager.start()/Pool.__init__() initializer feature - see issue 5585
2259#
2260
def initializer(ns):
    """Increment the ``test`` counter stored on *ns* by one."""
    ns.test = ns.test + 1
2263
class TestInitializers(unittest.TestCase):
    # Manager.start() / Pool.__init__() initializer feature, see issue 5585.
    # A shared Namespace counter records how often initializer() ran.
    def setUp(self):
        self.mgr = multiprocessing.Manager()
        self.ns = self.mgr.Namespace()
        self.ns.test = 0

    def tearDown(self):
        self.mgr.shutdown()

    def test_manager_initializer(self):
        m = multiprocessing.managers.SyncManager()
        # A non-callable initializer must be rejected.
        self.assertRaises(TypeError, m.start, 1)
        m.start(initializer, (self.ns,))
        try:
            self.assertEqual(self.ns.test, 1)
        finally:
            # Stop the second manager even when the assertion fails, so its
            # server process is not leaked.
            m.shutdown()

    def test_pool_initializer(self):
        # A non-callable initializer must be rejected.
        self.assertRaises(TypeError, multiprocessing.Pool, initializer=1)
        p = multiprocessing.Pool(1, initializer, (self.ns,))
        p.close()
        p.join()
        self.assertEqual(self.ns.test, 1)
2286
Alexandre Vassalottic57a84f2009-07-17 12:07:01 +00002287#
2288# Issue 5155, 5313, 5331: Test process in processes
2289# Verifies os.close(sys.stdin.fileno) vs. sys.stdin.close() behavior
2290#
2291
def _ThisSubProcess(q):
    """Try one non-blocking get on *q*; an empty queue is not an error."""
    try:
        q.get(block=False)
    except pyqueue.Empty:
        return
2297
def _TestProcess(q):
    """Start a daemonic grandchild running _ThisSubProcess and wait for it.

    The *q* argument is accepted but not used; the grandchild is handed a
    freshly created queue.
    """
    inner_queue = multiprocessing.Queue()
    child = multiprocessing.Process(target=_ThisSubProcess,
                                    args=(inner_queue,))
    child.daemon = True
    child.start()
    child.join()
2304
def _afunc(x):
    """Return *x* multiplied by itself."""
    squared = x * x
    return squared
2307
def pool_in_process():
    """Create a four-worker Pool, map _afunc over a few ints and shut down.

    Used as a Process target to check that a pool can be created and used
    from inside a child process.
    """
    pool = multiprocessing.Pool(processes=4)
    try:
        pool.map(_afunc, [1, 2, 3, 4, 5, 6, 7])
    finally:
        # Shut the workers down explicitly instead of abandoning the pool
        # when the function returns.
        pool.close()
        pool.join()
2311
class _file_like(object):
    # Minimal write-buffering wrapper around a file-like `delegate`.  The
    # buffer is tied to the pid that created it, so a process with a
    # different pid starts from an empty buffer.
    def __init__(self, delegate):
        self._pid = None
        self._delegate = delegate

    @property
    def cache(self):
        current_pid = os.getpid()
        # There are no race conditions since fork keeps only the running thread
        if self._pid != current_pid:
            self._cache = []
            self._pid = current_pid
        return self._cache

    def write(self, data):
        # Buffer only; nothing reaches the delegate until flush().
        self.cache.append(data)

    def flush(self):
        # Hand the buffered pieces to the delegate as one string, then
        # start over with an empty buffer.
        pending = ''.join(self.cache)
        self._delegate.write(pending)
        self._cache = []
2332
class TestStdinBadfiledescriptor(unittest.TestCase):
    # Issues 5155, 5313, 5331: multiprocessing primitives used from inside a
    # child process.

    def test_queue_in_process(self):
        # A child that itself creates a Queue and starts a grandchild.
        queue = multiprocessing.Queue()
        proc = multiprocessing.Process(target=_TestProcess, args=(queue,))
        proc.start()
        proc.join()

    def test_pool_in_process(self):
        # A child that creates and uses a Pool.
        p = multiprocessing.Process(target=pool_in_process)
        p.start()
        p.join()

    def test_flushing(self):
        sio = io.StringIO()
        flike = _file_like(sio)
        flike.write('foo')
        # NOTE(review): this Process is constructed but never started;
        # presumably intentional -- confirm before removing or starting it.
        proc = multiprocessing.Process(target=lambda: flike.flush())
        flike.flush()
        # A unittest assertion rather than a bare ``assert``, so the check
        # still runs under ``python -O`` and reports both values on failure.
        self.assertEqual(sio.getvalue(), 'foo')
2353
# Stand-alone TestCase classes that are not generated from a _Test* base;
# test_main() runs them after the generated test cases.
testcases_other = [OtherTest, TestInvalidHandle, TestInitializers,
                   TestStdinBadfiledescriptor]
Neal Norwitz5d6415e2008-08-25 01:53:32 +00002356
Benjamin Petersone711caf2008-06-11 16:44:04 +00002357#
2358#
2359#
2360
def test_main(run=None):
    """Run every multiprocessing test case through *run*.

    *run* is a callable taking a unittest.TestSuite; it defaults to
    test.support.run_unittest.  The shared pools and the manager used by the
    mixin classes are created before the suite runs and are torn down
    afterwards, also when *run* raises.

    Raises unittest.SkipTest on Linux when an RLock cannot be created
    (issue 3111).
    """
    if sys.platform.startswith("linux"):
        try:
            multiprocessing.RLock()
        except OSError:
            raise unittest.SkipTest("OSError raises on RLock creation, see issue 3111!")

    check_enough_semaphores()

    if run is None:
        from test.support import run_unittest as run

    util.get_temp_dir()     # creates temp directory for use by all processes

    multiprocessing.get_logger().setLevel(LOG_LEVEL)

    ProcessesMixin.pool = multiprocessing.Pool(4)
    ThreadsMixin.pool = multiprocessing.dummy.Pool(4)
    ManagerMixin.manager.__init__()
    ManagerMixin.manager.start()
    ManagerMixin.pool = ManagerMixin.manager.Pool(4)

    try:
        by_name = lambda tc: tc.__name__
        testcases = (
            sorted(testcases_processes.values(), key=by_name) +
            sorted(testcases_threads.values(), key=by_name) +
            sorted(testcases_manager.values(), key=by_name) +
            testcases_other
            )

        loadTestsFromTestCase = unittest.defaultTestLoader.loadTestsFromTestCase
        suite = unittest.TestSuite(loadTestsFromTestCase(tc) for tc in testcases)
        run(suite)
    finally:
        # The runner may raise (run_unittest does so when tests fail); the
        # worker processes and the manager must still be shut down.
        ThreadsMixin.pool.terminate()
        ProcessesMixin.pool.terminate()
        ManagerMixin.pool.terminate()
        ManagerMixin.manager.shutdown()

        del ProcessesMixin.pool, ThreadsMixin.pool, ManagerMixin.pool
Benjamin Petersone711caf2008-06-11 16:44:04 +00002400
def main():
    """Run the whole suite with a verbose text runner."""
    runner = unittest.TextTestRunner(verbosity=2)
    test_main(runner.run)
2403
2404if __name__ == '__main__':
2405 main()