blob: 05fba7fb68cb87029c290919d443dc22893f5a24 [file] [log] [blame]
Benjamin Peterson90f5ba52010-03-11 22:53:45 +00001#!/usr/bin/env python3
Georg Brandl86b2fb92008-07-16 03:43:04 +00002
Benjamin Petersone711caf2008-06-11 16:44:04 +00003#
4# Unit tests for the multiprocessing package
5#
6
7import unittest
Benjamin Petersone711caf2008-06-11 16:44:04 +00008import queue as pyqueue
9import time
Alexandre Vassalottic57a84f2009-07-17 12:07:01 +000010import io
Benjamin Petersone711caf2008-06-11 16:44:04 +000011import sys
12import os
13import gc
Antoine Pitrouc824e9a2011-04-05 18:11:33 +020014import errno
Benjamin Petersone711caf2008-06-11 16:44:04 +000015import signal
16import array
Benjamin Petersone711caf2008-06-11 16:44:04 +000017import socket
18import random
19import logging
R. David Murraya21e4ca2009-03-31 23:16:50 +000020import test.support
Richard Oudkerke88a2442012-08-14 11:41:32 +010021import test.script_helper
Benjamin Petersone711caf2008-06-11 16:44:04 +000022
Benjamin Petersone5384b02008-10-04 22:00:42 +000023
R. David Murraya21e4ca2009-03-31 23:16:50 +000024# Skip tests if _multiprocessing wasn't built.
25_multiprocessing = test.support.import_module('_multiprocessing')
26# Skip tests if sem_open implementation is broken.
27test.support.import_module('multiprocessing.synchronize')
Victor Stinner45df8202010-04-28 22:31:17 +000028# import threading after _multiprocessing to raise a more revelant error
29# message: "No module named _multiprocessing". _multiprocessing is not compiled
30# without thread support.
31import threading
Benjamin Petersone5384b02008-10-04 22:00:42 +000032
Benjamin Petersone711caf2008-06-11 16:44:04 +000033import multiprocessing.dummy
34import multiprocessing.connection
35import multiprocessing.managers
36import multiprocessing.heap
Benjamin Petersone711caf2008-06-11 16:44:04 +000037import multiprocessing.pool
Benjamin Petersone711caf2008-06-11 16:44:04 +000038
Charles-François Natalie51c8da2011-09-21 18:48:21 +020039from multiprocessing import util
40
41try:
42 from multiprocessing import reduction
43 HAS_REDUCTION = True
44except ImportError:
45 HAS_REDUCTION = False
Benjamin Petersone711caf2008-06-11 16:44:04 +000046
Brian Curtinafa88b52010-10-07 01:12:19 +000047try:
48 from multiprocessing.sharedctypes import Value, copy
49 HAS_SHAREDCTYPES = True
50except ImportError:
51 HAS_SHAREDCTYPES = False
52
Antoine Pitroubcb39d42011-08-23 19:46:22 +020053try:
54 import msvcrt
55except ImportError:
56 msvcrt = None
57
Benjamin Petersone711caf2008-06-11 16:44:04 +000058#
59#
60#
61
Benjamin Peterson2bc91df2008-07-13 18:45:30 +000062def latin(s):
63 return s.encode('latin')
Benjamin Petersone711caf2008-06-11 16:44:04 +000064
Benjamin Petersone711caf2008-06-11 16:44:04 +000065#
66# Constants
67#
68
69LOG_LEVEL = util.SUBWARNING
Jesse Noller1f0b6582010-01-27 03:36:01 +000070#LOG_LEVEL = logging.DEBUG
Benjamin Petersone711caf2008-06-11 16:44:04 +000071
72DELTA = 0.1
73CHECK_TIMINGS = False # making true makes tests take a lot longer
74 # and can sometimes cause some non-serious
75 # failures because some calls block a bit
76 # longer than expected
77if CHECK_TIMINGS:
78 TIMEOUT1, TIMEOUT2, TIMEOUT3 = 0.82, 0.35, 1.4
79else:
80 TIMEOUT1, TIMEOUT2, TIMEOUT3 = 0.1, 0.1, 0.1
81
82HAVE_GETVALUE = not getattr(_multiprocessing,
83 'HAVE_BROKEN_SEM_GETVALUE', False)
84
Jesse Noller6214edd2009-01-19 16:23:53 +000085WIN32 = (sys.platform == "win32")
86
Antoine Pitroubcb39d42011-08-23 19:46:22 +020087try:
88 MAXFD = os.sysconf("SC_OPEN_MAX")
89except:
90 MAXFD = 256
91
Benjamin Petersone711caf2008-06-11 16:44:04 +000092#
Florent Xiclunafd1b0932010-03-28 00:25:02 +000093# Some tests require ctypes
94#
95
96try:
Florent Xiclunaaa171062010-08-14 15:56:42 +000097 from ctypes import Structure, c_int, c_double
Florent Xiclunafd1b0932010-03-28 00:25:02 +000098except ImportError:
99 Structure = object
100 c_int = c_double = None
101
Charles-François Natali3be00952011-11-22 18:36:39 +0100102
103def check_enough_semaphores():
104 """Check that the system supports enough semaphores to run the test."""
105 # minimum number of semaphores available according to POSIX
106 nsems_min = 256
107 try:
108 nsems = os.sysconf("SC_SEM_NSEMS_MAX")
109 except (AttributeError, ValueError):
110 # sysconf not available or setting not available
111 return
112 if nsems == -1 or nsems >= nsems_min:
113 return
114 raise unittest.SkipTest("The OS doesn't support enough semaphores "
115 "to run the test (required: %d)." % nsems_min)
116
117
Florent Xiclunafd1b0932010-03-28 00:25:02 +0000118#
Benjamin Petersone711caf2008-06-11 16:44:04 +0000119# Creates a wrapper for a function which records the time it takes to finish
120#
121
122class TimingWrapper(object):
123
124 def __init__(self, func):
125 self.func = func
126 self.elapsed = None
127
128 def __call__(self, *args, **kwds):
129 t = time.time()
130 try:
131 return self.func(*args, **kwds)
132 finally:
133 self.elapsed = time.time() - t
134
135#
136# Base class for test cases
137#
138
139class BaseTestCase(object):
140
141 ALLOWED_TYPES = ('processes', 'manager', 'threads')
142
143 def assertTimingAlmostEqual(self, a, b):
144 if CHECK_TIMINGS:
145 self.assertAlmostEqual(a, b, 1)
146
147 def assertReturnsIfImplemented(self, value, func, *args):
148 try:
149 res = func(*args)
150 except NotImplementedError:
151 pass
152 else:
153 return self.assertEqual(value, res)
154
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +0000155 # For the sanity of Windows users, rather than crashing or freezing in
156 # multiple ways.
157 def __reduce__(self, *args):
158 raise NotImplementedError("shouldn't try to pickle a test case")
159
160 __reduce_ex__ = __reduce__
161
Benjamin Petersone711caf2008-06-11 16:44:04 +0000162#
163# Return the value of a semaphore
164#
165
166def get_value(self):
167 try:
168 return self.get_value()
169 except AttributeError:
170 try:
171 return self._Semaphore__value
172 except AttributeError:
173 try:
174 return self._value
175 except AttributeError:
176 raise NotImplementedError
177
178#
179# Testcases
180#
181
182class _TestProcess(BaseTestCase):
183
184 ALLOWED_TYPES = ('processes', 'threads')
185
186 def test_current(self):
187 if self.TYPE == 'threads':
188 return
189
190 current = self.current_process()
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000191 authkey = current.authkey
Benjamin Petersone711caf2008-06-11 16:44:04 +0000192
193 self.assertTrue(current.is_alive())
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000194 self.assertTrue(not current.daemon)
Ezio Melottie9615932010-01-24 19:26:24 +0000195 self.assertIsInstance(authkey, bytes)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000196 self.assertTrue(len(authkey) > 0)
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000197 self.assertEqual(current.ident, os.getpid())
198 self.assertEqual(current.exitcode, None)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000199
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +0000200 @classmethod
201 def _test(cls, q, *args, **kwds):
202 current = cls.current_process()
Benjamin Petersone711caf2008-06-11 16:44:04 +0000203 q.put(args)
204 q.put(kwds)
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000205 q.put(current.name)
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +0000206 if cls.TYPE != 'threads':
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000207 q.put(bytes(current.authkey))
Benjamin Petersone711caf2008-06-11 16:44:04 +0000208 q.put(current.pid)
209
210 def test_process(self):
211 q = self.Queue(1)
212 e = self.Event()
213 args = (q, 1, 2)
214 kwargs = {'hello':23, 'bye':2.54}
215 name = 'SomeProcess'
216 p = self.Process(
217 target=self._test, args=args, kwargs=kwargs, name=name
218 )
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000219 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000220 current = self.current_process()
221
222 if self.TYPE != 'threads':
Ezio Melottib3aedd42010-11-20 19:04:17 +0000223 self.assertEqual(p.authkey, current.authkey)
224 self.assertEqual(p.is_alive(), False)
225 self.assertEqual(p.daemon, True)
Benjamin Peterson577473f2010-01-19 00:09:57 +0000226 self.assertNotIn(p, self.active_children())
Benjamin Petersone711caf2008-06-11 16:44:04 +0000227 self.assertTrue(type(self.active_children()) is list)
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000228 self.assertEqual(p.exitcode, None)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000229
230 p.start()
231
Ezio Melottib3aedd42010-11-20 19:04:17 +0000232 self.assertEqual(p.exitcode, None)
233 self.assertEqual(p.is_alive(), True)
Benjamin Peterson577473f2010-01-19 00:09:57 +0000234 self.assertIn(p, self.active_children())
Benjamin Petersone711caf2008-06-11 16:44:04 +0000235
Ezio Melottib3aedd42010-11-20 19:04:17 +0000236 self.assertEqual(q.get(), args[1:])
237 self.assertEqual(q.get(), kwargs)
238 self.assertEqual(q.get(), p.name)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000239 if self.TYPE != 'threads':
Ezio Melottib3aedd42010-11-20 19:04:17 +0000240 self.assertEqual(q.get(), current.authkey)
241 self.assertEqual(q.get(), p.pid)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000242
243 p.join()
244
Ezio Melottib3aedd42010-11-20 19:04:17 +0000245 self.assertEqual(p.exitcode, 0)
246 self.assertEqual(p.is_alive(), False)
Benjamin Peterson577473f2010-01-19 00:09:57 +0000247 self.assertNotIn(p, self.active_children())
Benjamin Petersone711caf2008-06-11 16:44:04 +0000248
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +0000249 @classmethod
250 def _test_terminate(cls):
Benjamin Petersone711caf2008-06-11 16:44:04 +0000251 time.sleep(1000)
252
253 def test_terminate(self):
254 if self.TYPE == 'threads':
255 return
256
257 p = self.Process(target=self._test_terminate)
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000258 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000259 p.start()
260
261 self.assertEqual(p.is_alive(), True)
Benjamin Peterson577473f2010-01-19 00:09:57 +0000262 self.assertIn(p, self.active_children())
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000263 self.assertEqual(p.exitcode, None)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000264
265 p.terminate()
266
267 join = TimingWrapper(p.join)
268 self.assertEqual(join(), None)
269 self.assertTimingAlmostEqual(join.elapsed, 0.0)
270
271 self.assertEqual(p.is_alive(), False)
Benjamin Peterson577473f2010-01-19 00:09:57 +0000272 self.assertNotIn(p, self.active_children())
Benjamin Petersone711caf2008-06-11 16:44:04 +0000273
274 p.join()
275
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000276 # XXX sometimes get p.exitcode == 0 on Windows ...
277 #self.assertEqual(p.exitcode, -signal.SIGTERM)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000278
279 def test_cpu_count(self):
280 try:
281 cpus = multiprocessing.cpu_count()
282 except NotImplementedError:
283 cpus = 1
284 self.assertTrue(type(cpus) is int)
285 self.assertTrue(cpus >= 1)
286
287 def test_active_children(self):
288 self.assertEqual(type(self.active_children()), list)
289
290 p = self.Process(target=time.sleep, args=(DELTA,))
Benjamin Peterson577473f2010-01-19 00:09:57 +0000291 self.assertNotIn(p, self.active_children())
Benjamin Petersone711caf2008-06-11 16:44:04 +0000292
Jesus Cea94f964f2011-09-09 20:26:57 +0200293 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000294 p.start()
Benjamin Peterson577473f2010-01-19 00:09:57 +0000295 self.assertIn(p, self.active_children())
Benjamin Petersone711caf2008-06-11 16:44:04 +0000296
297 p.join()
Benjamin Peterson577473f2010-01-19 00:09:57 +0000298 self.assertNotIn(p, self.active_children())
Benjamin Petersone711caf2008-06-11 16:44:04 +0000299
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +0000300 @classmethod
301 def _test_recursion(cls, wconn, id):
Benjamin Petersone711caf2008-06-11 16:44:04 +0000302 from multiprocessing import forking
303 wconn.send(id)
304 if len(id) < 2:
305 for i in range(2):
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +0000306 p = cls.Process(
307 target=cls._test_recursion, args=(wconn, id+[i])
Benjamin Petersone711caf2008-06-11 16:44:04 +0000308 )
309 p.start()
310 p.join()
311
312 def test_recursion(self):
313 rconn, wconn = self.Pipe(duplex=False)
314 self._test_recursion(wconn, [])
315
316 time.sleep(DELTA)
317 result = []
318 while rconn.poll():
319 result.append(rconn.recv())
320
321 expected = [
322 [],
323 [0],
324 [0, 0],
325 [0, 1],
326 [1],
327 [1, 0],
328 [1, 1]
329 ]
330 self.assertEqual(result, expected)
331
332#
333#
334#
335
336class _UpperCaser(multiprocessing.Process):
337
338 def __init__(self):
339 multiprocessing.Process.__init__(self)
340 self.child_conn, self.parent_conn = multiprocessing.Pipe()
341
342 def run(self):
343 self.parent_conn.close()
344 for s in iter(self.child_conn.recv, None):
345 self.child_conn.send(s.upper())
346 self.child_conn.close()
347
348 def submit(self, s):
349 assert type(s) is str
350 self.parent_conn.send(s)
351 return self.parent_conn.recv()
352
353 def stop(self):
354 self.parent_conn.send(None)
355 self.parent_conn.close()
356 self.child_conn.close()
357
358class _TestSubclassingProcess(BaseTestCase):
359
360 ALLOWED_TYPES = ('processes',)
361
362 def test_subclassing(self):
363 uppercaser = _UpperCaser()
Jesus Cea94f964f2011-09-09 20:26:57 +0200364 uppercaser.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000365 uppercaser.start()
366 self.assertEqual(uppercaser.submit('hello'), 'HELLO')
367 self.assertEqual(uppercaser.submit('world'), 'WORLD')
368 uppercaser.stop()
369 uppercaser.join()
370
Antoine Pitrou84a0fbf2012-01-27 10:52:37 +0100371 def test_stderr_flush(self):
372 # sys.stderr is flushed at process shutdown (issue #13812)
373 if self.TYPE == "threads":
374 return
375
376 testfn = test.support.TESTFN
377 self.addCleanup(test.support.unlink, testfn)
378 proc = self.Process(target=self._test_stderr_flush, args=(testfn,))
379 proc.start()
380 proc.join()
381 with open(testfn, 'r') as f:
382 err = f.read()
383 # The whole traceback was printed
384 self.assertIn("ZeroDivisionError", err)
385 self.assertIn("test_multiprocessing.py", err)
386 self.assertIn("1/0 # MARKER", err)
387
388 @classmethod
389 def _test_stderr_flush(cls, testfn):
390 sys.stderr = open(testfn, 'w')
391 1/0 # MARKER
392
393
Richard Oudkerk29471de2012-06-06 19:04:57 +0100394 @classmethod
395 def _test_sys_exit(cls, reason, testfn):
396 sys.stderr = open(testfn, 'w')
397 sys.exit(reason)
398
399 def test_sys_exit(self):
400 # See Issue 13854
401 if self.TYPE == 'threads':
402 return
403
404 testfn = test.support.TESTFN
405 self.addCleanup(test.support.unlink, testfn)
406
407 for reason, code in (([1, 2, 3], 1), ('ignore this', 0)):
408 p = self.Process(target=self._test_sys_exit, args=(reason, testfn))
409 p.daemon = True
410 p.start()
411 p.join(5)
412 self.assertEqual(p.exitcode, code)
413
414 with open(testfn, 'r') as f:
415 self.assertEqual(f.read().rstrip(), str(reason))
416
417 for reason in (True, False, 8):
418 p = self.Process(target=sys.exit, args=(reason,))
419 p.daemon = True
420 p.start()
421 p.join(5)
422 self.assertEqual(p.exitcode, reason)
423
Benjamin Petersone711caf2008-06-11 16:44:04 +0000424#
425#
426#
427
428def queue_empty(q):
429 if hasattr(q, 'empty'):
430 return q.empty()
431 else:
432 return q.qsize() == 0
433
434def queue_full(q, maxsize):
435 if hasattr(q, 'full'):
436 return q.full()
437 else:
438 return q.qsize() == maxsize
439
440
441class _TestQueue(BaseTestCase):
442
443
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +0000444 @classmethod
445 def _test_put(cls, queue, child_can_start, parent_can_continue):
Benjamin Petersone711caf2008-06-11 16:44:04 +0000446 child_can_start.wait()
447 for i in range(6):
448 queue.get()
449 parent_can_continue.set()
450
451 def test_put(self):
452 MAXSIZE = 6
453 queue = self.Queue(maxsize=MAXSIZE)
454 child_can_start = self.Event()
455 parent_can_continue = self.Event()
456
457 proc = self.Process(
458 target=self._test_put,
459 args=(queue, child_can_start, parent_can_continue)
460 )
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000461 proc.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000462 proc.start()
463
464 self.assertEqual(queue_empty(queue), True)
465 self.assertEqual(queue_full(queue, MAXSIZE), False)
466
467 queue.put(1)
468 queue.put(2, True)
469 queue.put(3, True, None)
470 queue.put(4, False)
471 queue.put(5, False, None)
472 queue.put_nowait(6)
473
474 # the values may be in buffer but not yet in pipe so sleep a bit
475 time.sleep(DELTA)
476
477 self.assertEqual(queue_empty(queue), False)
478 self.assertEqual(queue_full(queue, MAXSIZE), True)
479
480 put = TimingWrapper(queue.put)
481 put_nowait = TimingWrapper(queue.put_nowait)
482
483 self.assertRaises(pyqueue.Full, put, 7, False)
484 self.assertTimingAlmostEqual(put.elapsed, 0)
485
486 self.assertRaises(pyqueue.Full, put, 7, False, None)
487 self.assertTimingAlmostEqual(put.elapsed, 0)
488
489 self.assertRaises(pyqueue.Full, put_nowait, 7)
490 self.assertTimingAlmostEqual(put_nowait.elapsed, 0)
491
492 self.assertRaises(pyqueue.Full, put, 7, True, TIMEOUT1)
493 self.assertTimingAlmostEqual(put.elapsed, TIMEOUT1)
494
495 self.assertRaises(pyqueue.Full, put, 7, False, TIMEOUT2)
496 self.assertTimingAlmostEqual(put.elapsed, 0)
497
498 self.assertRaises(pyqueue.Full, put, 7, True, timeout=TIMEOUT3)
499 self.assertTimingAlmostEqual(put.elapsed, TIMEOUT3)
500
501 child_can_start.set()
502 parent_can_continue.wait()
503
504 self.assertEqual(queue_empty(queue), True)
505 self.assertEqual(queue_full(queue, MAXSIZE), False)
506
507 proc.join()
508
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +0000509 @classmethod
510 def _test_get(cls, queue, child_can_start, parent_can_continue):
Benjamin Petersone711caf2008-06-11 16:44:04 +0000511 child_can_start.wait()
Benjamin Petersonb29cbbc2008-06-16 20:57:14 +0000512 #queue.put(1)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000513 queue.put(2)
514 queue.put(3)
515 queue.put(4)
516 queue.put(5)
517 parent_can_continue.set()
518
519 def test_get(self):
520 queue = self.Queue()
521 child_can_start = self.Event()
522 parent_can_continue = self.Event()
523
524 proc = self.Process(
525 target=self._test_get,
526 args=(queue, child_can_start, parent_can_continue)
527 )
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000528 proc.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000529 proc.start()
530
531 self.assertEqual(queue_empty(queue), True)
532
533 child_can_start.set()
534 parent_can_continue.wait()
535
536 time.sleep(DELTA)
537 self.assertEqual(queue_empty(queue), False)
538
Benjamin Petersonb29cbbc2008-06-16 20:57:14 +0000539 # Hangs unexpectedly, remove for now
540 #self.assertEqual(queue.get(), 1)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000541 self.assertEqual(queue.get(True, None), 2)
542 self.assertEqual(queue.get(True), 3)
543 self.assertEqual(queue.get(timeout=1), 4)
544 self.assertEqual(queue.get_nowait(), 5)
545
546 self.assertEqual(queue_empty(queue), True)
547
548 get = TimingWrapper(queue.get)
549 get_nowait = TimingWrapper(queue.get_nowait)
550
551 self.assertRaises(pyqueue.Empty, get, False)
552 self.assertTimingAlmostEqual(get.elapsed, 0)
553
554 self.assertRaises(pyqueue.Empty, get, False, None)
555 self.assertTimingAlmostEqual(get.elapsed, 0)
556
557 self.assertRaises(pyqueue.Empty, get_nowait)
558 self.assertTimingAlmostEqual(get_nowait.elapsed, 0)
559
560 self.assertRaises(pyqueue.Empty, get, True, TIMEOUT1)
561 self.assertTimingAlmostEqual(get.elapsed, TIMEOUT1)
562
563 self.assertRaises(pyqueue.Empty, get, False, TIMEOUT2)
564 self.assertTimingAlmostEqual(get.elapsed, 0)
565
566 self.assertRaises(pyqueue.Empty, get, timeout=TIMEOUT3)
567 self.assertTimingAlmostEqual(get.elapsed, TIMEOUT3)
568
569 proc.join()
570
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +0000571 @classmethod
572 def _test_fork(cls, queue):
Benjamin Petersone711caf2008-06-11 16:44:04 +0000573 for i in range(10, 20):
574 queue.put(i)
575 # note that at this point the items may only be buffered, so the
576 # process cannot shutdown until the feeder thread has finished
577 # pushing items onto the pipe.
578
579 def test_fork(self):
580 # Old versions of Queue would fail to create a new feeder
581 # thread for a forked process if the original process had its
582 # own feeder thread. This test checks that this no longer
583 # happens.
584
585 queue = self.Queue()
586
587 # put items on queue so that main process starts a feeder thread
588 for i in range(10):
589 queue.put(i)
590
591 # wait to make sure thread starts before we fork a new process
592 time.sleep(DELTA)
593
594 # fork process
595 p = self.Process(target=self._test_fork, args=(queue,))
Jesus Cea94f964f2011-09-09 20:26:57 +0200596 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000597 p.start()
598
599 # check that all expected items are in the queue
600 for i in range(20):
601 self.assertEqual(queue.get(), i)
602 self.assertRaises(pyqueue.Empty, queue.get, False)
603
604 p.join()
605
606 def test_qsize(self):
607 q = self.Queue()
608 try:
609 self.assertEqual(q.qsize(), 0)
610 except NotImplementedError:
611 return
612 q.put(1)
613 self.assertEqual(q.qsize(), 1)
614 q.put(5)
615 self.assertEqual(q.qsize(), 2)
616 q.get()
617 self.assertEqual(q.qsize(), 1)
618 q.get()
619 self.assertEqual(q.qsize(), 0)
620
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +0000621 @classmethod
622 def _test_task_done(cls, q):
Benjamin Petersone711caf2008-06-11 16:44:04 +0000623 for obj in iter(q.get, None):
624 time.sleep(DELTA)
625 q.task_done()
626
627 def test_task_done(self):
628 queue = self.JoinableQueue()
629
630 if sys.version_info < (2, 5) and not hasattr(queue, 'task_done'):
Florent Xiclunafd1b0932010-03-28 00:25:02 +0000631 self.skipTest("requires 'queue.task_done()' method")
Benjamin Petersone711caf2008-06-11 16:44:04 +0000632
633 workers = [self.Process(target=self._test_task_done, args=(queue,))
634 for i in range(4)]
635
636 for p in workers:
Jesus Cea94f964f2011-09-09 20:26:57 +0200637 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000638 p.start()
639
640 for i in range(10):
641 queue.put(i)
642
643 queue.join()
644
645 for p in workers:
646 queue.put(None)
647
648 for p in workers:
649 p.join()
650
651#
652#
653#
654
655class _TestLock(BaseTestCase):
656
657 def test_lock(self):
658 lock = self.Lock()
659 self.assertEqual(lock.acquire(), True)
660 self.assertEqual(lock.acquire(False), False)
661 self.assertEqual(lock.release(), None)
662 self.assertRaises((ValueError, threading.ThreadError), lock.release)
663
664 def test_rlock(self):
665 lock = self.RLock()
666 self.assertEqual(lock.acquire(), True)
667 self.assertEqual(lock.acquire(), True)
668 self.assertEqual(lock.acquire(), True)
669 self.assertEqual(lock.release(), None)
670 self.assertEqual(lock.release(), None)
671 self.assertEqual(lock.release(), None)
672 self.assertRaises((AssertionError, RuntimeError), lock.release)
673
Jesse Nollerf8d00852009-03-31 03:25:07 +0000674 def test_lock_context(self):
675 with self.Lock():
676 pass
677
Benjamin Petersone711caf2008-06-11 16:44:04 +0000678
679class _TestSemaphore(BaseTestCase):
680
681 def _test_semaphore(self, sem):
682 self.assertReturnsIfImplemented(2, get_value, sem)
683 self.assertEqual(sem.acquire(), True)
684 self.assertReturnsIfImplemented(1, get_value, sem)
685 self.assertEqual(sem.acquire(), True)
686 self.assertReturnsIfImplemented(0, get_value, sem)
687 self.assertEqual(sem.acquire(False), False)
688 self.assertReturnsIfImplemented(0, get_value, sem)
689 self.assertEqual(sem.release(), None)
690 self.assertReturnsIfImplemented(1, get_value, sem)
691 self.assertEqual(sem.release(), None)
692 self.assertReturnsIfImplemented(2, get_value, sem)
693
694 def test_semaphore(self):
695 sem = self.Semaphore(2)
696 self._test_semaphore(sem)
697 self.assertEqual(sem.release(), None)
698 self.assertReturnsIfImplemented(3, get_value, sem)
699 self.assertEqual(sem.release(), None)
700 self.assertReturnsIfImplemented(4, get_value, sem)
701
702 def test_bounded_semaphore(self):
703 sem = self.BoundedSemaphore(2)
704 self._test_semaphore(sem)
705 # Currently fails on OS/X
706 #if HAVE_GETVALUE:
707 # self.assertRaises(ValueError, sem.release)
708 # self.assertReturnsIfImplemented(2, get_value, sem)
709
710 def test_timeout(self):
711 if self.TYPE != 'processes':
712 return
713
714 sem = self.Semaphore(0)
715 acquire = TimingWrapper(sem.acquire)
716
717 self.assertEqual(acquire(False), False)
718 self.assertTimingAlmostEqual(acquire.elapsed, 0.0)
719
720 self.assertEqual(acquire(False, None), False)
721 self.assertTimingAlmostEqual(acquire.elapsed, 0.0)
722
723 self.assertEqual(acquire(False, TIMEOUT1), False)
724 self.assertTimingAlmostEqual(acquire.elapsed, 0)
725
726 self.assertEqual(acquire(True, TIMEOUT2), False)
727 self.assertTimingAlmostEqual(acquire.elapsed, TIMEOUT2)
728
729 self.assertEqual(acquire(timeout=TIMEOUT3), False)
730 self.assertTimingAlmostEqual(acquire.elapsed, TIMEOUT3)
731
732
733class _TestCondition(BaseTestCase):
734
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +0000735 @classmethod
736 def f(cls, cond, sleeping, woken, timeout=None):
Benjamin Petersone711caf2008-06-11 16:44:04 +0000737 cond.acquire()
738 sleeping.release()
739 cond.wait(timeout)
740 woken.release()
741 cond.release()
742
743 def check_invariant(self, cond):
744 # this is only supposed to succeed when there are no sleepers
745 if self.TYPE == 'processes':
746 try:
747 sleepers = (cond._sleeping_count.get_value() -
748 cond._woken_count.get_value())
749 self.assertEqual(sleepers, 0)
750 self.assertEqual(cond._wait_semaphore.get_value(), 0)
751 except NotImplementedError:
752 pass
753
754 def test_notify(self):
755 cond = self.Condition()
756 sleeping = self.Semaphore(0)
757 woken = self.Semaphore(0)
758
759 p = self.Process(target=self.f, args=(cond, sleeping, woken))
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000760 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000761 p.start()
762
763 p = threading.Thread(target=self.f, args=(cond, sleeping, woken))
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000764 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000765 p.start()
766
767 # wait for both children to start sleeping
768 sleeping.acquire()
769 sleeping.acquire()
770
771 # check no process/thread has woken up
772 time.sleep(DELTA)
773 self.assertReturnsIfImplemented(0, get_value, woken)
774
775 # wake up one process/thread
776 cond.acquire()
777 cond.notify()
778 cond.release()
779
780 # check one process/thread has woken up
781 time.sleep(DELTA)
782 self.assertReturnsIfImplemented(1, get_value, woken)
783
784 # wake up another
785 cond.acquire()
786 cond.notify()
787 cond.release()
788
789 # check other has woken up
790 time.sleep(DELTA)
791 self.assertReturnsIfImplemented(2, get_value, woken)
792
793 # check state is not mucked up
794 self.check_invariant(cond)
795 p.join()
796
797 def test_notify_all(self):
798 cond = self.Condition()
799 sleeping = self.Semaphore(0)
800 woken = self.Semaphore(0)
801
802 # start some threads/processes which will timeout
803 for i in range(3):
804 p = self.Process(target=self.f,
805 args=(cond, sleeping, woken, TIMEOUT1))
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000806 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000807 p.start()
808
809 t = threading.Thread(target=self.f,
810 args=(cond, sleeping, woken, TIMEOUT1))
Benjamin Peterson72753702008-08-18 18:09:21 +0000811 t.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000812 t.start()
813
814 # wait for them all to sleep
815 for i in range(6):
816 sleeping.acquire()
817
818 # check they have all timed out
819 for i in range(6):
820 woken.acquire()
821 self.assertReturnsIfImplemented(0, get_value, woken)
822
823 # check state is not mucked up
824 self.check_invariant(cond)
825
826 # start some more threads/processes
827 for i in range(3):
828 p = self.Process(target=self.f, args=(cond, sleeping, woken))
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000829 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000830 p.start()
831
832 t = threading.Thread(target=self.f, args=(cond, sleeping, woken))
Benjamin Peterson72753702008-08-18 18:09:21 +0000833 t.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000834 t.start()
835
836 # wait for them to all sleep
837 for i in range(6):
838 sleeping.acquire()
839
840 # check no process/thread has woken up
841 time.sleep(DELTA)
842 self.assertReturnsIfImplemented(0, get_value, woken)
843
844 # wake them all up
845 cond.acquire()
846 cond.notify_all()
847 cond.release()
848
849 # check they have all woken
Antoine Pitrouf25a8de2011-04-16 21:02:01 +0200850 for i in range(10):
851 try:
852 if get_value(woken) == 6:
853 break
854 except NotImplementedError:
855 break
856 time.sleep(DELTA)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000857 self.assertReturnsIfImplemented(6, get_value, woken)
858
859 # check state is not mucked up
860 self.check_invariant(cond)
861
862 def test_timeout(self):
863 cond = self.Condition()
864 wait = TimingWrapper(cond.wait)
865 cond.acquire()
866 res = wait(TIMEOUT1)
867 cond.release()
Georg Brandl65ffae02010-10-28 09:24:56 +0000868 self.assertEqual(res, False)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000869 self.assertTimingAlmostEqual(wait.elapsed, TIMEOUT1)
870
871
872class _TestEvent(BaseTestCase):
873
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +0000874 @classmethod
875 def _test_event(cls, event):
Benjamin Petersone711caf2008-06-11 16:44:04 +0000876 time.sleep(TIMEOUT2)
877 event.set()
878
879 def test_event(self):
880 event = self.Event()
881 wait = TimingWrapper(event.wait)
882
Ezio Melotti13925002011-03-16 11:05:33 +0200883 # Removed temporarily, due to API shear, this does not
Benjamin Petersone711caf2008-06-11 16:44:04 +0000884 # work with threading._Event objects. is_set == isSet
Benjamin Peterson965ce872009-04-05 21:24:58 +0000885 self.assertEqual(event.is_set(), False)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000886
Benjamin Peterson965ce872009-04-05 21:24:58 +0000887 # Removed, threading.Event.wait() will return the value of the __flag
888 # instead of None. API Shear with the semaphore backed mp.Event
889 self.assertEqual(wait(0.0), False)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000890 self.assertTimingAlmostEqual(wait.elapsed, 0.0)
Benjamin Peterson965ce872009-04-05 21:24:58 +0000891 self.assertEqual(wait(TIMEOUT1), False)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000892 self.assertTimingAlmostEqual(wait.elapsed, TIMEOUT1)
893
894 event.set()
895
896 # See note above on the API differences
Benjamin Peterson965ce872009-04-05 21:24:58 +0000897 self.assertEqual(event.is_set(), True)
898 self.assertEqual(wait(), True)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000899 self.assertTimingAlmostEqual(wait.elapsed, 0.0)
Benjamin Peterson965ce872009-04-05 21:24:58 +0000900 self.assertEqual(wait(TIMEOUT1), True)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000901 self.assertTimingAlmostEqual(wait.elapsed, 0.0)
902 # self.assertEqual(event.is_set(), True)
903
904 event.clear()
905
906 #self.assertEqual(event.is_set(), False)
907
Jesus Cea94f964f2011-09-09 20:26:57 +0200908 p = self.Process(target=self._test_event, args=(event,))
909 p.daemon = True
910 p.start()
Benjamin Peterson965ce872009-04-05 21:24:58 +0000911 self.assertEqual(wait(), True)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000912
913#
914#
915#
916
917class _TestValue(BaseTestCase):
918
Florent Xiclunafd1b0932010-03-28 00:25:02 +0000919 ALLOWED_TYPES = ('processes',)
920
Benjamin Petersone711caf2008-06-11 16:44:04 +0000921 codes_values = [
922 ('i', 4343, 24234),
923 ('d', 3.625, -4.25),
924 ('h', -232, 234),
925 ('c', latin('x'), latin('y'))
926 ]
927
Antoine Pitrou7744e2a2010-11-22 16:26:21 +0000928 def setUp(self):
929 if not HAS_SHAREDCTYPES:
930 self.skipTest("requires multiprocessing.sharedctypes")
931
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +0000932 @classmethod
933 def _test(cls, values):
934 for sv, cv in zip(values, cls.codes_values):
Benjamin Petersone711caf2008-06-11 16:44:04 +0000935 sv.value = cv[2]
936
937
938 def test_value(self, raw=False):
Benjamin Petersone711caf2008-06-11 16:44:04 +0000939 if raw:
940 values = [self.RawValue(code, value)
941 for code, value, _ in self.codes_values]
942 else:
943 values = [self.Value(code, value)
944 for code, value, _ in self.codes_values]
945
946 for sv, cv in zip(values, self.codes_values):
947 self.assertEqual(sv.value, cv[1])
948
949 proc = self.Process(target=self._test, args=(values,))
Jesus Cea94f964f2011-09-09 20:26:57 +0200950 proc.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000951 proc.start()
952 proc.join()
953
954 for sv, cv in zip(values, self.codes_values):
955 self.assertEqual(sv.value, cv[2])
956
957 def test_rawvalue(self):
958 self.test_value(raw=True)
959
960 def test_getobj_getlock(self):
Benjamin Petersone711caf2008-06-11 16:44:04 +0000961 val1 = self.Value('i', 5)
962 lock1 = val1.get_lock()
963 obj1 = val1.get_obj()
964
965 val2 = self.Value('i', 5, lock=None)
966 lock2 = val2.get_lock()
967 obj2 = val2.get_obj()
968
969 lock = self.Lock()
970 val3 = self.Value('i', 5, lock=lock)
971 lock3 = val3.get_lock()
972 obj3 = val3.get_obj()
973 self.assertEqual(lock, lock3)
974
Jesse Nollerb0516a62009-01-18 03:11:38 +0000975 arr4 = self.Value('i', 5, lock=False)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000976 self.assertFalse(hasattr(arr4, 'get_lock'))
977 self.assertFalse(hasattr(arr4, 'get_obj'))
978
Jesse Nollerb0516a62009-01-18 03:11:38 +0000979 self.assertRaises(AttributeError, self.Value, 'i', 5, lock='navalue')
980
981 arr5 = self.RawValue('i', 5)
982 self.assertFalse(hasattr(arr5, 'get_lock'))
983 self.assertFalse(hasattr(arr5, 'get_obj'))
984
Benjamin Petersone711caf2008-06-11 16:44:04 +0000985
986class _TestArray(BaseTestCase):
987
Florent Xiclunafd1b0932010-03-28 00:25:02 +0000988 ALLOWED_TYPES = ('processes',)
989
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +0000990 @classmethod
991 def f(cls, seq):
Benjamin Petersone711caf2008-06-11 16:44:04 +0000992 for i in range(1, len(seq)):
993 seq[i] += seq[i-1]
994
Florent Xiclunafd1b0932010-03-28 00:25:02 +0000995 @unittest.skipIf(c_int is None, "requires _ctypes")
Benjamin Petersone711caf2008-06-11 16:44:04 +0000996 def test_array(self, raw=False):
Benjamin Petersone711caf2008-06-11 16:44:04 +0000997 seq = [680, 626, 934, 821, 150, 233, 548, 982, 714, 831]
998 if raw:
999 arr = self.RawArray('i', seq)
1000 else:
1001 arr = self.Array('i', seq)
1002
1003 self.assertEqual(len(arr), len(seq))
1004 self.assertEqual(arr[3], seq[3])
1005 self.assertEqual(list(arr[2:7]), list(seq[2:7]))
1006
1007 arr[4:8] = seq[4:8] = array.array('i', [1, 2, 3, 4])
1008
1009 self.assertEqual(list(arr[:]), seq)
1010
1011 self.f(seq)
1012
1013 p = self.Process(target=self.f, args=(arr,))
Jesus Cea94f964f2011-09-09 20:26:57 +02001014 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +00001015 p.start()
1016 p.join()
1017
1018 self.assertEqual(list(arr[:]), seq)
1019
Florent Xiclunafd1b0932010-03-28 00:25:02 +00001020 @unittest.skipIf(c_int is None, "requires _ctypes")
Mark Dickinson89461ef2011-03-26 10:19:03 +00001021 def test_array_from_size(self):
1022 size = 10
1023 # Test for zeroing (see issue #11675).
1024 # The repetition below strengthens the test by increasing the chances
1025 # of previously allocated non-zero memory being used for the new array
1026 # on the 2nd and 3rd loops.
1027 for _ in range(3):
1028 arr = self.Array('i', size)
1029 self.assertEqual(len(arr), size)
1030 self.assertEqual(list(arr), [0] * size)
1031 arr[:] = range(10)
1032 self.assertEqual(list(arr), list(range(10)))
1033 del arr
1034
1035 @unittest.skipIf(c_int is None, "requires _ctypes")
Benjamin Petersone711caf2008-06-11 16:44:04 +00001036 def test_rawarray(self):
1037 self.test_array(raw=True)
1038
Florent Xiclunafd1b0932010-03-28 00:25:02 +00001039 @unittest.skipIf(c_int is None, "requires _ctypes")
Benjamin Petersone711caf2008-06-11 16:44:04 +00001040 def test_getobj_getlock_obj(self):
Benjamin Petersone711caf2008-06-11 16:44:04 +00001041 arr1 = self.Array('i', list(range(10)))
1042 lock1 = arr1.get_lock()
1043 obj1 = arr1.get_obj()
1044
1045 arr2 = self.Array('i', list(range(10)), lock=None)
1046 lock2 = arr2.get_lock()
1047 obj2 = arr2.get_obj()
1048
1049 lock = self.Lock()
1050 arr3 = self.Array('i', list(range(10)), lock=lock)
1051 lock3 = arr3.get_lock()
1052 obj3 = arr3.get_obj()
1053 self.assertEqual(lock, lock3)
1054
Jesse Nollerb0516a62009-01-18 03:11:38 +00001055 arr4 = self.Array('i', range(10), lock=False)
Benjamin Petersone711caf2008-06-11 16:44:04 +00001056 self.assertFalse(hasattr(arr4, 'get_lock'))
1057 self.assertFalse(hasattr(arr4, 'get_obj'))
Jesse Nollerb0516a62009-01-18 03:11:38 +00001058 self.assertRaises(AttributeError,
1059 self.Array, 'i', range(10), lock='notalock')
1060
1061 arr5 = self.RawArray('i', range(10))
1062 self.assertFalse(hasattr(arr5, 'get_lock'))
1063 self.assertFalse(hasattr(arr5, 'get_obj'))
Benjamin Petersone711caf2008-06-11 16:44:04 +00001064
1065#
1066#
1067#
1068
1069class _TestContainers(BaseTestCase):
1070
1071 ALLOWED_TYPES = ('manager',)
1072
1073 def test_list(self):
1074 a = self.list(list(range(10)))
1075 self.assertEqual(a[:], list(range(10)))
1076
1077 b = self.list()
1078 self.assertEqual(b[:], [])
1079
1080 b.extend(list(range(5)))
1081 self.assertEqual(b[:], list(range(5)))
1082
1083 self.assertEqual(b[2], 2)
1084 self.assertEqual(b[2:10], [2,3,4])
1085
1086 b *= 2
1087 self.assertEqual(b[:], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4])
1088
1089 self.assertEqual(b + [5, 6], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4, 5, 6])
1090
1091 self.assertEqual(a[:], list(range(10)))
1092
1093 d = [a, b]
1094 e = self.list(d)
1095 self.assertEqual(
1096 e[:],
1097 [[0, 1, 2, 3, 4, 5, 6, 7, 8, 9], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4]]
1098 )
1099
1100 f = self.list([a])
1101 a.append('hello')
1102 self.assertEqual(f[:], [[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 'hello']])
1103
1104 def test_dict(self):
1105 d = self.dict()
1106 indices = list(range(65, 70))
1107 for i in indices:
1108 d[i] = chr(i)
1109 self.assertEqual(d.copy(), dict((i, chr(i)) for i in indices))
1110 self.assertEqual(sorted(d.keys()), indices)
1111 self.assertEqual(sorted(d.values()), [chr(i) for i in indices])
1112 self.assertEqual(sorted(d.items()), [(i, chr(i)) for i in indices])
1113
1114 def test_namespace(self):
1115 n = self.Namespace()
1116 n.name = 'Bob'
1117 n.job = 'Builder'
1118 n._hidden = 'hidden'
1119 self.assertEqual((n.name, n.job), ('Bob', 'Builder'))
1120 del n.job
1121 self.assertEqual(str(n), "Namespace(name='Bob')")
1122 self.assertTrue(hasattr(n, 'name'))
1123 self.assertTrue(not hasattr(n, 'job'))
1124
1125#
1126#
1127#
1128
1129def sqr(x, wait=0.0):
1130 time.sleep(wait)
1131 return x*x
Ask Solem2afcbf22010-11-09 20:55:52 +00001132
Benjamin Petersone711caf2008-06-11 16:44:04 +00001133class _TestPool(BaseTestCase):
1134
1135 def test_apply(self):
1136 papply = self.pool.apply
1137 self.assertEqual(papply(sqr, (5,)), sqr(5))
1138 self.assertEqual(papply(sqr, (), {'x':3}), sqr(x=3))
1139
1140 def test_map(self):
1141 pmap = self.pool.map
1142 self.assertEqual(pmap(sqr, list(range(10))), list(map(sqr, list(range(10)))))
1143 self.assertEqual(pmap(sqr, list(range(100)), chunksize=20),
1144 list(map(sqr, list(range(100)))))
1145
Alexandre Vassalottie52e3782009-07-17 09:18:18 +00001146 def test_map_chunksize(self):
1147 try:
1148 self.pool.map_async(sqr, [], chunksize=1).get(timeout=TIMEOUT1)
1149 except multiprocessing.TimeoutError:
1150 self.fail("pool.map_async with chunksize stalled on null list")
1151
Benjamin Petersone711caf2008-06-11 16:44:04 +00001152 def test_async(self):
1153 res = self.pool.apply_async(sqr, (7, TIMEOUT1,))
1154 get = TimingWrapper(res.get)
1155 self.assertEqual(get(), 49)
1156 self.assertTimingAlmostEqual(get.elapsed, TIMEOUT1)
1157
1158 def test_async_timeout(self):
1159 res = self.pool.apply_async(sqr, (6, TIMEOUT2 + 0.2))
1160 get = TimingWrapper(res.get)
1161 self.assertRaises(multiprocessing.TimeoutError, get, timeout=TIMEOUT2)
1162 self.assertTimingAlmostEqual(get.elapsed, TIMEOUT2)
1163
1164 def test_imap(self):
1165 it = self.pool.imap(sqr, list(range(10)))
1166 self.assertEqual(list(it), list(map(sqr, list(range(10)))))
1167
1168 it = self.pool.imap(sqr, list(range(10)))
1169 for i in range(10):
1170 self.assertEqual(next(it), i*i)
1171 self.assertRaises(StopIteration, it.__next__)
1172
1173 it = self.pool.imap(sqr, list(range(1000)), chunksize=100)
1174 for i in range(1000):
1175 self.assertEqual(next(it), i*i)
1176 self.assertRaises(StopIteration, it.__next__)
1177
1178 def test_imap_unordered(self):
1179 it = self.pool.imap_unordered(sqr, list(range(1000)))
1180 self.assertEqual(sorted(it), list(map(sqr, list(range(1000)))))
1181
1182 it = self.pool.imap_unordered(sqr, list(range(1000)), chunksize=53)
1183 self.assertEqual(sorted(it), list(map(sqr, list(range(1000)))))
1184
1185 def test_make_pool(self):
Victor Stinner2fae27b2011-06-20 17:53:35 +02001186 self.assertRaises(ValueError, multiprocessing.Pool, -1)
1187 self.assertRaises(ValueError, multiprocessing.Pool, 0)
1188
Benjamin Petersone711caf2008-06-11 16:44:04 +00001189 p = multiprocessing.Pool(3)
1190 self.assertEqual(3, len(p._pool))
1191 p.close()
1192 p.join()
1193
1194 def test_terminate(self):
1195 if self.TYPE == 'manager':
1196 # On Unix a forked process increfs each shared object to
1197 # which its parent process held a reference. If the
1198 # forked process gets terminated then there is likely to
1199 # be a reference leak. So to prevent
1200 # _TestZZZNumberOfObjects from failing we skip this test
1201 # when using a manager.
1202 return
1203
1204 result = self.pool.map_async(
1205 time.sleep, [0.1 for i in range(10000)], chunksize=1
1206 )
1207 self.pool.terminate()
1208 join = TimingWrapper(self.pool.join)
1209 join()
Victor Stinner900189b2011-03-24 16:39:07 +01001210 self.assertLess(join.elapsed, 0.5)
Jesse Noller1f0b6582010-01-27 03:36:01 +00001211
Richard Oudkerke41682b2012-06-06 19:04:57 +01001212 def test_empty_iterable(self):
1213 # See Issue 12157
1214 p = self.Pool(1)
1215
1216 self.assertEqual(p.map(sqr, []), [])
1217 self.assertEqual(list(p.imap(sqr, [])), [])
1218 self.assertEqual(list(p.imap_unordered(sqr, [])), [])
1219 self.assertEqual(p.map_async(sqr, []).get(), [])
1220
1221 p.close()
1222 p.join()
1223
Ask Solem2afcbf22010-11-09 20:55:52 +00001224def raising():
1225 raise KeyError("key")
Jesse Noller1f0b6582010-01-27 03:36:01 +00001226
Ask Solem2afcbf22010-11-09 20:55:52 +00001227def unpickleable_result():
1228 return lambda: 42
1229
1230class _TestPoolWorkerErrors(BaseTestCase):
Jesse Noller1f0b6582010-01-27 03:36:01 +00001231 ALLOWED_TYPES = ('processes', )
Ask Solem2afcbf22010-11-09 20:55:52 +00001232
1233 def test_async_error_callback(self):
1234 p = multiprocessing.Pool(2)
1235
1236 scratchpad = [None]
1237 def errback(exc):
1238 scratchpad[0] = exc
1239
1240 res = p.apply_async(raising, error_callback=errback)
1241 self.assertRaises(KeyError, res.get)
1242 self.assertTrue(scratchpad[0])
1243 self.assertIsInstance(scratchpad[0], KeyError)
1244
1245 p.close()
1246 p.join()
1247
1248 def test_unpickleable_result(self):
1249 from multiprocessing.pool import MaybeEncodingError
1250 p = multiprocessing.Pool(2)
1251
1252 # Make sure we don't lose pool processes because of encoding errors.
1253 for iteration in range(20):
1254
1255 scratchpad = [None]
1256 def errback(exc):
1257 scratchpad[0] = exc
1258
1259 res = p.apply_async(unpickleable_result, error_callback=errback)
1260 self.assertRaises(MaybeEncodingError, res.get)
1261 wrapped = scratchpad[0]
1262 self.assertTrue(wrapped)
1263 self.assertIsInstance(scratchpad[0], MaybeEncodingError)
1264 self.assertIsNotNone(wrapped.exc)
1265 self.assertIsNotNone(wrapped.value)
1266
1267 p.close()
1268 p.join()
1269
1270class _TestPoolWorkerLifetime(BaseTestCase):
1271 ALLOWED_TYPES = ('processes', )
1272
Jesse Noller1f0b6582010-01-27 03:36:01 +00001273 def test_pool_worker_lifetime(self):
1274 p = multiprocessing.Pool(3, maxtasksperchild=10)
1275 self.assertEqual(3, len(p._pool))
1276 origworkerpids = [w.pid for w in p._pool]
1277 # Run many tasks so each worker gets replaced (hopefully)
1278 results = []
1279 for i in range(100):
1280 results.append(p.apply_async(sqr, (i, )))
1281 # Fetch the results and verify we got the right answers,
1282 # also ensuring all the tasks have completed.
1283 for (j, res) in enumerate(results):
1284 self.assertEqual(res.get(), sqr(j))
1285 # Refill the pool
1286 p._repopulate_pool()
Florent Xiclunafb190f62010-03-04 16:10:10 +00001287 # Wait until all workers are alive
Antoine Pitrou540ab062011-04-06 22:51:17 +02001288 # (countdown * DELTA = 5 seconds max startup process time)
1289 countdown = 50
Florent Xiclunafb190f62010-03-04 16:10:10 +00001290 while countdown and not all(w.is_alive() for w in p._pool):
1291 countdown -= 1
1292 time.sleep(DELTA)
Jesse Noller1f0b6582010-01-27 03:36:01 +00001293 finalworkerpids = [w.pid for w in p._pool]
Florent Xiclunafb190f62010-03-04 16:10:10 +00001294 # All pids should be assigned. See issue #7805.
1295 self.assertNotIn(None, origworkerpids)
1296 self.assertNotIn(None, finalworkerpids)
1297 # Finally, check that the worker pids have changed
Jesse Noller1f0b6582010-01-27 03:36:01 +00001298 self.assertNotEqual(sorted(origworkerpids), sorted(finalworkerpids))
1299 p.close()
1300 p.join()
1301
Charles-François Natalif8859e12011-10-24 18:45:29 +02001302 def test_pool_worker_lifetime_early_close(self):
1303 # Issue #10332: closing a pool whose workers have limited lifetimes
1304 # before all the tasks completed would make join() hang.
1305 p = multiprocessing.Pool(3, maxtasksperchild=1)
1306 results = []
1307 for i in range(6):
1308 results.append(p.apply_async(sqr, (i, 0.3)))
1309 p.close()
1310 p.join()
1311 # check the results
1312 for (j, res) in enumerate(results):
1313 self.assertEqual(res.get(), sqr(j))
1314
1315
Benjamin Petersone711caf2008-06-11 16:44:04 +00001316#
1317# Test that manager has expected number of shared objects left
1318#
1319
1320class _TestZZZNumberOfObjects(BaseTestCase):
1321 # Because test cases are sorted alphabetically, this one will get
1322 # run after all the other tests for the manager. It tests that
1323 # there have been no "reference leaks" for the manager's shared
1324 # objects. Note the comment in _TestPool.test_terminate().
1325 ALLOWED_TYPES = ('manager',)
1326
1327 def test_number_of_objects(self):
1328 EXPECTED_NUMBER = 1 # the pool object is still alive
1329 multiprocessing.active_children() # discard dead process objs
1330 gc.collect() # do garbage collection
1331 refs = self.manager._number_of_objects()
Jesse Noller63b3a972009-01-21 02:15:48 +00001332 debug_info = self.manager._debug_info()
Benjamin Petersone711caf2008-06-11 16:44:04 +00001333 if refs != EXPECTED_NUMBER:
Georg Brandl3dbca812008-07-23 16:10:53 +00001334 print(self.manager._debug_info())
Jesse Noller63b3a972009-01-21 02:15:48 +00001335 print(debug_info)
Benjamin Petersone711caf2008-06-11 16:44:04 +00001336
1337 self.assertEqual(refs, EXPECTED_NUMBER)
1338
1339#
1340# Test of creating a customized manager class
1341#
1342
1343from multiprocessing.managers import BaseManager, BaseProxy, RemoteError
1344
1345class FooBar(object):
1346 def f(self):
1347 return 'f()'
1348 def g(self):
1349 raise ValueError
1350 def _h(self):
1351 return '_h()'
1352
1353def baz():
1354 for i in range(10):
1355 yield i*i
1356
1357class IteratorProxy(BaseProxy):
Florent Xiclunaaa171062010-08-14 15:56:42 +00001358 _exposed_ = ('__next__',)
Benjamin Petersone711caf2008-06-11 16:44:04 +00001359 def __iter__(self):
1360 return self
1361 def __next__(self):
Benjamin Petersone711caf2008-06-11 16:44:04 +00001362 return self._callmethod('__next__')
1363
1364class MyManager(BaseManager):
1365 pass
1366
1367MyManager.register('Foo', callable=FooBar)
1368MyManager.register('Bar', callable=FooBar, exposed=('f', '_h'))
1369MyManager.register('baz', callable=baz, proxytype=IteratorProxy)
1370
1371
1372class _TestMyManager(BaseTestCase):
1373
1374 ALLOWED_TYPES = ('manager',)
1375
1376 def test_mymanager(self):
1377 manager = MyManager()
1378 manager.start()
1379
1380 foo = manager.Foo()
1381 bar = manager.Bar()
1382 baz = manager.baz()
1383
1384 foo_methods = [name for name in ('f', 'g', '_h') if hasattr(foo, name)]
1385 bar_methods = [name for name in ('f', 'g', '_h') if hasattr(bar, name)]
1386
1387 self.assertEqual(foo_methods, ['f', 'g'])
1388 self.assertEqual(bar_methods, ['f', '_h'])
1389
1390 self.assertEqual(foo.f(), 'f()')
1391 self.assertRaises(ValueError, foo.g)
1392 self.assertEqual(foo._callmethod('f'), 'f()')
1393 self.assertRaises(RemoteError, foo._callmethod, '_h')
1394
1395 self.assertEqual(bar.f(), 'f()')
1396 self.assertEqual(bar._h(), '_h()')
1397 self.assertEqual(bar._callmethod('f'), 'f()')
1398 self.assertEqual(bar._callmethod('_h'), '_h()')
1399
1400 self.assertEqual(list(baz), [i*i for i in range(10)])
1401
1402 manager.shutdown()
1403
1404#
1405# Test of connecting to a remote server and using xmlrpclib for serialization
1406#
1407
1408_queue = pyqueue.Queue()
1409def get_queue():
1410 return _queue
1411
1412class QueueManager(BaseManager):
1413 '''manager class used by server process'''
1414QueueManager.register('get_queue', callable=get_queue)
1415
1416class QueueManager2(BaseManager):
1417 '''manager class which specifies the same interface as QueueManager'''
1418QueueManager2.register('get_queue')
1419
1420
1421SERIALIZER = 'xmlrpclib'
1422
1423class _TestRemoteManager(BaseTestCase):
1424
1425 ALLOWED_TYPES = ('manager',)
1426
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +00001427 @classmethod
1428 def _putter(cls, address, authkey):
Benjamin Petersone711caf2008-06-11 16:44:04 +00001429 manager = QueueManager2(
1430 address=address, authkey=authkey, serializer=SERIALIZER
1431 )
1432 manager.connect()
1433 queue = manager.get_queue()
1434 queue.put(('hello world', None, True, 2.25))
1435
1436 def test_remote(self):
1437 authkey = os.urandom(32)
1438
1439 manager = QueueManager(
1440 address=('localhost', 0), authkey=authkey, serializer=SERIALIZER
1441 )
1442 manager.start()
1443
1444 p = self.Process(target=self._putter, args=(manager.address, authkey))
Jesus Cea94f964f2011-09-09 20:26:57 +02001445 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +00001446 p.start()
1447
1448 manager2 = QueueManager2(
1449 address=manager.address, authkey=authkey, serializer=SERIALIZER
1450 )
1451 manager2.connect()
1452 queue = manager2.get_queue()
1453
1454 # Note that xmlrpclib will deserialize object as a list not a tuple
1455 self.assertEqual(queue.get(), ['hello world', None, True, 2.25])
1456
1457 # Because we are using xmlrpclib for serialization instead of
1458 # pickle this will cause a serialization error.
1459 self.assertRaises(Exception, queue.put, time.sleep)
1460
1461 # Make queue finalizer run before the server is stopped
1462 del queue
1463 manager.shutdown()
1464
Jesse Nollerc5d28a02009-03-30 16:37:36 +00001465class _TestManagerRestart(BaseTestCase):
1466
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +00001467 @classmethod
1468 def _putter(cls, address, authkey):
Jesse Nollerc5d28a02009-03-30 16:37:36 +00001469 manager = QueueManager(
1470 address=address, authkey=authkey, serializer=SERIALIZER)
1471 manager.connect()
1472 queue = manager.get_queue()
1473 queue.put('hello world')
1474
1475 def test_rapid_restart(self):
1476 authkey = os.urandom(32)
1477 manager = QueueManager(
Antoine Pitrou043bad02010-04-30 23:20:15 +00001478 address=('localhost', 0), authkey=authkey, serializer=SERIALIZER)
Brian Curtin50be1ca2010-11-01 05:10:44 +00001479 srvr = manager.get_server()
1480 addr = srvr.address
1481 # Close the connection.Listener socket which gets opened as a part
1482 # of manager.get_server(). It's not needed for the test.
1483 srvr.listener.close()
Jesse Nollerc5d28a02009-03-30 16:37:36 +00001484 manager.start()
1485
1486 p = self.Process(target=self._putter, args=(manager.address, authkey))
Jesus Cea94f964f2011-09-09 20:26:57 +02001487 p.daemon = True
Jesse Nollerc5d28a02009-03-30 16:37:36 +00001488 p.start()
1489 queue = manager.get_queue()
1490 self.assertEqual(queue.get(), 'hello world')
Jesse Noller35d1f002009-03-30 22:59:27 +00001491 del queue
Jesse Nollerc5d28a02009-03-30 16:37:36 +00001492 manager.shutdown()
1493 manager = QueueManager(
Antoine Pitrou043bad02010-04-30 23:20:15 +00001494 address=addr, authkey=authkey, serializer=SERIALIZER)
Antoine Pitrouc824e9a2011-04-05 18:11:33 +02001495 try:
1496 manager.start()
1497 except IOError as e:
1498 if e.errno != errno.EADDRINUSE:
1499 raise
1500 # Retry after some time, in case the old socket was lingering
1501 # (sporadic failure on buildbots)
1502 time.sleep(1.0)
1503 manager = QueueManager(
1504 address=addr, authkey=authkey, serializer=SERIALIZER)
Jesse Noller35d1f002009-03-30 22:59:27 +00001505 manager.shutdown()
Jesse Nollerc5d28a02009-03-30 16:37:36 +00001506
Benjamin Petersone711caf2008-06-11 16:44:04 +00001507#
1508#
1509#
1510
1511SENTINEL = latin('')
1512
1513class _TestConnection(BaseTestCase):
1514
1515 ALLOWED_TYPES = ('processes', 'threads')
1516
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +00001517 @classmethod
1518 def _echo(cls, conn):
Benjamin Petersone711caf2008-06-11 16:44:04 +00001519 for msg in iter(conn.recv_bytes, SENTINEL):
1520 conn.send_bytes(msg)
1521 conn.close()
1522
1523 def test_connection(self):
1524 conn, child_conn = self.Pipe()
1525
1526 p = self.Process(target=self._echo, args=(child_conn,))
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +00001527 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +00001528 p.start()
1529
1530 seq = [1, 2.25, None]
1531 msg = latin('hello world')
1532 longmsg = msg * 10
1533 arr = array.array('i', list(range(4)))
1534
1535 if self.TYPE == 'processes':
1536 self.assertEqual(type(conn.fileno()), int)
1537
1538 self.assertEqual(conn.send(seq), None)
1539 self.assertEqual(conn.recv(), seq)
1540
1541 self.assertEqual(conn.send_bytes(msg), None)
1542 self.assertEqual(conn.recv_bytes(), msg)
1543
1544 if self.TYPE == 'processes':
1545 buffer = array.array('i', [0]*10)
1546 expected = list(arr) + [0] * (10 - len(arr))
1547 self.assertEqual(conn.send_bytes(arr), None)
1548 self.assertEqual(conn.recv_bytes_into(buffer),
1549 len(arr) * buffer.itemsize)
1550 self.assertEqual(list(buffer), expected)
1551
1552 buffer = array.array('i', [0]*10)
1553 expected = [0] * 3 + list(arr) + [0] * (10 - 3 - len(arr))
1554 self.assertEqual(conn.send_bytes(arr), None)
1555 self.assertEqual(conn.recv_bytes_into(buffer, 3 * buffer.itemsize),
1556 len(arr) * buffer.itemsize)
1557 self.assertEqual(list(buffer), expected)
1558
1559 buffer = bytearray(latin(' ' * 40))
1560 self.assertEqual(conn.send_bytes(longmsg), None)
1561 try:
1562 res = conn.recv_bytes_into(buffer)
1563 except multiprocessing.BufferTooShort as e:
1564 self.assertEqual(e.args, (longmsg,))
1565 else:
1566 self.fail('expected BufferTooShort, got %s' % res)
1567
1568 poll = TimingWrapper(conn.poll)
1569
1570 self.assertEqual(poll(), False)
1571 self.assertTimingAlmostEqual(poll.elapsed, 0)
1572
1573 self.assertEqual(poll(TIMEOUT1), False)
1574 self.assertTimingAlmostEqual(poll.elapsed, TIMEOUT1)
1575
1576 conn.send(None)
Giampaolo Rodola'5e844c82012-12-31 17:23:09 +01001577 time.sleep(.1)
Benjamin Petersone711caf2008-06-11 16:44:04 +00001578
1579 self.assertEqual(poll(TIMEOUT1), True)
1580 self.assertTimingAlmostEqual(poll.elapsed, 0)
1581
1582 self.assertEqual(conn.recv(), None)
1583
1584 really_big_msg = latin('X') * (1024 * 1024 * 16) # 16Mb
1585 conn.send_bytes(really_big_msg)
1586 self.assertEqual(conn.recv_bytes(), really_big_msg)
1587
1588 conn.send_bytes(SENTINEL) # tell child to quit
1589 child_conn.close()
1590
1591 if self.TYPE == 'processes':
1592 self.assertEqual(conn.readable, True)
1593 self.assertEqual(conn.writable, True)
1594 self.assertRaises(EOFError, conn.recv)
1595 self.assertRaises(EOFError, conn.recv_bytes)
1596
1597 p.join()
1598
1599 def test_duplex_false(self):
1600 reader, writer = self.Pipe(duplex=False)
1601 self.assertEqual(writer.send(1), None)
1602 self.assertEqual(reader.recv(), 1)
1603 if self.TYPE == 'processes':
1604 self.assertEqual(reader.readable, True)
1605 self.assertEqual(reader.writable, False)
1606 self.assertEqual(writer.readable, False)
1607 self.assertEqual(writer.writable, True)
1608 self.assertRaises(IOError, reader.send, 2)
1609 self.assertRaises(IOError, writer.recv)
1610 self.assertRaises(IOError, writer.poll)
1611
1612 def test_spawn_close(self):
1613 # We test that a pipe connection can be closed by parent
1614 # process immediately after child is spawned. On Windows this
1615 # would have sometimes failed on old versions because
1616 # child_conn would be closed before the child got a chance to
1617 # duplicate it.
1618 conn, child_conn = self.Pipe()
1619
1620 p = self.Process(target=self._echo, args=(child_conn,))
Jesus Cea94f964f2011-09-09 20:26:57 +02001621 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +00001622 p.start()
1623 child_conn.close() # this might complete before child initializes
1624
1625 msg = latin('hello')
1626 conn.send_bytes(msg)
1627 self.assertEqual(conn.recv_bytes(), msg)
1628
1629 conn.send_bytes(SENTINEL)
1630 conn.close()
1631 p.join()
1632
1633 def test_sendbytes(self):
1634 if self.TYPE != 'processes':
1635 return
1636
1637 msg = latin('abcdefghijklmnopqrstuvwxyz')
1638 a, b = self.Pipe()
1639
1640 a.send_bytes(msg)
1641 self.assertEqual(b.recv_bytes(), msg)
1642
1643 a.send_bytes(msg, 5)
1644 self.assertEqual(b.recv_bytes(), msg[5:])
1645
1646 a.send_bytes(msg, 7, 8)
1647 self.assertEqual(b.recv_bytes(), msg[7:7+8])
1648
1649 a.send_bytes(msg, 26)
1650 self.assertEqual(b.recv_bytes(), latin(''))
1651
1652 a.send_bytes(msg, 26, 0)
1653 self.assertEqual(b.recv_bytes(), latin(''))
1654
1655 self.assertRaises(ValueError, a.send_bytes, msg, 27)
1656
1657 self.assertRaises(ValueError, a.send_bytes, msg, 22, 5)
1658
1659 self.assertRaises(ValueError, a.send_bytes, msg, 26, 1)
1660
1661 self.assertRaises(ValueError, a.send_bytes, msg, -1)
1662
1663 self.assertRaises(ValueError, a.send_bytes, msg, 4, -1)
1664
Antoine Pitroubcb39d42011-08-23 19:46:22 +02001665 @classmethod
1666 def _is_fd_assigned(cls, fd):
1667 try:
1668 os.fstat(fd)
1669 except OSError as e:
1670 if e.errno == errno.EBADF:
1671 return False
1672 raise
1673 else:
1674 return True
1675
1676 @classmethod
1677 def _writefd(cls, conn, data, create_dummy_fds=False):
1678 if create_dummy_fds:
1679 for i in range(0, 256):
1680 if not cls._is_fd_assigned(i):
1681 os.dup2(conn.fileno(), i)
1682 fd = reduction.recv_handle(conn)
1683 if msvcrt:
1684 fd = msvcrt.open_osfhandle(fd, os.O_WRONLY)
1685 os.write(fd, data)
1686 os.close(fd)
1687
Charles-François Natalie51c8da2011-09-21 18:48:21 +02001688 @unittest.skipUnless(HAS_REDUCTION, "test needs multiprocessing.reduction")
Antoine Pitroubcb39d42011-08-23 19:46:22 +02001689 def test_fd_transfer(self):
1690 if self.TYPE != 'processes':
1691 self.skipTest("only makes sense with processes")
1692 conn, child_conn = self.Pipe(duplex=True)
1693
1694 p = self.Process(target=self._writefd, args=(child_conn, b"foo"))
Jesus Cea94f964f2011-09-09 20:26:57 +02001695 p.daemon = True
Antoine Pitroubcb39d42011-08-23 19:46:22 +02001696 p.start()
Victor Stinnerd0b10a62011-09-21 01:10:29 +02001697 self.addCleanup(test.support.unlink, test.support.TESTFN)
Antoine Pitroubcb39d42011-08-23 19:46:22 +02001698 with open(test.support.TESTFN, "wb") as f:
1699 fd = f.fileno()
1700 if msvcrt:
1701 fd = msvcrt.get_osfhandle(fd)
1702 reduction.send_handle(conn, fd, p.pid)
1703 p.join()
1704 with open(test.support.TESTFN, "rb") as f:
1705 self.assertEqual(f.read(), b"foo")
1706
Charles-François Natalie51c8da2011-09-21 18:48:21 +02001707 @unittest.skipUnless(HAS_REDUCTION, "test needs multiprocessing.reduction")
Antoine Pitroubcb39d42011-08-23 19:46:22 +02001708 @unittest.skipIf(sys.platform == "win32",
1709 "test semantics don't make sense on Windows")
1710 @unittest.skipIf(MAXFD <= 256,
1711 "largest assignable fd number is too small")
1712 @unittest.skipUnless(hasattr(os, "dup2"),
1713 "test needs os.dup2()")
1714 def test_large_fd_transfer(self):
1715 # With fd > 256 (issue #11657)
1716 if self.TYPE != 'processes':
1717 self.skipTest("only makes sense with processes")
1718 conn, child_conn = self.Pipe(duplex=True)
1719
1720 p = self.Process(target=self._writefd, args=(child_conn, b"bar", True))
Jesus Cea94f964f2011-09-09 20:26:57 +02001721 p.daemon = True
Antoine Pitroubcb39d42011-08-23 19:46:22 +02001722 p.start()
Victor Stinnerd0b10a62011-09-21 01:10:29 +02001723 self.addCleanup(test.support.unlink, test.support.TESTFN)
Antoine Pitroubcb39d42011-08-23 19:46:22 +02001724 with open(test.support.TESTFN, "wb") as f:
1725 fd = f.fileno()
1726 for newfd in range(256, MAXFD):
1727 if not self._is_fd_assigned(newfd):
1728 break
1729 else:
1730 self.fail("could not find an unassigned large file descriptor")
1731 os.dup2(fd, newfd)
1732 try:
1733 reduction.send_handle(conn, newfd, p.pid)
1734 finally:
1735 os.close(newfd)
1736 p.join()
1737 with open(test.support.TESTFN, "rb") as f:
1738 self.assertEqual(f.read(), b"bar")
1739
Jesus Cea4507e642011-09-21 03:53:25 +02001740 @classmethod
1741 def _send_data_without_fd(self, conn):
1742 os.write(conn.fileno(), b"\0")
1743
Charles-François Natalie51c8da2011-09-21 18:48:21 +02001744 @unittest.skipUnless(HAS_REDUCTION, "test needs multiprocessing.reduction")
Jesus Cea4507e642011-09-21 03:53:25 +02001745 @unittest.skipIf(sys.platform == "win32", "doesn't make sense on Windows")
1746 def test_missing_fd_transfer(self):
1747 # Check that exception is raised when received data is not
1748 # accompanied by a file descriptor in ancillary data.
1749 if self.TYPE != 'processes':
1750 self.skipTest("only makes sense with processes")
1751 conn, child_conn = self.Pipe(duplex=True)
1752
1753 p = self.Process(target=self._send_data_without_fd, args=(child_conn,))
1754 p.daemon = True
1755 p.start()
1756 self.assertRaises(RuntimeError, reduction.recv_handle, conn)
1757 p.join()
Antoine Pitroubcb39d42011-08-23 19:46:22 +02001758
Benjamin Petersone711caf2008-06-11 16:44:04 +00001759class _TestListenerClient(BaseTestCase):
1760
1761 ALLOWED_TYPES = ('processes', 'threads')
1762
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +00001763 @classmethod
1764 def _test(cls, address):
1765 conn = cls.connection.Client(address)
Benjamin Petersone711caf2008-06-11 16:44:04 +00001766 conn.send('hello')
1767 conn.close()
1768
1769 def test_listener_client(self):
1770 for family in self.connection.families:
1771 l = self.connection.Listener(family=family)
1772 p = self.Process(target=self._test, args=(l.address,))
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +00001773 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +00001774 p.start()
1775 conn = l.accept()
1776 self.assertEqual(conn.recv(), 'hello')
1777 p.join()
1778 l.close()
Richard Oudkerk7ef909c2012-05-05 20:41:23 +01001779
1780 def test_issue14725(self):
1781 l = self.connection.Listener()
1782 p = self.Process(target=self._test, args=(l.address,))
1783 p.daemon = True
1784 p.start()
1785 time.sleep(1)
1786 # On Windows the client process should by now have connected,
1787 # written data and closed the pipe handle by now. This causes
1788 # ConnectNamdedPipe() to fail with ERROR_NO_DATA. See Issue
1789 # 14725.
1790 conn = l.accept()
1791 self.assertEqual(conn.recv(), 'hello')
1792 conn.close()
1793 p.join()
1794 l.close()
1795
Benjamin Petersone711caf2008-06-11 16:44:04 +00001796#
1797# Test of sending connection and socket objects between processes
1798#
Benjamin Petersonb29cbbc2008-06-16 20:57:14 +00001799"""
Benjamin Petersone711caf2008-06-11 16:44:04 +00001800class _TestPicklingConnections(BaseTestCase):
1801
1802 ALLOWED_TYPES = ('processes',)
1803
1804 def _listener(self, conn, families):
1805 for fam in families:
1806 l = self.connection.Listener(family=fam)
1807 conn.send(l.address)
1808 new_conn = l.accept()
1809 conn.send(new_conn)
1810
1811 if self.TYPE == 'processes':
1812 l = socket.socket()
1813 l.bind(('localhost', 0))
1814 conn.send(l.getsockname())
1815 l.listen(1)
1816 new_conn, addr = l.accept()
1817 conn.send(new_conn)
1818
1819 conn.recv()
1820
1821 def _remote(self, conn):
1822 for (address, msg) in iter(conn.recv, None):
1823 client = self.connection.Client(address)
1824 client.send(msg.upper())
1825 client.close()
1826
1827 if self.TYPE == 'processes':
1828 address, msg = conn.recv()
1829 client = socket.socket()
1830 client.connect(address)
1831 client.sendall(msg.upper())
1832 client.close()
1833
1834 conn.close()
1835
1836 def test_pickling(self):
1837 try:
1838 multiprocessing.allow_connection_pickling()
1839 except ImportError:
1840 return
1841
1842 families = self.connection.families
1843
1844 lconn, lconn0 = self.Pipe()
1845 lp = self.Process(target=self._listener, args=(lconn0, families))
Jesus Cea94f964f2011-09-09 20:26:57 +02001846 lp.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +00001847 lp.start()
1848 lconn0.close()
1849
1850 rconn, rconn0 = self.Pipe()
1851 rp = self.Process(target=self._remote, args=(rconn0,))
Jesus Cea94f964f2011-09-09 20:26:57 +02001852 rp.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +00001853 rp.start()
1854 rconn0.close()
1855
1856 for fam in families:
1857 msg = ('This connection uses family %s' % fam).encode('ascii')
1858 address = lconn.recv()
1859 rconn.send((address, msg))
1860 new_conn = lconn.recv()
1861 self.assertEqual(new_conn.recv(), msg.upper())
1862
1863 rconn.send(None)
1864
1865 if self.TYPE == 'processes':
1866 msg = latin('This connection uses a normal socket')
1867 address = lconn.recv()
1868 rconn.send((address, msg))
1869 if hasattr(socket, 'fromfd'):
1870 new_conn = lconn.recv()
1871 self.assertEqual(new_conn.recv(100), msg.upper())
1872 else:
1873 # XXX On Windows with Py2.6 need to backport fromfd()
1874 discard = lconn.recv_bytes()
1875
1876 lconn.send(None)
1877
1878 rconn.close()
1879 lconn.close()
1880
1881 lp.join()
1882 rp.join()
Benjamin Petersonb29cbbc2008-06-16 20:57:14 +00001883"""
Benjamin Petersone711caf2008-06-11 16:44:04 +00001884#
1885#
1886#
1887
1888class _TestHeap(BaseTestCase):
1889
1890 ALLOWED_TYPES = ('processes',)
1891
1892 def test_heap(self):
1893 iterations = 5000
1894 maxblocks = 50
1895 blocks = []
1896
1897 # create and destroy lots of blocks of different sizes
1898 for i in range(iterations):
1899 size = int(random.lognormvariate(0, 1) * 1000)
1900 b = multiprocessing.heap.BufferWrapper(size)
1901 blocks.append(b)
1902 if len(blocks) > maxblocks:
1903 i = random.randrange(maxblocks)
1904 del blocks[i]
1905
1906 # get the heap object
1907 heap = multiprocessing.heap.BufferWrapper._heap
1908
1909 # verify the state of the heap
1910 all = []
1911 occupied = 0
Charles-François Natali778db492011-07-02 14:35:49 +02001912 heap._lock.acquire()
1913 self.addCleanup(heap._lock.release)
Benjamin Petersone711caf2008-06-11 16:44:04 +00001914 for L in list(heap._len_to_seq.values()):
1915 for arena, start, stop in L:
1916 all.append((heap._arenas.index(arena), start, stop,
1917 stop-start, 'free'))
1918 for arena, start, stop in heap._allocated_blocks:
1919 all.append((heap._arenas.index(arena), start, stop,
1920 stop-start, 'occupied'))
1921 occupied += (stop-start)
1922
1923 all.sort()
1924
1925 for i in range(len(all)-1):
1926 (arena, start, stop) = all[i][:3]
1927 (narena, nstart, nstop) = all[i+1][:3]
1928 self.assertTrue((arena != narena and nstart == 0) or
1929 (stop == nstart))
1930
Charles-François Natali778db492011-07-02 14:35:49 +02001931 def test_free_from_gc(self):
1932 # Check that freeing of blocks by the garbage collector doesn't deadlock
1933 # (issue #12352).
1934 # Make sure the GC is enabled, and set lower collection thresholds to
1935 # make collections more frequent (and increase the probability of
1936 # deadlock).
1937 if not gc.isenabled():
1938 gc.enable()
1939 self.addCleanup(gc.disable)
1940 thresholds = gc.get_threshold()
1941 self.addCleanup(gc.set_threshold, *thresholds)
1942 gc.set_threshold(10)
1943
1944 # perform numerous block allocations, with cyclic references to make
1945 # sure objects are collected asynchronously by the gc
1946 for i in range(5000):
1947 a = multiprocessing.heap.BufferWrapper(1)
1948 b = multiprocessing.heap.BufferWrapper(1)
1949 # circular references
1950 a.buddy = b
1951 b.buddy = a
1952
Benjamin Petersone711caf2008-06-11 16:44:04 +00001953#
1954#
1955#
1956
Benjamin Petersone711caf2008-06-11 16:44:04 +00001957class _Foo(Structure):
1958 _fields_ = [
1959 ('x', c_int),
1960 ('y', c_double)
1961 ]
1962
1963class _TestSharedCTypes(BaseTestCase):
1964
1965 ALLOWED_TYPES = ('processes',)
1966
Antoine Pitrou7744e2a2010-11-22 16:26:21 +00001967 def setUp(self):
1968 if not HAS_SHAREDCTYPES:
1969 self.skipTest("requires multiprocessing.sharedctypes")
1970
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +00001971 @classmethod
1972 def _double(cls, x, y, foo, arr, string):
Benjamin Petersone711caf2008-06-11 16:44:04 +00001973 x.value *= 2
1974 y.value *= 2
1975 foo.x *= 2
1976 foo.y *= 2
1977 string.value *= 2
1978 for i in range(len(arr)):
1979 arr[i] *= 2
1980
1981 def test_sharedctypes(self, lock=False):
Benjamin Petersone711caf2008-06-11 16:44:04 +00001982 x = Value('i', 7, lock=lock)
Georg Brandl89fad142010-03-14 10:23:39 +00001983 y = Value(c_double, 1.0/3.0, lock=lock)
Benjamin Petersone711caf2008-06-11 16:44:04 +00001984 foo = Value(_Foo, 3, 2, lock=lock)
Georg Brandl89fad142010-03-14 10:23:39 +00001985 arr = self.Array('d', list(range(10)), lock=lock)
1986 string = self.Array('c', 20, lock=lock)
Brian Curtinafa88b52010-10-07 01:12:19 +00001987 string.value = latin('hello')
Benjamin Petersone711caf2008-06-11 16:44:04 +00001988
1989 p = self.Process(target=self._double, args=(x, y, foo, arr, string))
Jesus Cea94f964f2011-09-09 20:26:57 +02001990 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +00001991 p.start()
1992 p.join()
1993
1994 self.assertEqual(x.value, 14)
1995 self.assertAlmostEqual(y.value, 2.0/3.0)
1996 self.assertEqual(foo.x, 6)
1997 self.assertAlmostEqual(foo.y, 4.0)
1998 for i in range(10):
1999 self.assertAlmostEqual(arr[i], i*2)
2000 self.assertEqual(string.value, latin('hellohello'))
2001
2002 def test_synchronize(self):
2003 self.test_sharedctypes(lock=True)
2004
2005 def test_copy(self):
Benjamin Petersone711caf2008-06-11 16:44:04 +00002006 foo = _Foo(2, 5.0)
Brian Curtinafa88b52010-10-07 01:12:19 +00002007 bar = copy(foo)
Benjamin Petersone711caf2008-06-11 16:44:04 +00002008 foo.x = 0
2009 foo.y = 0
2010 self.assertEqual(bar.x, 2)
2011 self.assertAlmostEqual(bar.y, 5.0)
2012
2013#
2014#
2015#
2016
2017class _TestFinalize(BaseTestCase):
2018
2019 ALLOWED_TYPES = ('processes',)
2020
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +00002021 @classmethod
2022 def _test_finalize(cls, conn):
Benjamin Petersone711caf2008-06-11 16:44:04 +00002023 class Foo(object):
2024 pass
2025
2026 a = Foo()
2027 util.Finalize(a, conn.send, args=('a',))
2028 del a # triggers callback for a
2029
2030 b = Foo()
2031 close_b = util.Finalize(b, conn.send, args=('b',))
2032 close_b() # triggers callback for b
2033 close_b() # does nothing because callback has already been called
2034 del b # does nothing because callback has already been called
2035
2036 c = Foo()
2037 util.Finalize(c, conn.send, args=('c',))
2038
2039 d10 = Foo()
2040 util.Finalize(d10, conn.send, args=('d10',), exitpriority=1)
2041
2042 d01 = Foo()
2043 util.Finalize(d01, conn.send, args=('d01',), exitpriority=0)
2044 d02 = Foo()
2045 util.Finalize(d02, conn.send, args=('d02',), exitpriority=0)
2046 d03 = Foo()
2047 util.Finalize(d03, conn.send, args=('d03',), exitpriority=0)
2048
2049 util.Finalize(None, conn.send, args=('e',), exitpriority=-10)
2050
2051 util.Finalize(None, conn.send, args=('STOP',), exitpriority=-100)
2052
Ezio Melotti13925002011-03-16 11:05:33 +02002053 # call multiprocessing's cleanup function then exit process without
Benjamin Petersone711caf2008-06-11 16:44:04 +00002054 # garbage collecting locals
2055 util._exit_function()
2056 conn.close()
2057 os._exit(0)
2058
2059 def test_finalize(self):
2060 conn, child_conn = self.Pipe()
2061
2062 p = self.Process(target=self._test_finalize, args=(child_conn,))
Jesus Cea94f964f2011-09-09 20:26:57 +02002063 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +00002064 p.start()
2065 p.join()
2066
2067 result = [obj for obj in iter(conn.recv, 'STOP')]
2068 self.assertEqual(result, ['a', 'b', 'd10', 'd03', 'd02', 'd01', 'e'])
2069
2070#
2071# Test that from ... import * works for each module
2072#
2073
2074class _TestImportStar(BaseTestCase):
2075
2076 ALLOWED_TYPES = ('processes',)
2077
2078 def test_import(self):
Florent Xiclunafd1b0932010-03-28 00:25:02 +00002079 modules = [
Benjamin Petersone711caf2008-06-11 16:44:04 +00002080 'multiprocessing', 'multiprocessing.connection',
2081 'multiprocessing.heap', 'multiprocessing.managers',
2082 'multiprocessing.pool', 'multiprocessing.process',
Benjamin Petersone711caf2008-06-11 16:44:04 +00002083 'multiprocessing.synchronize', 'multiprocessing.util'
Florent Xiclunafd1b0932010-03-28 00:25:02 +00002084 ]
2085
Charles-François Natalie51c8da2011-09-21 18:48:21 +02002086 if HAS_REDUCTION:
2087 modules.append('multiprocessing.reduction')
2088
Florent Xiclunafd1b0932010-03-28 00:25:02 +00002089 if c_int is not None:
2090 # This module requires _ctypes
2091 modules.append('multiprocessing.sharedctypes')
Benjamin Petersone711caf2008-06-11 16:44:04 +00002092
2093 for name in modules:
2094 __import__(name)
2095 mod = sys.modules[name]
2096
2097 for attr in getattr(mod, '__all__', ()):
2098 self.assertTrue(
2099 hasattr(mod, attr),
2100 '%r does not have attribute %r' % (mod, attr)
2101 )
2102
2103#
2104# Quick test that logging works -- does not test logging output
2105#
2106
2107class _TestLogging(BaseTestCase):
2108
2109 ALLOWED_TYPES = ('processes',)
2110
2111 def test_enable_logging(self):
2112 logger = multiprocessing.get_logger()
2113 logger.setLevel(util.SUBWARNING)
2114 self.assertTrue(logger is not None)
2115 logger.debug('this will not be printed')
2116 logger.info('nor will this')
2117 logger.setLevel(LOG_LEVEL)
2118
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +00002119 @classmethod
2120 def _test_level(cls, conn):
Benjamin Petersone711caf2008-06-11 16:44:04 +00002121 logger = multiprocessing.get_logger()
2122 conn.send(logger.getEffectiveLevel())
2123
2124 def test_level(self):
2125 LEVEL1 = 32
2126 LEVEL2 = 37
2127
2128 logger = multiprocessing.get_logger()
2129 root_logger = logging.getLogger()
2130 root_level = root_logger.level
2131
2132 reader, writer = multiprocessing.Pipe(duplex=False)
2133
2134 logger.setLevel(LEVEL1)
Jesus Cea94f964f2011-09-09 20:26:57 +02002135 p = self.Process(target=self._test_level, args=(writer,))
2136 p.daemon = True
2137 p.start()
Benjamin Petersone711caf2008-06-11 16:44:04 +00002138 self.assertEqual(LEVEL1, reader.recv())
2139
2140 logger.setLevel(logging.NOTSET)
2141 root_logger.setLevel(LEVEL2)
Jesus Cea94f964f2011-09-09 20:26:57 +02002142 p = self.Process(target=self._test_level, args=(writer,))
2143 p.daemon = True
2144 p.start()
Benjamin Petersone711caf2008-06-11 16:44:04 +00002145 self.assertEqual(LEVEL2, reader.recv())
2146
2147 root_logger.setLevel(root_level)
2148 logger.setLevel(level=LOG_LEVEL)
2149
Jesse Nollerb9a49b72009-11-21 18:09:38 +00002150
Jesse Nollerf4ae35f2009-11-24 14:22:24 +00002151# class _TestLoggingProcessName(BaseTestCase):
2152#
2153# def handle(self, record):
2154# assert record.processName == multiprocessing.current_process().name
2155# self.__handled = True
2156#
2157# def test_logging(self):
2158# handler = logging.Handler()
2159# handler.handle = self.handle
2160# self.__handled = False
2161# # Bypass getLogger() and side-effects
2162# logger = logging.getLoggerClass()(
2163# 'multiprocessing.test.TestLoggingProcessName')
2164# logger.addHandler(handler)
2165# logger.propagate = False
2166#
2167# logger.warn('foo')
2168# assert self.__handled
Jesse Nollerb9a49b72009-11-21 18:09:38 +00002169
Benjamin Petersone711caf2008-06-11 16:44:04 +00002170#
Richard Oudkerk7aaa1ef2013-02-26 12:39:57 +00002171# Check that Process.join() retries if os.waitpid() fails with EINTR
2172#
2173
2174class _TestPollEintr(BaseTestCase):
2175
2176 ALLOWED_TYPES = ('processes',)
2177
2178 @classmethod
2179 def _killer(cls, pid):
2180 time.sleep(0.5)
2181 os.kill(pid, signal.SIGUSR1)
2182
2183 @unittest.skipUnless(hasattr(signal, 'SIGUSR1'), 'requires SIGUSR1')
2184 def test_poll_eintr(self):
2185 got_signal = [False]
2186 def record(*args):
2187 got_signal[0] = True
2188 pid = os.getpid()
2189 oldhandler = signal.signal(signal.SIGUSR1, record)
2190 try:
2191 killer = self.Process(target=self._killer, args=(pid,))
2192 killer.start()
2193 p = self.Process(target=time.sleep, args=(1,))
2194 p.start()
2195 p.join()
2196 self.assertTrue(got_signal[0])
2197 self.assertEqual(p.exitcode, 0)
2198 killer.join()
2199 finally:
2200 signal.signal(signal.SIGUSR1, oldhandler)
2201
2202#
Jesse Noller6214edd2009-01-19 16:23:53 +00002203# Test to verify handle verification, see issue 3321
2204#
2205
2206class TestInvalidHandle(unittest.TestCase):
2207
Florent Xiclunafd1b0932010-03-28 00:25:02 +00002208 @unittest.skipIf(WIN32, "skipped on Windows")
Jesse Noller6214edd2009-01-19 16:23:53 +00002209 def test_invalid_handles(self):
Jesse Noller6214edd2009-01-19 16:23:53 +00002210 conn = _multiprocessing.Connection(44977608)
2211 self.assertRaises(IOError, conn.poll)
2212 self.assertRaises(IOError, _multiprocessing.Connection, -1)
Florent Xiclunafd1b0932010-03-28 00:25:02 +00002213
Jesse Noller6214edd2009-01-19 16:23:53 +00002214#
Benjamin Petersone711caf2008-06-11 16:44:04 +00002215# Functions used to create test cases from the base ones in this module
2216#
2217
2218def get_attributes(Source, names):
2219 d = {}
2220 for name in names:
2221 obj = getattr(Source, name)
2222 if type(obj) == type(get_attributes):
2223 obj = staticmethod(obj)
2224 d[name] = obj
2225 return d
2226
2227def create_test_cases(Mixin, type):
2228 result = {}
2229 glob = globals()
Florent Xiclunafd1b0932010-03-28 00:25:02 +00002230 Type = type.capitalize()
Benjamin Petersone711caf2008-06-11 16:44:04 +00002231
2232 for name in list(glob.keys()):
2233 if name.startswith('_Test'):
2234 base = glob[name]
2235 if type in base.ALLOWED_TYPES:
2236 newname = 'With' + Type + name[1:]
2237 class Temp(base, unittest.TestCase, Mixin):
2238 pass
2239 result[newname] = Temp
2240 Temp.__name__ = newname
2241 Temp.__module__ = Mixin.__module__
2242 return result
2243
2244#
2245# Create test cases
2246#
2247
2248class ProcessesMixin(object):
2249 TYPE = 'processes'
2250 Process = multiprocessing.Process
2251 locals().update(get_attributes(multiprocessing, (
2252 'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
2253 'Condition', 'Event', 'Value', 'Array', 'RawValue',
2254 'RawArray', 'current_process', 'active_children', 'Pipe',
Richard Oudkerke41682b2012-06-06 19:04:57 +01002255 'connection', 'JoinableQueue', 'Pool'
Benjamin Petersone711caf2008-06-11 16:44:04 +00002256 )))
2257
2258testcases_processes = create_test_cases(ProcessesMixin, type='processes')
2259globals().update(testcases_processes)
2260
2261
2262class ManagerMixin(object):
2263 TYPE = 'manager'
2264 Process = multiprocessing.Process
2265 manager = object.__new__(multiprocessing.managers.SyncManager)
2266 locals().update(get_attributes(manager, (
2267 'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
2268 'Condition', 'Event', 'Value', 'Array', 'list', 'dict',
Richard Oudkerke41682b2012-06-06 19:04:57 +01002269 'Namespace', 'JoinableQueue', 'Pool'
Benjamin Petersone711caf2008-06-11 16:44:04 +00002270 )))
2271
2272testcases_manager = create_test_cases(ManagerMixin, type='manager')
2273globals().update(testcases_manager)
2274
2275
2276class ThreadsMixin(object):
2277 TYPE = 'threads'
2278 Process = multiprocessing.dummy.Process
2279 locals().update(get_attributes(multiprocessing.dummy, (
2280 'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
2281 'Condition', 'Event', 'Value', 'Array', 'current_process',
2282 'active_children', 'Pipe', 'connection', 'dict', 'list',
Richard Oudkerke41682b2012-06-06 19:04:57 +01002283 'Namespace', 'JoinableQueue', 'Pool'
Benjamin Petersone711caf2008-06-11 16:44:04 +00002284 )))
2285
2286testcases_threads = create_test_cases(ThreadsMixin, type='threads')
2287globals().update(testcases_threads)
2288
Neal Norwitz5d6415e2008-08-25 01:53:32 +00002289class OtherTest(unittest.TestCase):
2290 # TODO: add more tests for deliver/answer challenge.
2291 def test_deliver_challenge_auth_failure(self):
2292 class _FakeConnection(object):
2293 def recv_bytes(self, size):
Neal Norwitzec105ad2008-08-25 03:05:54 +00002294 return b'something bogus'
Neal Norwitz5d6415e2008-08-25 01:53:32 +00002295 def send_bytes(self, data):
2296 pass
2297 self.assertRaises(multiprocessing.AuthenticationError,
2298 multiprocessing.connection.deliver_challenge,
2299 _FakeConnection(), b'abc')
2300
2301 def test_answer_challenge_auth_failure(self):
2302 class _FakeConnection(object):
2303 def __init__(self):
2304 self.count = 0
2305 def recv_bytes(self, size):
2306 self.count += 1
2307 if self.count == 1:
2308 return multiprocessing.connection.CHALLENGE
2309 elif self.count == 2:
Neal Norwitzec105ad2008-08-25 03:05:54 +00002310 return b'something bogus'
2311 return b''
Neal Norwitz5d6415e2008-08-25 01:53:32 +00002312 def send_bytes(self, data):
2313 pass
2314 self.assertRaises(multiprocessing.AuthenticationError,
2315 multiprocessing.connection.answer_challenge,
2316 _FakeConnection(), b'abc')
2317
Benjamin Petersonf47ed4a2009-04-11 20:45:40 +00002318#
2319# Test Manager.start()/Pool.__init__() initializer feature - see issue 5585
2320#
2321
2322def initializer(ns):
2323 ns.test += 1
2324
2325class TestInitializers(unittest.TestCase):
2326 def setUp(self):
2327 self.mgr = multiprocessing.Manager()
2328 self.ns = self.mgr.Namespace()
2329 self.ns.test = 0
2330
2331 def tearDown(self):
2332 self.mgr.shutdown()
2333
2334 def test_manager_initializer(self):
2335 m = multiprocessing.managers.SyncManager()
2336 self.assertRaises(TypeError, m.start, 1)
2337 m.start(initializer, (self.ns,))
2338 self.assertEqual(self.ns.test, 1)
2339 m.shutdown()
2340
2341 def test_pool_initializer(self):
2342 self.assertRaises(TypeError, multiprocessing.Pool, initializer=1)
2343 p = multiprocessing.Pool(1, initializer, (self.ns,))
2344 p.close()
2345 p.join()
2346 self.assertEqual(self.ns.test, 1)
2347
Alexandre Vassalottic57a84f2009-07-17 12:07:01 +00002348#
2349# Issue 5155, 5313, 5331: Test process in processes
2350# Verifies os.close(sys.stdin.fileno) vs. sys.stdin.close() behavior
2351#
2352
2353def _ThisSubProcess(q):
2354 try:
2355 item = q.get(block=False)
2356 except pyqueue.Empty:
2357 pass
2358
2359def _TestProcess(q):
2360 queue = multiprocessing.Queue()
2361 subProc = multiprocessing.Process(target=_ThisSubProcess, args=(queue,))
Jesus Cea94f964f2011-09-09 20:26:57 +02002362 subProc.daemon = True
Alexandre Vassalottic57a84f2009-07-17 12:07:01 +00002363 subProc.start()
2364 subProc.join()
2365
2366def _afunc(x):
2367 return x*x
2368
2369def pool_in_process():
2370 pool = multiprocessing.Pool(processes=4)
2371 x = pool.map(_afunc, [1, 2, 3, 4, 5, 6, 7])
2372
2373class _file_like(object):
2374 def __init__(self, delegate):
2375 self._delegate = delegate
2376 self._pid = None
2377
2378 @property
2379 def cache(self):
2380 pid = os.getpid()
2381 # There are no race conditions since fork keeps only the running thread
2382 if pid != self._pid:
2383 self._pid = pid
2384 self._cache = []
2385 return self._cache
2386
2387 def write(self, data):
2388 self.cache.append(data)
2389
2390 def flush(self):
2391 self._delegate.write(''.join(self.cache))
2392 self._cache = []
2393
2394class TestStdinBadfiledescriptor(unittest.TestCase):
2395
2396 def test_queue_in_process(self):
2397 queue = multiprocessing.Queue()
2398 proc = multiprocessing.Process(target=_TestProcess, args=(queue,))
2399 proc.start()
2400 proc.join()
2401
2402 def test_pool_in_process(self):
2403 p = multiprocessing.Process(target=pool_in_process)
2404 p.start()
2405 p.join()
2406
2407 def test_flushing(self):
2408 sio = io.StringIO()
2409 flike = _file_like(sio)
2410 flike.write('foo')
2411 proc = multiprocessing.Process(target=lambda: flike.flush())
2412 flike.flush()
2413 assert sio.getvalue() == 'foo'
2414
Antoine Pitrou709176f2012-04-01 17:19:09 +02002415
2416#
2417# Issue 14151: Test invalid family on invalid environment
2418#
2419
2420class TestInvalidFamily(unittest.TestCase):
2421
2422 @unittest.skipIf(WIN32, "skipped on Windows")
2423 def test_invalid_family(self):
2424 with self.assertRaises(ValueError):
2425 multiprocessing.connection.Listener(r'\\.\test')
2426
Antoine Pitrou6d20cba2012-04-03 20:12:23 +02002427 @unittest.skipUnless(WIN32, "skipped on non-Windows platforms")
2428 def test_invalid_family_win32(self):
2429 with self.assertRaises(ValueError):
2430 multiprocessing.connection.Listener('/var/test.pipe')
2431
Richard Oudkerk4887b1c2012-07-27 14:06:11 +01002432#
2433# Test interaction with socket timeouts - see Issue #6056
2434#
2435
2436class TestTimeouts(unittest.TestCase):
2437 @classmethod
2438 def _test_timeout(cls, child, address):
2439 time.sleep(1)
2440 child.send(123)
2441 child.close()
2442 conn = multiprocessing.connection.Client(address)
2443 conn.send(456)
2444 conn.close()
2445
2446 def test_timeout(self):
2447 old_timeout = socket.getdefaulttimeout()
2448 try:
2449 socket.setdefaulttimeout(0.1)
2450 parent, child = multiprocessing.Pipe(duplex=True)
2451 l = multiprocessing.connection.Listener(family='AF_INET')
2452 p = multiprocessing.Process(target=self._test_timeout,
2453 args=(child, l.address))
2454 p.start()
2455 child.close()
2456 self.assertEqual(parent.recv(), 123)
2457 parent.close()
2458 conn = l.accept()
2459 self.assertEqual(conn.recv(), 456)
2460 conn.close()
2461 l.close()
2462 p.join(10)
2463 finally:
2464 socket.setdefaulttimeout(old_timeout)
Antoine Pitrou6d20cba2012-04-03 20:12:23 +02002465
Richard Oudkerke88a2442012-08-14 11:41:32 +01002466#
2467# Test what happens with no "if __name__ == '__main__'"
2468#
2469
2470class TestNoForkBomb(unittest.TestCase):
2471 def test_noforkbomb(self):
2472 name = os.path.join(os.path.dirname(__file__), 'mp_fork_bomb.py')
2473 if WIN32:
2474 rc, out, err = test.script_helper.assert_python_failure(name)
2475 self.assertEqual('', out.decode('ascii'))
2476 self.assertIn('RuntimeError', err.decode('ascii'))
2477 else:
2478 rc, out, err = test.script_helper.assert_python_ok(name)
2479 self.assertEqual('123', out.decode('ascii').rstrip())
2480 self.assertEqual('', err.decode('ascii'))
2481
2482#
2483#
2484#
2485
Alexandre Vassalottic57a84f2009-07-17 12:07:01 +00002486testcases_other = [OtherTest, TestInvalidHandle, TestInitializers,
Richard Oudkerk4887b1c2012-07-27 14:06:11 +01002487 TestStdinBadfiledescriptor, TestInvalidFamily,
Richard Oudkerke88a2442012-08-14 11:41:32 +01002488 TestTimeouts, TestNoForkBomb]
Neal Norwitz5d6415e2008-08-25 01:53:32 +00002489
Benjamin Petersone711caf2008-06-11 16:44:04 +00002490#
2491#
2492#
2493
2494def test_main(run=None):
Jesse Nollerd00df3c2008-06-18 14:22:48 +00002495 if sys.platform.startswith("linux"):
2496 try:
2497 lock = multiprocessing.RLock()
2498 except OSError:
Benjamin Petersone549ead2009-03-28 21:42:05 +00002499 raise unittest.SkipTest("OSError raises on RLock creation, see issue 3111!")
Benjamin Peterson3c0dd062008-06-17 22:43:48 +00002500
Charles-François Natali3be00952011-11-22 18:36:39 +01002501 check_enough_semaphores()
2502
Benjamin Petersone711caf2008-06-11 16:44:04 +00002503 if run is None:
2504 from test.support import run_unittest as run
2505
2506 util.get_temp_dir() # creates temp directory for use by all processes
2507
2508 multiprocessing.get_logger().setLevel(LOG_LEVEL)
2509
Benjamin Peterson41181742008-07-02 20:22:54 +00002510 ProcessesMixin.pool = multiprocessing.Pool(4)
2511 ThreadsMixin.pool = multiprocessing.dummy.Pool(4)
2512 ManagerMixin.manager.__init__()
2513 ManagerMixin.manager.start()
2514 ManagerMixin.pool = ManagerMixin.manager.Pool(4)
Benjamin Petersone711caf2008-06-11 16:44:04 +00002515
2516 testcases = (
Benjamin Peterson41181742008-07-02 20:22:54 +00002517 sorted(testcases_processes.values(), key=lambda tc:tc.__name__) +
2518 sorted(testcases_threads.values(), key=lambda tc:tc.__name__) +
Neal Norwitz5d6415e2008-08-25 01:53:32 +00002519 sorted(testcases_manager.values(), key=lambda tc:tc.__name__) +
2520 testcases_other
Benjamin Petersone711caf2008-06-11 16:44:04 +00002521 )
2522
2523 loadTestsFromTestCase = unittest.defaultTestLoader.loadTestsFromTestCase
2524 suite = unittest.TestSuite(loadTestsFromTestCase(tc) for tc in testcases)
2525 run(suite)
2526
Benjamin Peterson41181742008-07-02 20:22:54 +00002527 ThreadsMixin.pool.terminate()
2528 ProcessesMixin.pool.terminate()
2529 ManagerMixin.pool.terminate()
2530 ManagerMixin.manager.shutdown()
Benjamin Petersone711caf2008-06-11 16:44:04 +00002531
Benjamin Peterson41181742008-07-02 20:22:54 +00002532 del ProcessesMixin.pool, ThreadsMixin.pool, ManagerMixin.pool
Benjamin Petersone711caf2008-06-11 16:44:04 +00002533
2534def main():
2535 test_main(unittest.TextTestRunner(verbosity=2).run)
2536
2537if __name__ == '__main__':
2538 main()