blob: b812e4840b830d0d36b13f904b4b3199f687ce40 [file] [log] [blame]
Benjamin Peterson90f5ba52010-03-11 22:53:45 +00001#!/usr/bin/env python3
Georg Brandl86b2fb92008-07-16 03:43:04 +00002
Benjamin Petersone711caf2008-06-11 16:44:04 +00003#
4# Unit tests for the multiprocessing package
5#
6
7import unittest
Benjamin Petersone711caf2008-06-11 16:44:04 +00008import queue as pyqueue
9import time
Alexandre Vassalottic57a84f2009-07-17 12:07:01 +000010import io
Benjamin Petersone711caf2008-06-11 16:44:04 +000011import sys
12import os
13import gc
Antoine Pitrouc824e9a2011-04-05 18:11:33 +020014import errno
Benjamin Petersone711caf2008-06-11 16:44:04 +000015import signal
16import array
Benjamin Petersone711caf2008-06-11 16:44:04 +000017import socket
18import random
19import logging
R. David Murraya21e4ca2009-03-31 23:16:50 +000020import test.support
Benjamin Petersone711caf2008-06-11 16:44:04 +000021
Benjamin Petersone5384b02008-10-04 22:00:42 +000022
R. David Murraya21e4ca2009-03-31 23:16:50 +000023# Skip tests if _multiprocessing wasn't built.
24_multiprocessing = test.support.import_module('_multiprocessing')
25# Skip tests if sem_open implementation is broken.
26test.support.import_module('multiprocessing.synchronize')
# import threading after _multiprocessing to raise a more relevant error
# message: "No module named _multiprocessing". _multiprocessing is not compiled
# without thread support.
30import threading
Benjamin Petersone5384b02008-10-04 22:00:42 +000031
Benjamin Petersone711caf2008-06-11 16:44:04 +000032import multiprocessing.dummy
33import multiprocessing.connection
34import multiprocessing.managers
35import multiprocessing.heap
Benjamin Petersone711caf2008-06-11 16:44:04 +000036import multiprocessing.pool
Benjamin Petersone711caf2008-06-11 16:44:04 +000037
Charles-François Natalie51c8da2011-09-21 18:48:21 +020038from multiprocessing import util
39
40try:
41 from multiprocessing import reduction
42 HAS_REDUCTION = True
43except ImportError:
44 HAS_REDUCTION = False
Benjamin Petersone711caf2008-06-11 16:44:04 +000045
Brian Curtinafa88b52010-10-07 01:12:19 +000046try:
47 from multiprocessing.sharedctypes import Value, copy
48 HAS_SHAREDCTYPES = True
49except ImportError:
50 HAS_SHAREDCTYPES = False
51
Antoine Pitroubcb39d42011-08-23 19:46:22 +020052try:
53 import msvcrt
54except ImportError:
55 msvcrt = None
56
Benjamin Petersone711caf2008-06-11 16:44:04 +000057#
58#
59#
60
def latin(s):
    """Return the string *s* encoded to bytes with the 'latin' codec."""
    encoded = s.encode('latin')
    return encoded
Benjamin Petersone711caf2008-06-11 16:44:04 +000063
Benjamin Petersone711caf2008-06-11 16:44:04 +000064#
65# Constants
66#
67
68LOG_LEVEL = util.SUBWARNING
Jesse Noller1f0b6582010-01-27 03:36:01 +000069#LOG_LEVEL = logging.DEBUG
Benjamin Petersone711caf2008-06-11 16:44:04 +000070
71DELTA = 0.1
72CHECK_TIMINGS = False # making true makes tests take a lot longer
73 # and can sometimes cause some non-serious
74 # failures because some calls block a bit
75 # longer than expected
76if CHECK_TIMINGS:
77 TIMEOUT1, TIMEOUT2, TIMEOUT3 = 0.82, 0.35, 1.4
78else:
79 TIMEOUT1, TIMEOUT2, TIMEOUT3 = 0.1, 0.1, 0.1
80
81HAVE_GETVALUE = not getattr(_multiprocessing,
82 'HAVE_BROKEN_SEM_GETVALUE', False)
83
Jesse Noller6214edd2009-01-19 16:23:53 +000084WIN32 = (sys.platform == "win32")
85
# MAXFD is the value sysconf reports for SC_OPEN_MAX, or 256 when that value
# cannot be queried.
try:
    MAXFD = os.sysconf("SC_OPEN_MAX")
except (AttributeError, ValueError, OSError):
    # os.sysconf is missing on this platform, the name is unknown, or the
    # system does not support the setting.  Catching only these keeps
    # KeyboardInterrupt and SystemExit from being swallowed.
    MAXFD = 256
90
Benjamin Petersone711caf2008-06-11 16:44:04 +000091#
Florent Xiclunafd1b0932010-03-28 00:25:02 +000092# Some tests require ctypes
93#
94
95try:
Florent Xiclunaaa171062010-08-14 15:56:42 +000096 from ctypes import Structure, c_int, c_double
Florent Xiclunafd1b0932010-03-28 00:25:02 +000097except ImportError:
98 Structure = object
99 c_int = c_double = None
100
Charles-François Natali3be00952011-11-22 18:36:39 +0100101
def check_enough_semaphores():
    """Raise unittest.SkipTest if the OS reports too few semaphores.

    Returns None when the limit cannot be queried, when sysconf reports
    -1, or when the reported limit is at least the POSIX minimum.
    """
    # POSIX guarantees that at least this many semaphores are available.
    nsems_min = 256
    try:
        nsems = os.sysconf("SC_SEM_NSEMS_MAX")
    except (AttributeError, ValueError):
        # Either os.sysconf itself or this setting is unavailable.
        return
    too_few = nsems != -1 and nsems < nsems_min
    if too_few:
        raise unittest.SkipTest("The OS doesn't support enough semaphores "
                                "to run the test (required: %d)." % nsems_min)
115
116
Florent Xiclunafd1b0932010-03-28 00:25:02 +0000117#
Benjamin Petersone711caf2008-06-11 16:44:04 +0000118# Creates a wrapper for a function which records the time it takes to finish
119#
120
class TimingWrapper(object):
    """Callable wrapper that records how long the last call to *func* took.

    ``elapsed`` is None until the first call.  After every call it holds the
    duration of that call in seconds, including calls where *func* raised.
    """

    def __init__(self, func):
        self.func = func
        self.elapsed = None

    def __call__(self, *args, **kwds):
        """Call the wrapped function, returning its result unchanged."""
        # A monotonic clock is used so that adjustments of the system clock
        # during the call cannot make the measured duration negative or
        # inflated.
        start = time.monotonic()
        try:
            return self.func(*args, **kwds)
        finally:
            # Recorded in ``finally`` so a raising call is timed as well.
            self.elapsed = time.monotonic() - start
133
134#
135# Base class for test cases
136#
137
class BaseTestCase(object):
    """Mixin with helper assertions shared by the test case classes.

    It calls ``assertAlmostEqual`` and ``assertEqual`` on ``self``, so it
    is presumably combined with unittest.TestCase elsewhere in the file.
    """

    ALLOWED_TYPES = ('processes', 'manager', 'threads')

    def assertTimingAlmostEqual(self, a, b):
        """Compare two durations to one decimal place if CHECK_TIMINGS is set."""
        if not CHECK_TIMINGS:
            return
        self.assertAlmostEqual(a, b, 1)

    def assertReturnsIfImplemented(self, value, func, *args):
        """Assert func(*args) == value, unless func raises NotImplementedError."""
        try:
            res = func(*args)
        except NotImplementedError:
            return None
        return self.assertEqual(value, res)

    # For the sanity of Windows users, rather than crashing or freezing in
    # multiple ways.
    def __reduce__(self, *args):
        raise NotImplementedError("shouldn't try to pickle a test case")

    __reduce_ex__ = __reduce__
160
Benjamin Petersone711caf2008-06-11 16:44:04 +0000161#
162# Return the value of a semaphore
163#
164
def get_value(self):
    """Return the current value of the semaphore-like object *self*.

    Tries ``self.get_value()`` first, then the ``_Semaphore__value`` and
    ``_value`` attributes.  Raises NotImplementedError when none of them
    is available.
    """
    try:
        return self.get_value()
    except AttributeError:
        pass
    # Fall back to the private counters, in the same order as before.
    for attr in ('_Semaphore__value', '_value'):
        try:
            return getattr(self, attr)
        except AttributeError:
            continue
    raise NotImplementedError
176
177#
178# Testcases
179#
180
class _TestProcess(BaseTestCase):
    """Tests for the basic process API: identity, lifecycle and children.

    The methods read ``TYPE``, ``Process``, ``Queue``, ``Pipe``,
    ``current_process`` and ``active_children`` from ``self``/``cls``;
    presumably a concrete test class defined elsewhere supplies them.
    """

    ALLOWED_TYPES = ('processes', 'threads')

    def test_current(self):
        """Check the attributes of the object describing this process."""
        if self.TYPE == 'threads':
            return

        current = self.current_process()
        authkey = current.authkey

        self.assertTrue(current.is_alive())
        self.assertFalse(current.daemon)
        self.assertIsInstance(authkey, bytes)
        self.assertTrue(len(authkey) > 0)
        self.assertEqual(current.ident, os.getpid())
        self.assertIsNone(current.exitcode)

    @classmethod
    def _test(cls, q, *args, **kwds):
        """Child target: report the arguments and identity through *q*."""
        current = cls.current_process()
        q.put(args)
        q.put(kwds)
        q.put(current.name)
        if cls.TYPE != 'threads':
            q.put(bytes(current.authkey))
            q.put(current.pid)

    def test_process(self):
        """Follow one child from creation through start to join."""
        q = self.Queue(1)
        args = (q, 1, 2)
        kwargs = {'hello':23, 'bye':2.54}
        name = 'SomeProcess'
        p = self.Process(
            target=self._test, args=args, kwargs=kwargs, name=name
            )
        p.daemon = True
        current = self.current_process()

        # Before start(): not alive, not listed as a child, no exit code.
        if self.TYPE != 'threads':
            self.assertEqual(p.authkey, current.authkey)
        self.assertEqual(p.is_alive(), False)
        self.assertEqual(p.daemon, True)
        self.assertNotIn(p, self.active_children())
        self.assertTrue(type(self.active_children()) is list)
        self.assertIsNone(p.exitcode)

        p.start()

        self.assertIsNone(p.exitcode)
        self.assertEqual(p.is_alive(), True)
        self.assertIn(p, self.active_children())

        # The child echoes back everything except the queue itself.
        self.assertEqual(q.get(), args[1:])
        self.assertEqual(q.get(), kwargs)
        self.assertEqual(q.get(), p.name)
        if self.TYPE != 'threads':
            self.assertEqual(q.get(), current.authkey)
            self.assertEqual(q.get(), p.pid)

        p.join()

        self.assertEqual(p.exitcode, 0)
        self.assertEqual(p.is_alive(), False)
        self.assertNotIn(p, self.active_children())

    @classmethod
    def _test_terminate(cls):
        """Child target: sleep long enough that it has to be terminated."""
        time.sleep(1000)

    def test_terminate(self):
        """Check that terminate() stops a sleeping child."""
        if self.TYPE == 'threads':
            return

        p = self.Process(target=self._test_terminate)
        p.daemon = True
        p.start()

        self.assertEqual(p.is_alive(), True)
        self.assertIn(p, self.active_children())
        self.assertIsNone(p.exitcode)

        p.terminate()

        join = TimingWrapper(p.join)
        self.assertIsNone(join())
        self.assertTimingAlmostEqual(join.elapsed, 0.0)

        self.assertEqual(p.is_alive(), False)
        self.assertNotIn(p, self.active_children())

        p.join()

        # XXX sometimes get p.exitcode == 0 on Windows ...
        #self.assertEqual(p.exitcode, -signal.SIGTERM)

    def test_cpu_count(self):
        """Check that cpu_count() yields a positive int when implemented."""
        try:
            cpus = multiprocessing.cpu_count()
        except NotImplementedError:
            cpus = 1
        self.assertTrue(type(cpus) is int)
        self.assertTrue(cpus >= 1)

    def test_active_children(self):
        """Check that a child is listed only between start() and join()."""
        self.assertEqual(type(self.active_children()), list)

        p = self.Process(target=time.sleep, args=(DELTA,))
        self.assertNotIn(p, self.active_children())

        p.daemon = True
        p.start()
        self.assertIn(p, self.active_children())

        p.join()
        self.assertNotIn(p, self.active_children())

    @classmethod
    def _test_recursion(cls, wconn, id):
        """Send *id* through *wconn*, then recurse in two children.

        Recursion stops once *id* holds two elements.  Each child is
        joined before the next one is started.
        """
        # The unused import of multiprocessing.forking was removed; it only
        # made this helper depend on that module being importable.
        wconn.send(id)
        if len(id) < 2:
            for i in range(2):
                p = cls.Process(
                    target=cls._test_recursion, args=(wconn, id+[i])
                    )
                p.start()
                p.join()

    def test_recursion(self):
        """Check the order of ids reported by the recursive children."""
        rconn, wconn = self.Pipe(duplex=False)
        self._test_recursion(wconn, [])

        time.sleep(DELTA)
        result = []
        while rconn.poll():
            result.append(rconn.recv())

        expected = [
            [],
              [0],
                [0, 0],
                [0, 1],
              [1],
                [1, 0],
                [1, 1]
            ]
        self.assertEqual(result, expected)
330
331#
332#
333#
334
class _UpperCaser(multiprocessing.Process):
    """Process subclass that upper-cases the strings sent to it.

    The parent talks through ``parent_conn`` and the child answers through
    ``child_conn``.  A ``None`` message tells the child to stop.
    """

    def __init__(self):
        super().__init__()
        self.child_conn, self.parent_conn = multiprocessing.Pipe()

    def run(self):
        """Child side: answer each request until None is received."""
        self.parent_conn.close()
        conn = self.child_conn
        for request in iter(conn.recv, None):
            conn.send(request.upper())
        conn.close()

    def submit(self, s):
        """Parent side: send the str *s* and return the reply."""
        assert type(s) is str
        conn = self.parent_conn
        conn.send(s)
        return conn.recv()

    def stop(self):
        """Parent side: send the stop message and close both pipe ends."""
        self.parent_conn.send(None)
        for conn in (self.parent_conn, self.child_conn):
            conn.close()
356
class _TestSubclassingProcess(BaseTestCase):
    """Tests for Process subclasses and for how a child process exits."""

    ALLOWED_TYPES = ('processes',)

    def test_subclassing(self):
        """Round-trip two strings through an _UpperCaser child."""
        worker = _UpperCaser()
        worker.daemon = True
        worker.start()
        for word, shouted in (('hello', 'HELLO'), ('world', 'WORLD')):
            self.assertEqual(worker.submit(word), shouted)
        worker.stop()
        worker.join()

    def test_stderr_flush(self):
        """Check that the child's traceback reaches the redirected stderr."""
        # sys.stderr is flushed at process shutdown (issue #13812)
        if self.TYPE == "threads":
            return

        testfn = test.support.TESTFN
        self.addCleanup(test.support.unlink, testfn)
        child = self.Process(target=self._test_stderr_flush, args=(testfn,))
        child.start()
        child.join()
        with open(testfn, 'r') as f:
            err = f.read()
        # The whole traceback was printed
        for fragment in ("ZeroDivisionError",
                         "test_multiprocessing.py",
                         "1/0 # MARKER"):
            self.assertIn(fragment, err)

    @classmethod
    def _test_stderr_flush(cls, testfn):
        """Child target: redirect stderr to *testfn*, then fail."""
        sys.stderr = open(testfn, 'w')
        # The text of the next line is searched for in the traceback by
        # test_stderr_flush, so it must stay exactly as written.
        1/0 # MARKER


    @classmethod
    def _test_sys_exit(cls, reason, testfn):
        """Child target: redirect stderr to *testfn*, then sys.exit(reason)."""
        sys.stderr = open(testfn, 'w')
        sys.exit(reason)

    def test_sys_exit(self):
        """Check exit codes and stderr output for several sys.exit() values."""
        # See Issue 13854
        if self.TYPE == 'threads':
            return

        testfn = test.support.TESTFN
        self.addCleanup(test.support.unlink, testfn)

        # Non-integer reasons: the reason is expected on stderr.
        printed_cases = (([1, 2, 3], 1), ('ignore this', 0))
        for reason, code in printed_cases:
            child = self.Process(target=self._test_sys_exit,
                                 args=(reason, testfn))
            child.daemon = True
            child.start()
            child.join(5)
            self.assertEqual(child.exitcode, code)

            with open(testfn, 'r') as f:
                written = f.read()
            self.assertEqual(written.rstrip(), str(reason))

        # Boolean and integer reasons: the exit code equals the reason.
        for reason in (True, False, 8):
            child = self.Process(target=sys.exit, args=(reason,))
            child.daemon = True
            child.start()
            child.join(5)
            self.assertEqual(child.exitcode, reason)
422
Benjamin Petersone711caf2008-06-11 16:44:04 +0000423#
424#
425#
426
def queue_empty(q):
    """Return whether *q* is empty, using qsize() if it has no empty()."""
    return q.empty() if hasattr(q, 'empty') else q.qsize() == 0
432
def queue_full(q, maxsize):
    """Return whether *q* is full, using qsize() if it has no full()."""
    return q.full() if hasattr(q, 'full') else q.qsize() == maxsize
438
439
class _TestQueue(BaseTestCase):
    """Tests for put/get semantics, qsize, forking and task_done on queues.

    The methods read ``Queue``, ``JoinableQueue``, ``Event`` and
    ``Process`` from ``self``; presumably a concrete test class defined
    elsewhere supplies them.
    """

    @classmethod
    def _test_put(cls, queue, child_can_start, parent_can_continue):
        """Child target: drain six items once the parent allows it."""
        child_can_start.wait()
        for i in range(6):
            queue.get()
        parent_can_continue.set()

    def test_put(self):
        """Fill a bounded queue and check how put() behaves when full."""
        MAXSIZE = 6
        queue = self.Queue(maxsize=MAXSIZE)
        child_can_start = self.Event()
        parent_can_continue = self.Event()

        proc = self.Process(
            target=self._test_put,
            args=(queue, child_can_start, parent_can_continue)
            )
        proc.daemon = True
        proc.start()

        self.assertEqual(queue_empty(queue), True)
        self.assertEqual(queue_full(queue, MAXSIZE), False)

        queue.put(1)
        queue.put(2, True)
        queue.put(3, True, None)
        queue.put(4, False)
        queue.put(5, False, None)
        queue.put_nowait(6)

        # the values may be in buffer but not yet in pipe so sleep a bit
        time.sleep(DELTA)

        self.assertEqual(queue_empty(queue), False)
        self.assertEqual(queue_full(queue, MAXSIZE), True)

        put = TimingWrapper(queue.put)
        put_nowait = TimingWrapper(queue.put_nowait)

        # Non-blocking puts must fail at once; blocking puts must fail
        # only after their timeout.
        self.assertRaises(pyqueue.Full, put, 7, False)
        self.assertTimingAlmostEqual(put.elapsed, 0)

        self.assertRaises(pyqueue.Full, put, 7, False, None)
        self.assertTimingAlmostEqual(put.elapsed, 0)

        self.assertRaises(pyqueue.Full, put_nowait, 7)
        self.assertTimingAlmostEqual(put_nowait.elapsed, 0)

        self.assertRaises(pyqueue.Full, put, 7, True, TIMEOUT1)
        self.assertTimingAlmostEqual(put.elapsed, TIMEOUT1)

        self.assertRaises(pyqueue.Full, put, 7, False, TIMEOUT2)
        self.assertTimingAlmostEqual(put.elapsed, 0)

        self.assertRaises(pyqueue.Full, put, 7, True, timeout=TIMEOUT3)
        self.assertTimingAlmostEqual(put.elapsed, TIMEOUT3)

        child_can_start.set()
        parent_can_continue.wait()

        self.assertEqual(queue_empty(queue), True)
        self.assertEqual(queue_full(queue, MAXSIZE), False)

        proc.join()

    @classmethod
    def _test_get(cls, queue, child_can_start, parent_can_continue):
        """Child target: put four items once the parent allows it."""
        child_can_start.wait()
        #queue.put(1)
        queue.put(2)
        queue.put(3)
        queue.put(4)
        queue.put(5)
        parent_can_continue.set()

    def test_get(self):
        """Read the child's items and check how get() behaves when empty."""
        queue = self.Queue()
        child_can_start = self.Event()
        parent_can_continue = self.Event()

        proc = self.Process(
            target=self._test_get,
            args=(queue, child_can_start, parent_can_continue)
            )
        proc.daemon = True
        proc.start()

        self.assertEqual(queue_empty(queue), True)

        child_can_start.set()
        parent_can_continue.wait()

        time.sleep(DELTA)
        self.assertEqual(queue_empty(queue), False)

        # Hangs unexpectedly, remove for now
        #self.assertEqual(queue.get(), 1)
        self.assertEqual(queue.get(True, None), 2)
        self.assertEqual(queue.get(True), 3)
        self.assertEqual(queue.get(timeout=1), 4)
        self.assertEqual(queue.get_nowait(), 5)

        self.assertEqual(queue_empty(queue), True)

        get = TimingWrapper(queue.get)
        get_nowait = TimingWrapper(queue.get_nowait)

        # Non-blocking gets must fail at once; blocking gets must fail
        # only after their timeout.
        self.assertRaises(pyqueue.Empty, get, False)
        self.assertTimingAlmostEqual(get.elapsed, 0)

        self.assertRaises(pyqueue.Empty, get, False, None)
        self.assertTimingAlmostEqual(get.elapsed, 0)

        self.assertRaises(pyqueue.Empty, get_nowait)
        self.assertTimingAlmostEqual(get_nowait.elapsed, 0)

        self.assertRaises(pyqueue.Empty, get, True, TIMEOUT1)
        self.assertTimingAlmostEqual(get.elapsed, TIMEOUT1)

        self.assertRaises(pyqueue.Empty, get, False, TIMEOUT2)
        self.assertTimingAlmostEqual(get.elapsed, 0)

        self.assertRaises(pyqueue.Empty, get, timeout=TIMEOUT3)
        self.assertTimingAlmostEqual(get.elapsed, TIMEOUT3)

        proc.join()

    @classmethod
    def _test_fork(cls, queue):
        """Child target: put the integers 10..19 on *queue*."""
        for i in range(10, 20):
            queue.put(i)
        # note that at this point the items may only be buffered, so the
        # process cannot shutdown until the feeder thread has finished
        # pushing items onto the pipe.

    def test_fork(self):
        """Check that a child can feed a queue the parent already fed."""
        # Old versions of Queue would fail to create a new feeder
        # thread for a forked process if the original process had its
        # own feeder thread.  This test checks that this no longer
        # happens.

        queue = self.Queue()

        # put items on queue so that main process starts a feeder thread
        for i in range(10):
            queue.put(i)

        # wait to make sure thread starts before we fork a new process
        time.sleep(DELTA)

        # fork process
        p = self.Process(target=self._test_fork, args=(queue,))
        p.daemon = True
        p.start()

        # check that all expected items are in the queue
        for i in range(20):
            self.assertEqual(queue.get(), i)
        self.assertRaises(pyqueue.Empty, queue.get, False)

        p.join()

    def test_qsize(self):
        """Check that qsize() tracks puts and gets, where implemented."""
        q = self.Queue()
        try:
            self.assertEqual(q.qsize(), 0)
        except NotImplementedError:
            return
        q.put(1)
        self.assertEqual(q.qsize(), 1)
        q.put(5)
        self.assertEqual(q.qsize(), 2)
        q.get()
        self.assertEqual(q.qsize(), 1)
        q.get()
        self.assertEqual(q.qsize(), 0)

    @classmethod
    def _test_task_done(cls, q):
        """Worker target: acknowledge every item until None is received."""
        for obj in iter(q.get, None):
            time.sleep(DELTA)
            q.task_done()

    def test_task_done(self):
        """Check that join() returns once four workers process ten items."""
        queue = self.JoinableQueue()

        # The former ``sys.version_info < (2, 5)`` skip was removed: it
        # could never be true in this Python 3 file.
        workers = [self.Process(target=self._test_task_done, args=(queue,))
                   for i in range(4)]

        for p in workers:
            p.daemon = True
            p.start()

        for i in range(10):
            queue.put(i)

        queue.join()

        # One None per worker ends each worker's loop.
        for p in workers:
            queue.put(None)

        for p in workers:
            p.join()
649
650#
651#
652#
653
class _TestLock(BaseTestCase):
    """Tests for the acquire/release behaviour of Lock and RLock."""

    def test_lock(self):
        """A lock is held once; releasing an unheld lock raises."""
        lock = self.Lock()
        self.assertEqual(lock.acquire(), True)
        self.assertEqual(lock.acquire(False), False)
        self.assertEqual(lock.release(), None)
        with self.assertRaises((ValueError, threading.ThreadError)):
            lock.release()

    def test_rlock(self):
        """An RLock can be taken three times and released three times."""
        lock = self.RLock()
        for _ in range(3):
            self.assertEqual(lock.acquire(), True)
        for _ in range(3):
            self.assertEqual(lock.release(), None)
        with self.assertRaises((AssertionError, RuntimeError)):
            lock.release()

    def test_lock_context(self):
        """A lock can be used as a context manager."""
        with self.Lock():
            pass
676
Benjamin Petersone711caf2008-06-11 16:44:04 +0000677
class _TestSemaphore(BaseTestCase):
    """Tests for Semaphore and BoundedSemaphore counting and timeouts."""

    def _test_semaphore(self, sem):
        """Take *sem* from 2 down to 0 and back, checking its value."""
        def expect_value(count):
            # The value check is skipped where get_value is unimplemented.
            self.assertReturnsIfImplemented(count, get_value, sem)

        expect_value(2)
        self.assertEqual(sem.acquire(), True)
        expect_value(1)
        self.assertEqual(sem.acquire(), True)
        expect_value(0)
        self.assertEqual(sem.acquire(False), False)
        expect_value(0)
        self.assertEqual(sem.release(), None)
        expect_value(1)
        self.assertEqual(sem.release(), None)
        expect_value(2)

    def test_semaphore(self):
        """A plain semaphore may be released beyond its initial value."""
        sem = self.Semaphore(2)
        self._test_semaphore(sem)
        for expected in (3, 4):
            self.assertEqual(sem.release(), None)
            self.assertReturnsIfImplemented(expected, get_value, sem)

    def test_bounded_semaphore(self):
        """Run the shared acquire/release sequence on a bounded semaphore."""
        sem = self.BoundedSemaphore(2)
        self._test_semaphore(sem)
        # Currently fails on OS/X
        #if HAVE_GETVALUE:
        #    self.assertRaises(ValueError, sem.release)
        #    self.assertReturnsIfImplemented(2, get_value, sem)

    def test_timeout(self):
        """Check how long a failing acquire() takes for each call form."""
        if self.TYPE != 'processes':
            return

        sem = self.Semaphore(0)
        acquire = TimingWrapper(sem.acquire)

        # (positional args, keyword args, expected elapsed time)
        cases = (
            ((False,), {}, 0.0),
            ((False, None), {}, 0.0),
            ((False, TIMEOUT1), {}, 0),
            ((True, TIMEOUT2), {}, TIMEOUT2),
            ((), {'timeout': TIMEOUT3}, TIMEOUT3),
        )
        for args, kwds, expected in cases:
            self.assertEqual(acquire(*args, **kwds), False)
            self.assertTimingAlmostEqual(acquire.elapsed, expected)
730
731
class _TestCondition(BaseTestCase):
    """Tests for Condition: notify, notify_all and wait timeouts.

    Every worker started by a test is joined at the end of that test, so
    no child process or thread is left running behind it.
    """

    @classmethod
    def f(cls, cond, sleeping, woken, timeout=None):
        """Worker target: wait on *cond*, signalling before and after.

        ``sleeping`` is released just before waiting and ``woken`` just
        after, so the test can count the workers in each state.
        """
        cond.acquire()
        sleeping.release()
        cond.wait(timeout)
        woken.release()
        cond.release()

    def check_invariant(self, cond):
        """Check the internal counters of *cond* when nobody is waiting."""
        # this is only supposed to succeed when there are no sleepers
        if self.TYPE == 'processes':
            try:
                sleepers = (cond._sleeping_count.get_value() -
                            cond._woken_count.get_value())
                self.assertEqual(sleepers, 0)
                self.assertEqual(cond._wait_semaphore.get_value(), 0)
            except NotImplementedError:
                pass

    def test_notify(self):
        """Check that each notify() wakes exactly one waiter."""
        cond = self.Condition()
        sleeping = self.Semaphore(0)
        woken = self.Semaphore(0)

        # Separate names keep both handles, so both can be joined below.
        proc = self.Process(target=self.f, args=(cond, sleeping, woken))
        proc.daemon = True
        proc.start()

        thread = threading.Thread(target=self.f, args=(cond, sleeping, woken))
        thread.daemon = True
        thread.start()

        # wait for both children to start sleeping
        sleeping.acquire()
        sleeping.acquire()

        # check no process/thread has woken up
        time.sleep(DELTA)
        self.assertReturnsIfImplemented(0, get_value, woken)

        # wake up one process/thread
        cond.acquire()
        cond.notify()
        cond.release()

        # check one process/thread has woken up
        time.sleep(DELTA)
        self.assertReturnsIfImplemented(1, get_value, woken)

        # wake up another
        cond.acquire()
        cond.notify()
        cond.release()

        # check other has woken up
        time.sleep(DELTA)
        self.assertReturnsIfImplemented(2, get_value, woken)

        # check state is not mucked up
        self.check_invariant(cond)
        proc.join()
        thread.join()

    def test_notify_all(self):
        """Check wait timeouts and that notify_all() wakes every waiter."""
        cond = self.Condition()
        sleeping = self.Semaphore(0)
        woken = self.Semaphore(0)
        workers = []

        # start some threads/processes which will timeout
        for i in range(3):
            p = self.Process(target=self.f,
                             args=(cond, sleeping, woken, TIMEOUT1))
            p.daemon = True
            p.start()
            workers.append(p)

            t = threading.Thread(target=self.f,
                                 args=(cond, sleeping, woken, TIMEOUT1))
            t.daemon = True
            t.start()
            workers.append(t)

        # wait for them all to sleep
        for i in range(6):
            sleeping.acquire()

        # check they have all timed out
        for i in range(6):
            woken.acquire()
        self.assertReturnsIfImplemented(0, get_value, woken)

        # check state is not mucked up
        self.check_invariant(cond)

        # start some more threads/processes
        for i in range(3):
            p = self.Process(target=self.f, args=(cond, sleeping, woken))
            p.daemon = True
            p.start()
            workers.append(p)

            t = threading.Thread(target=self.f, args=(cond, sleeping, woken))
            t.daemon = True
            t.start()
            workers.append(t)

        # wait for them to all sleep
        for i in range(6):
            sleeping.acquire()

        # check no process/thread has woken up
        time.sleep(DELTA)
        self.assertReturnsIfImplemented(0, get_value, woken)

        # wake them all up
        cond.acquire()
        cond.notify_all()
        cond.release()

        # check they have all woken
        for i in range(10):
            try:
                if get_value(woken) == 6:
                    break
            except NotImplementedError:
                break
            time.sleep(DELTA)
        self.assertReturnsIfImplemented(6, get_value, woken)

        # check state is not mucked up
        self.check_invariant(cond)

        # Every worker has timed out or been notified by now.
        for worker in workers:
            worker.join()

    def test_timeout(self):
        """Check that wait() returns False after its timeout."""
        cond = self.Condition()
        wait = TimingWrapper(cond.wait)
        cond.acquire()
        res = wait(TIMEOUT1)
        cond.release()
        self.assertEqual(res, False)
        self.assertTimingAlmostEqual(wait.elapsed, TIMEOUT1)
869
870
class _TestEvent(BaseTestCase):
    """Tests for Event: is_set(), wait() with timeouts, set() and clear()."""

    @classmethod
    def _test_event(cls, event):
        """Child target: set *event* after sleeping for TIMEOUT2."""
        time.sleep(TIMEOUT2)
        event.set()

    def test_event(self):
        """Check wait() results while the event is clear, set, and set late."""
        event = self.Event()
        wait = TimingWrapper(event.wait)

        # A new event starts out cleared.
        self.assertEqual(event.is_set(), False)

        # While the event is clear, wait() returns False: at once for a
        # zero timeout, otherwise after the timeout.
        self.assertEqual(wait(0.0), False)
        self.assertTimingAlmostEqual(wait.elapsed, 0.0)
        self.assertEqual(wait(TIMEOUT1), False)
        self.assertTimingAlmostEqual(wait.elapsed, TIMEOUT1)

        event.set()

        # Once the event is set, wait() returns True without blocking.
        self.assertEqual(event.is_set(), True)
        self.assertEqual(wait(), True)
        self.assertTimingAlmostEqual(wait.elapsed, 0.0)
        self.assertEqual(wait(TIMEOUT1), True)
        self.assertTimingAlmostEqual(wait.elapsed, 0.0)
        # self.assertEqual(event.is_set(), True)

        event.clear()

        #self.assertEqual(event.is_set(), False)

        p = self.Process(target=self._test_event, args=(event,))
        p.daemon = True
        p.start()
        self.assertEqual(wait(), True)
        # The child has set the event by now; join it so that it does not
        # outlive the test.
        p.join()
Benjamin Petersone711caf2008-06-11 16:44:04 +0000911
912#
913#
914#
915
class _TestValue(BaseTestCase):
    """Tests for the shared Value and RawValue objects."""

    ALLOWED_TYPES = ('processes',)

    # (typecode, initial value, value written by the child process)
    codes_values = [
        ('i', 4343, 24234),
        ('d', 3.625, -4.25),
        ('h', -232, 234),
        ('c', latin('x'), latin('y'))
        ]

    def setUp(self):
        """Skip every test here when sharedctypes could not be imported."""
        if not HAS_SHAREDCTYPES:
            self.skipTest("requires multiprocessing.sharedctypes")

    @classmethod
    def _test(cls, values):
        """Child target: store the third entry of each row in its value."""
        for shared, (_, _, updated) in zip(values, cls.codes_values):
            shared.value = updated

    def test_value(self, raw=False):
        """Check that a child's writes to shared values reach the parent."""
        factory = self.RawValue if raw else self.Value
        values = [factory(code, value)
                  for code, value, _ in self.codes_values]

        for shared, (_, initial, _) in zip(values, self.codes_values):
            self.assertEqual(shared.value, initial)

        child = self.Process(target=self._test, args=(values,))
        child.daemon = True
        child.start()
        child.join()

        for shared, (_, _, updated) in zip(values, self.codes_values):
            self.assertEqual(shared.value, updated)

    def test_rawvalue(self):
        """Run test_value against RawValue instead of Value."""
        self.test_value(raw=True)

    def test_getobj_getlock(self):
        """Check get_lock()/get_obj() for each form of the lock argument."""
        # Default lock: both accessors must be callable.
        default_locked = self.Value('i', 5)
        default_locked.get_lock()
        default_locked.get_obj()

        # lock=None: both accessors must be callable.
        none_locked = self.Value('i', 5, lock=None)
        none_locked.get_lock()
        none_locked.get_obj()

        # Explicit lock: get_lock() must hand that lock back.
        lock = self.Lock()
        custom_locked = self.Value('i', 5, lock=lock)
        returned_lock = custom_locked.get_lock()
        custom_locked.get_obj()
        self.assertEqual(lock, returned_lock)

        # lock=False: the value exposes neither accessor.
        unlocked = self.Value('i', 5, lock=False)
        self.assertFalse(hasattr(unlocked, 'get_lock'))
        self.assertFalse(hasattr(unlocked, 'get_obj'))

        self.assertRaises(AttributeError, self.Value, 'i', 5, lock='navalue')

        # RawValue: the value exposes neither accessor.
        raw = self.RawValue('i', 5)
        self.assertFalse(hasattr(raw, 'get_lock'))
        self.assertFalse(hasattr(raw, 'get_obj'))
983
Benjamin Petersone711caf2008-06-11 16:44:04 +0000984
class _TestArray(BaseTestCase):
    # Exercises Array() / RawArray() objects, including modification of
    # the shared array from a child process.

    ALLOWED_TYPES = ('processes',)

    @classmethod
    def f(cls, seq):
        # Replace seq, in place, by its running sums.
        for idx in range(1, len(seq)):
            seq[idx] += seq[idx - 1]

    @unittest.skipIf(c_int is None, "requires _ctypes")
    def test_array(self, raw=False):
        seq = [680, 626, 934, 821, 150, 233, 548, 982, 714, 831]
        factory = self.RawArray if raw else self.Array
        arr = factory('i', seq)

        self.assertEqual(len(arr), len(seq))
        self.assertEqual(arr[3], seq[3])
        self.assertEqual(list(arr[2:7]), list(seq[2:7]))

        # Slice assignment from an array.array, applied to both copies.
        patch = array.array('i', [1, 2, 3, 4])
        arr[4:8] = patch
        seq[4:8] = patch

        self.assertEqual(list(arr[:]), seq)

        # Apply f() locally to the list and remotely to the shared array;
        # the results have to agree.
        self.f(seq)

        child = self.Process(target=self.f, args=(arr,))
        child.daemon = True
        child.start()
        child.join()

        self.assertEqual(list(arr[:]), seq)

    @unittest.skipIf(c_int is None, "requires _ctypes")
    def test_array_from_size(self):
        size = 10
        # Test for zeroing (see issue #11675).
        # Repeating the allocation makes it more likely that the 2nd and
        # 3rd arrays reuse memory that previously held non-zero data.
        for _ in range(3):
            arr = self.Array('i', size)
            self.assertEqual(len(arr), size)
            self.assertEqual(list(arr), [0] * size)
            arr[:] = range(size)
            self.assertEqual(list(arr), list(range(size)))
            del arr

    @unittest.skipIf(c_int is None, "requires _ctypes")
    def test_rawarray(self):
        self.test_array(raw=True)

    @unittest.skipIf(c_int is None, "requires _ctypes")
    def test_getobj_getlock_obj(self):
        # Default lock argument: both accessors must be callable.
        default_arr = self.Array('i', list(range(10)))
        default_arr.get_lock()
        default_arr.get_obj()

        # lock=None is accepted as well.
        none_arr = self.Array('i', list(range(10)), lock=None)
        none_arr.get_lock()
        none_arr.get_obj()

        # A caller-supplied lock is handed back by get_lock().
        supplied = self.Lock()
        locked_arr = self.Array('i', list(range(10)), lock=supplied)
        returned = locked_arr.get_lock()
        locked_arr.get_obj()
        self.assertEqual(supplied, returned)

        # lock=False yields an object without the accessors.
        unlocked_arr = self.Array('i', range(10), lock=False)
        for accessor in ('get_lock', 'get_obj'):
            self.assertFalse(hasattr(unlocked_arr, accessor))
        self.assertRaises(AttributeError,
                          self.Array, 'i', range(10), lock='notalock')

        # RawArray never has the accessors.
        raw_arr = self.RawArray('i', range(10))
        for accessor in ('get_lock', 'get_obj'):
            self.assertFalse(hasattr(raw_arr, accessor))
Benjamin Petersone711caf2008-06-11 16:44:04 +00001063
1064#
1065#
1066#
1067
class _TestContainers(BaseTestCase):
    # Tests for the list, dict and Namespace objects provided by the
    # test case (manager type only).

    ALLOWED_TYPES = ('manager',)

    def test_list(self):
        a = self.list(list(range(10)))
        self.assertEqual(a[:], list(range(10)))

        b = self.list()
        self.assertEqual(b[:], [])

        b.extend(list(range(5)))
        self.assertEqual(b[:], list(range(5)))

        self.assertEqual(b[2], 2)
        self.assertEqual(b[2:10], [2, 3, 4])

        b *= 2
        self.assertEqual(b[:], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4])

        self.assertEqual(b + [5, 6], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4, 5, 6])

        # None of the operations on b may have affected a.
        self.assertEqual(a[:], list(range(10)))

        d = [a, b]
        e = self.list(d)
        self.assertEqual(
            e[:],
            [[0, 1, 2, 3, 4, 5, 6, 7, 8, 9], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4]]
            )

        # A later change to a is visible through the container holding it.
        f = self.list([a])
        a.append('hello')
        self.assertEqual(f[:], [[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 'hello']])

    def test_dict(self):
        d = self.dict()
        indices = list(range(65, 70))
        for i in indices:
            d[i] = chr(i)
        self.assertEqual(d.copy(), {i: chr(i) for i in indices})
        self.assertEqual(sorted(d.keys()), indices)
        self.assertEqual(sorted(d.values()), [chr(i) for i in indices])
        self.assertEqual(sorted(d.items()), [(i, chr(i)) for i in indices])

    def test_namespace(self):
        n = self.Namespace()
        n.name = 'Bob'
        n.job = 'Builder'
        n._hidden = 'hidden'
        self.assertEqual((n.name, n.job), ('Bob', 'Builder'))
        del n.job
        # The expected string shows neither the deleted attribute nor the
        # underscore-prefixed one.
        self.assertEqual(str(n), "Namespace(name='Bob')")
        self.assertTrue(hasattr(n, 'name'))
        self.assertFalse(hasattr(n, 'job'))
1123
1124#
1125#
1126#
1127
def sqr(x, wait=0.0):
    """Sleep for *wait* seconds, then return *x* squared."""
    time.sleep(wait)
    squared = x * x
    return squared
Ask Solem2afcbf22010-11-09 20:55:52 +00001131
class _TestPool(BaseTestCase):
    # Tests for the pool API.  Most tests use the shared self.pool;
    # tests that create their own pool always close and join it, even
    # when an assertion fails, so that no worker processes are leaked.

    def test_apply(self):
        papply = self.pool.apply
        self.assertEqual(papply(sqr, (5,)), sqr(5))
        self.assertEqual(papply(sqr, (), {'x': 3}), sqr(x=3))

    def test_map(self):
        pmap = self.pool.map
        self.assertEqual(pmap(sqr, list(range(10))),
                         list(map(sqr, list(range(10)))))
        self.assertEqual(pmap(sqr, list(range(100)), chunksize=20),
                         list(map(sqr, list(range(100)))))

    def test_map_chunksize(self):
        # map_async() over an empty list with an explicit chunksize must
        # complete rather than time out.
        try:
            self.pool.map_async(sqr, [], chunksize=1).get(timeout=TIMEOUT1)
        except multiprocessing.TimeoutError:
            self.fail("pool.map_async with chunksize stalled on null list")

    def test_async(self):
        res = self.pool.apply_async(sqr, (7, TIMEOUT1,))
        get = TimingWrapper(res.get)
        self.assertEqual(get(), 49)
        self.assertTimingAlmostEqual(get.elapsed, TIMEOUT1)

    def test_async_timeout(self):
        # The task sleeps longer than the timeout given to get().
        res = self.pool.apply_async(sqr, (6, TIMEOUT2 + 0.2))
        get = TimingWrapper(res.get)
        self.assertRaises(multiprocessing.TimeoutError, get, timeout=TIMEOUT2)
        self.assertTimingAlmostEqual(get.elapsed, TIMEOUT2)

    def test_imap(self):
        it = self.pool.imap(sqr, list(range(10)))
        self.assertEqual(list(it), list(map(sqr, list(range(10)))))

        it = self.pool.imap(sqr, list(range(10)))
        for i in range(10):
            self.assertEqual(next(it), i*i)
        self.assertRaises(StopIteration, it.__next__)

        it = self.pool.imap(sqr, list(range(1000)), chunksize=100)
        for i in range(1000):
            self.assertEqual(next(it), i*i)
        self.assertRaises(StopIteration, it.__next__)

    def test_imap_unordered(self):
        # Results are sorted before comparison.
        it = self.pool.imap_unordered(sqr, list(range(1000)))
        self.assertEqual(sorted(it), list(map(sqr, list(range(1000)))))

        it = self.pool.imap_unordered(sqr, list(range(1000)), chunksize=53)
        self.assertEqual(sorted(it), list(map(sqr, list(range(1000)))))

    def test_make_pool(self):
        self.assertRaises(ValueError, multiprocessing.Pool, -1)
        self.assertRaises(ValueError, multiprocessing.Pool, 0)

        p = multiprocessing.Pool(3)
        try:
            self.assertEqual(3, len(p._pool))
        finally:
            # Release the workers even if the assertion above failed.
            p.close()
            p.join()

    def test_terminate(self):
        if self.TYPE == 'manager':
            # On Unix a forked process increfs each shared object to
            # which its parent process held a reference.  If the
            # forked process gets terminated then there is likely to
            # be a reference leak.  So to prevent
            # _TestZZZNumberOfObjects from failing we skip this test
            # when using a manager.
            return

        result = self.pool.map_async(
            time.sleep, [0.1 for i in range(10000)], chunksize=1
            )
        self.pool.terminate()
        join = TimingWrapper(self.pool.join)
        join()
        # After terminate(), join() must return quickly.
        self.assertLess(join.elapsed, 0.5)

    def test_empty_iterable(self):
        # See Issue 12157
        p = self.Pool(1)
        try:
            self.assertEqual(p.map(sqr, []), [])
            self.assertEqual(list(p.imap(sqr, [])), [])
            self.assertEqual(list(p.imap_unordered(sqr, [])), [])
            self.assertEqual(p.map_async(sqr, []).get(), [])
        finally:
            # Release the worker even if an assertion above failed.
            p.close()
            p.join()
1222
def raising():
    """Always raise KeyError("key")."""
    error = KeyError("key")
    raise error
Jesse Noller1f0b6582010-01-27 03:36:01 +00001225
def unpickleable_result():
    """Return a lambda (which evaluates to 42 when called)."""
    return (lambda: 42)
1228
class _TestPoolWorkerErrors(BaseTestCase):
    # Tests for error reporting from pool workers.  Each test creates its
    # own pool and always closes and joins it, even when an assertion
    # fails, so that no worker processes are leaked.
    ALLOWED_TYPES = ('processes', )

    def test_async_error_callback(self):
        p = multiprocessing.Pool(2)
        try:
            scratchpad = [None]
            def errback(exc):
                scratchpad[0] = exc

            res = p.apply_async(raising, error_callback=errback)
            # The exception is re-raised by get() and also handed to the
            # error callback.
            self.assertRaises(KeyError, res.get)
            self.assertTrue(scratchpad[0])
            self.assertIsInstance(scratchpad[0], KeyError)
        finally:
            p.close()
            p.join()

    def test_unpickleable_result(self):
        from multiprocessing.pool import MaybeEncodingError
        p = multiprocessing.Pool(2)
        try:
            # Make sure we don't lose pool processes because of encoding
            # errors.
            for iteration in range(20):

                scratchpad = [None]
                def errback(exc):
                    scratchpad[0] = exc

                res = p.apply_async(unpickleable_result,
                                    error_callback=errback)
                self.assertRaises(MaybeEncodingError, res.get)
                wrapped = scratchpad[0]
                self.assertTrue(wrapped)
                self.assertIsInstance(scratchpad[0], MaybeEncodingError)
                self.assertIsNotNone(wrapped.exc)
                self.assertIsNotNone(wrapped.value)
        finally:
            p.close()
            p.join()
1268
class _TestPoolWorkerLifetime(BaseTestCase):
    # Tests for pools created with the maxtasksperchild argument.
    ALLOWED_TYPES = ('processes', )

    def test_pool_worker_lifetime(self):
        p = multiprocessing.Pool(3, maxtasksperchild=10)
        try:
            self.assertEqual(3, len(p._pool))
            origworkerpids = [w.pid for w in p._pool]
            # Run many tasks so each worker gets replaced (hopefully)
            results = []
            for i in range(100):
                results.append(p.apply_async(sqr, (i, )))
            # Fetch the results and verify we got the right answers,
            # also ensuring all the tasks have completed.
            for (j, res) in enumerate(results):
                self.assertEqual(res.get(), sqr(j))
            # Refill the pool
            p._repopulate_pool()
            # Wait until all workers are alive
            # (countdown * DELTA = 5 seconds max startup process time)
            countdown = 50
            while countdown and not all(w.is_alive() for w in p._pool):
                countdown -= 1
                time.sleep(DELTA)
            finalworkerpids = [w.pid for w in p._pool]
            # All pids should be assigned.  See issue #7805.
            self.assertNotIn(None, origworkerpids)
            self.assertNotIn(None, finalworkerpids)
            # Finally, check that the worker pids have changed
            self.assertNotEqual(sorted(origworkerpids),
                                sorted(finalworkerpids))
        finally:
            # Release the workers even if an assertion above failed.
            p.close()
            p.join()

    def test_pool_worker_lifetime_early_close(self):
        # Issue #10332: closing a pool whose workers have limited lifetimes
        # before all the tasks completed would make join() hang.
        p = multiprocessing.Pool(3, maxtasksperchild=1)
        results = []
        for i in range(6):
            results.append(p.apply_async(sqr, (i, 0.3)))
        # close()/join() are the operations under test here, so they are
        # deliberately issued before the results are fetched.
        p.close()
        p.join()
        # check the results
        for (j, res) in enumerate(results):
            self.assertEqual(res.get(), sqr(j))
1313
1314
Benjamin Petersone711caf2008-06-11 16:44:04 +00001315#
1316# Test that manager has expected number of shared objects left
1317#
1318
class _TestZZZNumberOfObjects(BaseTestCase):
    # Because test cases are sorted alphabetically, this one will get
    # run after all the other tests for the manager.  It tests that
    # there have been no "reference leaks" for the manager's shared
    # objects.  Note the comment in _TestPool.test_terminate().
    ALLOWED_TYPES = ('manager',)

    def test_number_of_objects(self):
        EXPECTED_NUMBER = 1                # the pool object is still alive
        multiprocessing.active_children()  # discard dead process objs
        gc.collect()                       # do garbage collection
        refs = self.manager._number_of_objects()
        debug_info = self.manager._debug_info()
        if refs != EXPECTED_NUMBER:
            # Print the manager's debug info once to help diagnose the
            # upcoming failure.
            print(debug_info)

        self.assertEqual(refs, EXPECTED_NUMBER)
1337
1338#
1339# Test of creating a customized manager class
1340#
1341
1342from multiprocessing.managers import BaseManager, BaseProxy, RemoteError
1343
class FooBar(object):
    # Plain class registered with MyManager below; it has two public
    # methods (one of which raises) and one underscore-prefixed method.

    def f(self):
        result = 'f()'
        return result

    def g(self):
        raise ValueError()

    def _h(self):
        result = '_h()'
        return result
1351
def baz():
    """Generate the squares of 0 through 9."""
    for number in range(10):
        yield number ** 2
1355
class IteratorProxy(BaseProxy):
    # Proxy type that makes the referent usable as an iterator: only
    # __next__ is exposed, and every step is forwarded via _callmethod().
    _exposed_ = ('__next__',)

    def __iter__(self):
        # The proxy is its own iterator.
        return self

    def __next__(self):
        item = self._callmethod('__next__')
        return item
1362
class MyManager(BaseManager):
    # Customized manager; its types are registered right below.
    pass

# 'Foo' uses the default exposure, 'Bar' exposes exactly f() and _h(),
# and 'baz' hands out its result through IteratorProxy.
MyManager.register('Foo', callable=FooBar)
MyManager.register('Bar', callable=FooBar, exposed=('f', '_h'))
MyManager.register('baz', callable=baz, proxytype=IteratorProxy)
1369
1370
class _TestMyManager(BaseTestCase):
    # Tests the customized manager class MyManager and the types
    # registered with it.

    ALLOWED_TYPES = ('manager',)

    def test_mymanager(self):
        manager = MyManager()
        manager.start()
        try:
            foo = manager.Foo()
            bar = manager.Bar()
            baz = manager.baz()

            foo_methods = [name for name in ('f', 'g', '_h')
                           if hasattr(foo, name)]
            bar_methods = [name for name in ('f', 'g', '_h')
                           if hasattr(bar, name)]

            # 'Foo' was registered without an explicit exposed list,
            # 'Bar' with exposed=('f', '_h').
            self.assertEqual(foo_methods, ['f', 'g'])
            self.assertEqual(bar_methods, ['f', '_h'])

            self.assertEqual(foo.f(), 'f()')
            self.assertRaises(ValueError, foo.g)
            self.assertEqual(foo._callmethod('f'), 'f()')
            self.assertRaises(RemoteError, foo._callmethod, '_h')

            self.assertEqual(bar.f(), 'f()')
            self.assertEqual(bar._h(), '_h()')
            self.assertEqual(bar._callmethod('f'), 'f()')
            self.assertEqual(bar._callmethod('_h'), '_h()')

            self.assertEqual(list(baz), [i*i for i in range(10)])
        finally:
            # Stop the manager even if an assertion above failed, so the
            # server process is not leaked.
            manager.shutdown()
1402
1403#
1404# Test of connecting to a remote server and using xmlrpclib for serialization
1405#
1406
# Module-level queue handed out by get_queue().
_queue = pyqueue.Queue()


def get_queue():
    """Return the module-level queue (the same object on every call)."""
    shared = _queue
    return shared
1410
class QueueManager(BaseManager):
    '''manager class used by server process'''
# Registers 'get_queue' with get_queue() as the callable.
QueueManager.register('get_queue', callable=get_queue)
1414
class QueueManager2(BaseManager):
    '''manager class which specifies the same interface as QueueManager'''
# Same typeid as QueueManager, but registered without a callable.
QueueManager2.register('get_queue')
1418
1419
# Serializer name passed to the managers created by the tests below.
SERIALIZER = 'xmlrpclib'
1421
class _TestRemoteManager(BaseTestCase):
    # Connects to a manager server from a child process and from the
    # parent, using the serializer named by SERIALIZER.

    ALLOWED_TYPES = ('manager',)

    @classmethod
    def _putter(cls, address, authkey):
        # Child-process target: connect as a client and put one tuple
        # on the served queue.
        client = QueueManager2(
            address=address, authkey=authkey, serializer=SERIALIZER
            )
        client.connect()
        remote_queue = client.get_queue()
        remote_queue.put(('hello world', None, True, 2.25))

    def test_remote(self):
        authkey = os.urandom(32)

        server_manager = QueueManager(
            address=('localhost', 0), authkey=authkey, serializer=SERIALIZER
            )
        server_manager.start()

        putter = self.Process(
            target=self._putter, args=(server_manager.address, authkey)
            )
        putter.daemon = True
        putter.start()

        client_manager = QueueManager2(
            address=server_manager.address, authkey=authkey,
            serializer=SERIALIZER
            )
        client_manager.connect()
        remote_queue = client_manager.get_queue()

        # Note that xmlrpclib will deserialize object as a list not a tuple
        expected = ['hello world', None, True, 2.25]
        self.assertEqual(remote_queue.get(), expected)

        # Because we are using xmlrpclib for serialization instead of
        # pickle this will cause a serialization error.
        self.assertRaises(Exception, remote_queue.put, time.sleep)

        # Make queue finalizer run before the server is stopped
        del remote_queue
        server_manager.shutdown()
1463
class _TestManagerRestart(BaseTestCase):
    # Checks that a manager can be restarted on the address that a
    # previous manager has just released.

    @classmethod
    def _putter(cls, address, authkey):
        # Child-process target: connect to the manager and put one
        # message on the served queue.
        manager = QueueManager(
            address=address, authkey=authkey, serializer=SERIALIZER)
        manager.connect()
        queue = manager.get_queue()
        queue.put('hello world')

    def test_rapid_restart(self):
        authkey = os.urandom(32)
        manager = QueueManager(
            address=('localhost', 0), authkey=authkey, serializer=SERIALIZER)
        srvr = manager.get_server()
        addr = srvr.address
        # Close the connection.Listener socket which gets opened as a part
        # of manager.get_server(). It's not needed for the test.
        srvr.listener.close()
        manager.start()

        p = self.Process(target=self._putter, args=(manager.address, authkey))
        p.daemon = True
        p.start()
        queue = manager.get_queue()
        self.assertEqual(queue.get(), 'hello world')
        del queue
        manager.shutdown()

        # Restart on the address recorded above.
        manager = QueueManager(
            address=addr, authkey=authkey, serializer=SERIALIZER)
        try:
            manager.start()
        except IOError as e:
            if e.errno != errno.EADDRINUSE:
                raise
            # Retry after some time, in case the old socket was lingering
            # (sporadic failure on buildbots)
            time.sleep(1.0)
            manager = QueueManager(
                address=addr, authkey=authkey, serializer=SERIALIZER)
            # The replacement manager has to be started as well:
            # shutdown() is only available on a started manager.
            manager.start()
        manager.shutdown()
Jesse Nollerc5d28a02009-03-30 16:37:36 +00001505
Benjamin Petersone711caf2008-06-11 16:44:04 +00001506#
1507#
1508#
1509
# Message that makes _TestConnection._echo() stop its receive loop.
SENTINEL = latin('')
1511
class _TestConnection(BaseTestCase):
    # Tests for Pipe() connection objects: sending/receiving objects and
    # bytes, polling, one-way pipes and passing file descriptors/handles
    # to a child process.

    ALLOWED_TYPES = ('processes', 'threads')

    @classmethod
    def _echo(cls, conn):
        # Child target: send back every byte message until SENTINEL is
        # received, then close the connection.
        for msg in iter(conn.recv_bytes, SENTINEL):
            conn.send_bytes(msg)
        conn.close()

    def test_connection(self):
        conn, child_conn = self.Pipe()

        p = self.Process(target=self._echo, args=(child_conn,))
        p.daemon = True
        p.start()

        seq = [1, 2.25, None]
        msg = latin('hello world')
        longmsg = msg * 10
        arr = array.array('i', list(range(4)))

        if self.TYPE == 'processes':
            self.assertEqual(type(conn.fileno()), int)

        self.assertEqual(conn.send(seq), None)
        self.assertEqual(conn.recv(), seq)

        self.assertEqual(conn.send_bytes(msg), None)
        self.assertEqual(conn.recv_bytes(), msg)

        if self.TYPE == 'processes':
            # Receive into the start of a zero-filled buffer.
            buffer = array.array('i', [0]*10)
            expected = list(arr) + [0] * (10 - len(arr))
            self.assertEqual(conn.send_bytes(arr), None)
            self.assertEqual(conn.recv_bytes_into(buffer),
                             len(arr) * buffer.itemsize)
            self.assertEqual(list(buffer), expected)

            # Receive at an offset of three items into the buffer.
            buffer = array.array('i', [0]*10)
            expected = [0] * 3 + list(arr) + [0] * (10 - 3 - len(arr))
            self.assertEqual(conn.send_bytes(arr), None)
            self.assertEqual(conn.recv_bytes_into(buffer, 3 * buffer.itemsize),
                             len(arr) * buffer.itemsize)
            self.assertEqual(list(buffer), expected)

            # A message longer than the buffer must raise BufferTooShort
            # carrying the complete message.
            buffer = bytearray(latin(' ' * 40))
            self.assertEqual(conn.send_bytes(longmsg), None)
            try:
                res = conn.recv_bytes_into(buffer)
            except multiprocessing.BufferTooShort as e:
                self.assertEqual(e.args, (longmsg,))
            else:
                self.fail('expected BufferTooShort, got %s' % res)

        poll = TimingWrapper(conn.poll)

        self.assertEqual(poll(), False)
        self.assertTimingAlmostEqual(poll.elapsed, 0)

        self.assertEqual(poll(TIMEOUT1), False)
        self.assertTimingAlmostEqual(poll.elapsed, TIMEOUT1)

        conn.send(None)

        self.assertEqual(poll(TIMEOUT1), True)
        self.assertTimingAlmostEqual(poll.elapsed, 0)

        self.assertEqual(conn.recv(), None)

        really_big_msg = latin('X') * (1024 * 1024 * 16)   # 16 MiB
        conn.send_bytes(really_big_msg)
        self.assertEqual(conn.recv_bytes(), really_big_msg)

        conn.send_bytes(SENTINEL)                          # tell child to quit
        child_conn.close()

        if self.TYPE == 'processes':
            self.assertEqual(conn.readable, True)
            self.assertEqual(conn.writable, True)
            self.assertRaises(EOFError, conn.recv)
            self.assertRaises(EOFError, conn.recv_bytes)

        p.join()

    def test_duplex_false(self):
        reader, writer = self.Pipe(duplex=False)
        self.assertEqual(writer.send(1), None)
        self.assertEqual(reader.recv(), 1)
        if self.TYPE == 'processes':
            self.assertEqual(reader.readable, True)
            self.assertEqual(reader.writable, False)
            self.assertEqual(writer.readable, False)
            self.assertEqual(writer.writable, True)
            self.assertRaises(IOError, reader.send, 2)
            self.assertRaises(IOError, writer.recv)
            self.assertRaises(IOError, writer.poll)

    def test_spawn_close(self):
        # We test that a pipe connection can be closed by parent
        # process immediately after child is spawned.  On Windows this
        # would have sometimes failed on old versions because
        # child_conn would be closed before the child got a chance to
        # duplicate it.
        conn, child_conn = self.Pipe()

        p = self.Process(target=self._echo, args=(child_conn,))
        p.daemon = True
        p.start()
        child_conn.close()    # this might complete before child initializes

        msg = latin('hello')
        conn.send_bytes(msg)
        self.assertEqual(conn.recv_bytes(), msg)

        conn.send_bytes(SENTINEL)
        conn.close()
        p.join()

    def test_sendbytes(self):
        # Checks the optional offset and size arguments of send_bytes().
        if self.TYPE != 'processes':
            self.skipTest("only makes sense with processes")

        msg = latin('abcdefghijklmnopqrstuvwxyz')
        a, b = self.Pipe()

        a.send_bytes(msg)
        self.assertEqual(b.recv_bytes(), msg)

        a.send_bytes(msg, 5)
        self.assertEqual(b.recv_bytes(), msg[5:])

        a.send_bytes(msg, 7, 8)
        self.assertEqual(b.recv_bytes(), msg[7:7+8])

        a.send_bytes(msg, 26)
        self.assertEqual(b.recv_bytes(), latin(''))

        a.send_bytes(msg, 26, 0)
        self.assertEqual(b.recv_bytes(), latin(''))

        # Offsets/sizes outside the 26-byte message are rejected.
        self.assertRaises(ValueError, a.send_bytes, msg, 27)

        self.assertRaises(ValueError, a.send_bytes, msg, 22, 5)

        self.assertRaises(ValueError, a.send_bytes, msg, 26, 1)

        self.assertRaises(ValueError, a.send_bytes, msg, -1)

        self.assertRaises(ValueError, a.send_bytes, msg, 4, -1)

    @classmethod
    def _is_fd_assigned(cls, fd):
        # Return True if os.fstat() accepts fd, False if it fails with
        # EBADF; any other OSError is propagated.
        try:
            os.fstat(fd)
        except OSError as e:
            if e.errno == errno.EBADF:
                return False
            raise
        else:
            return True

    @classmethod
    def _writefd(cls, conn, data, create_dummy_fds=False):
        # Child target: receive a handle over conn and write data to it.
        if create_dummy_fds:
            # Occupy every unassigned descriptor below 256 with a
            # duplicate of the connection's descriptor.
            for i in range(0, 256):
                if not cls._is_fd_assigned(i):
                    os.dup2(conn.fileno(), i)
        fd = reduction.recv_handle(conn)
        if msvcrt:
            fd = msvcrt.open_osfhandle(fd, os.O_WRONLY)
        os.write(fd, data)
        os.close(fd)

    @unittest.skipUnless(HAS_REDUCTION, "test needs multiprocessing.reduction")
    def test_fd_transfer(self):
        if self.TYPE != 'processes':
            self.skipTest("only makes sense with processes")
        conn, child_conn = self.Pipe(duplex=True)

        p = self.Process(target=self._writefd, args=(child_conn, b"foo"))
        p.daemon = True
        p.start()
        self.addCleanup(test.support.unlink, test.support.TESTFN)
        with open(test.support.TESTFN, "wb") as f:
            fd = f.fileno()
            if msvcrt:
                fd = msvcrt.get_osfhandle(fd)
            reduction.send_handle(conn, fd, p.pid)
        p.join()
        # The child must have written its data to the file.
        with open(test.support.TESTFN, "rb") as f:
            self.assertEqual(f.read(), b"foo")

    @unittest.skipUnless(HAS_REDUCTION, "test needs multiprocessing.reduction")
    @unittest.skipIf(sys.platform == "win32",
                     "test semantics don't make sense on Windows")
    @unittest.skipIf(MAXFD <= 256,
                     "largest assignable fd number is too small")
    @unittest.skipUnless(hasattr(os, "dup2"),
                         "test needs os.dup2()")
    def test_large_fd_transfer(self):
        # With fd > 256 (issue #11657)
        if self.TYPE != 'processes':
            self.skipTest("only makes sense with processes")
        conn, child_conn = self.Pipe(duplex=True)

        p = self.Process(target=self._writefd, args=(child_conn, b"bar", True))
        p.daemon = True
        p.start()
        self.addCleanup(test.support.unlink, test.support.TESTFN)
        with open(test.support.TESTFN, "wb") as f:
            fd = f.fileno()
            # Find an unassigned descriptor number in [256, MAXFD).
            for newfd in range(256, MAXFD):
                if not self._is_fd_assigned(newfd):
                    break
            else:
                self.fail("could not find an unassigned large file descriptor")
            os.dup2(fd, newfd)
            try:
                reduction.send_handle(conn, newfd, p.pid)
            finally:
                os.close(newfd)
        p.join()
        with open(test.support.TESTFN, "rb") as f:
            self.assertEqual(f.read(), b"bar")

    @classmethod
    def _send_data_without_fd(cls, conn):
        # Child target: write a single byte directly to the connection's
        # descriptor, bypassing reduction.send_handle().
        os.write(conn.fileno(), b"\0")

    @unittest.skipUnless(HAS_REDUCTION, "test needs multiprocessing.reduction")
    @unittest.skipIf(sys.platform == "win32", "doesn't make sense on Windows")
    def test_missing_fd_transfer(self):
        # Check that exception is raised when received data is not
        # accompanied by a file descriptor in ancillary data.
        if self.TYPE != 'processes':
            self.skipTest("only makes sense with processes")
        conn, child_conn = self.Pipe(duplex=True)

        p = self.Process(target=self._send_data_without_fd, args=(child_conn,))
        p.daemon = True
        p.start()
        self.assertRaises(RuntimeError, reduction.recv_handle, conn)
        p.join()
Antoine Pitroubcb39d42011-08-23 19:46:22 +02001756
class _TestListenerClient(BaseTestCase):
    # Tests for connection.Listener / connection.Client.

    ALLOWED_TYPES = ('processes', 'threads')

    @classmethod
    def _test(cls, address):
        # Child target: connect to the listener, send one message and
        # close the connection.
        conn = cls.connection.Client(address)
        conn.send('hello')
        conn.close()

    def test_listener_client(self):
        for family in self.connection.families:
            l = self.connection.Listener(family=family)
            p = self.Process(target=self._test, args=(l.address,))
            p.daemon = True
            p.start()
            conn = l.accept()
            self.assertEqual(conn.recv(), 'hello')
            # Close the accepted connection, as test_issue14725 does.
            conn.close()
            p.join()
            l.close()

    def test_issue14725(self):
        l = self.connection.Listener()
        p = self.Process(target=self._test, args=(l.address,))
        p.daemon = True
        p.start()
        time.sleep(1)
        # On Windows the client process should by now have connected,
        # written data and closed the pipe handle.  This causes
        # ConnectNamedPipe() to fail with ERROR_NO_DATA.  See Issue
        # 14725.
        conn = l.accept()
        self.assertEqual(conn.recv(), 'hello')
        conn.close()
        p.join()
        l.close()
1793
Benjamin Petersone711caf2008-06-11 16:44:04 +00001794#
1795# Test of sending connection and socket objects between processes
1796#
Benjamin Petersonb29cbbc2008-06-16 20:57:14 +00001797"""
Benjamin Petersone711caf2008-06-11 16:44:04 +00001798class _TestPicklingConnections(BaseTestCase):
1799
1800 ALLOWED_TYPES = ('processes',)
1801
1802 def _listener(self, conn, families):
1803 for fam in families:
1804 l = self.connection.Listener(family=fam)
1805 conn.send(l.address)
1806 new_conn = l.accept()
1807 conn.send(new_conn)
1808
1809 if self.TYPE == 'processes':
1810 l = socket.socket()
1811 l.bind(('localhost', 0))
1812 conn.send(l.getsockname())
1813 l.listen(1)
1814 new_conn, addr = l.accept()
1815 conn.send(new_conn)
1816
1817 conn.recv()
1818
1819 def _remote(self, conn):
1820 for (address, msg) in iter(conn.recv, None):
1821 client = self.connection.Client(address)
1822 client.send(msg.upper())
1823 client.close()
1824
1825 if self.TYPE == 'processes':
1826 address, msg = conn.recv()
1827 client = socket.socket()
1828 client.connect(address)
1829 client.sendall(msg.upper())
1830 client.close()
1831
1832 conn.close()
1833
1834 def test_pickling(self):
1835 try:
1836 multiprocessing.allow_connection_pickling()
1837 except ImportError:
1838 return
1839
1840 families = self.connection.families
1841
1842 lconn, lconn0 = self.Pipe()
1843 lp = self.Process(target=self._listener, args=(lconn0, families))
Jesus Cea94f964f2011-09-09 20:26:57 +02001844 lp.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +00001845 lp.start()
1846 lconn0.close()
1847
1848 rconn, rconn0 = self.Pipe()
1849 rp = self.Process(target=self._remote, args=(rconn0,))
Jesus Cea94f964f2011-09-09 20:26:57 +02001850 rp.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +00001851 rp.start()
1852 rconn0.close()
1853
1854 for fam in families:
1855 msg = ('This connection uses family %s' % fam).encode('ascii')
1856 address = lconn.recv()
1857 rconn.send((address, msg))
1858 new_conn = lconn.recv()
1859 self.assertEqual(new_conn.recv(), msg.upper())
1860
1861 rconn.send(None)
1862
1863 if self.TYPE == 'processes':
1864 msg = latin('This connection uses a normal socket')
1865 address = lconn.recv()
1866 rconn.send((address, msg))
1867 if hasattr(socket, 'fromfd'):
1868 new_conn = lconn.recv()
1869 self.assertEqual(new_conn.recv(100), msg.upper())
1870 else:
1871 # XXX On Windows with Py2.6 need to backport fromfd()
1872 discard = lconn.recv_bytes()
1873
1874 lconn.send(None)
1875
1876 rconn.close()
1877 lconn.close()
1878
1879 lp.join()
1880 rp.join()
Benjamin Petersonb29cbbc2008-06-16 20:57:14 +00001881"""
Benjamin Petersone711caf2008-06-11 16:44:04 +00001882#
1883#
1884#
1885
class _TestHeap(BaseTestCase):
    """Consistency checks for the shared-memory heap allocator."""

    ALLOWED_TYPES = ('processes',)

    def test_heap(self):
        """Allocate and free many blocks, then check arenas are tiled."""
        iterations = 5000
        maxblocks = 50
        blocks = []

        # create and destroy lots of blocks of different sizes
        for _ in range(iterations):
            size = int(random.lognormvariate(0, 1) * 1000)
            blocks.append(multiprocessing.heap.BufferWrapper(size))
            if len(blocks) > maxblocks:
                # drop a random block so it can be returned to the heap
                del blocks[random.randrange(maxblocks)]

        # get the heap object
        heap = multiprocessing.heap.BufferWrapper._heap

        # verify the state of the heap; hold the lock for the rest of the
        # test and release it during cleanup
        heap._lock.acquire()
        self.addCleanup(heap._lock.release)

        layout = []
        for free_blocks in list(heap._len_to_seq.values()):
            for arena, start, stop in free_blocks:
                layout.append((heap._arenas.index(arena), start, stop,
                               stop - start, 'free'))
        for arena, start, stop in heap._allocated_blocks:
            layout.append((heap._arenas.index(arena), start, stop,
                           stop - start, 'occupied'))

        layout.sort()

        # Each block must either be followed immediately by the next block
        # of the same arena, or the next block must start a new arena.
        for current, following in zip(layout, layout[1:]):
            arena, _, stop = current[:3]
            narena, nstart, _ = following[:3]
            self.assertTrue((arena != narena and nstart == 0) or
                            (stop == nstart))

    def test_free_from_gc(self):
        """Freeing blocks from the garbage collector must not deadlock."""
        # Check that freeing of blocks by the garbage collector doesn't
        # deadlock (issue #12352).
        # Make sure the GC is enabled, and set lower collection thresholds to
        # make collections more frequent (and increase the probability of
        # deadlock).
        if not gc.isenabled():
            gc.enable()
            self.addCleanup(gc.disable)
        thresholds = gc.get_threshold()
        self.addCleanup(gc.set_threshold, *thresholds)
        gc.set_threshold(10)

        # perform numerous block allocations, with cyclic references to make
        # sure objects are collected asynchronously by the gc
        for _ in range(5000):
            a = multiprocessing.heap.BufferWrapper(1)
            b = multiprocessing.heap.BufferWrapper(1)
            # circular references
            a.buddy = b
            b.buddy = a
1950
Benjamin Petersone711caf2008-06-11 16:44:04 +00001951#
1952#
1953#
1954
Benjamin Petersone711caf2008-06-11 16:44:04 +00001955class _Foo(Structure):
1956 _fields_ = [
1957 ('x', c_int),
1958 ('y', c_double)
1959 ]
1960
class _TestSharedCTypes(BaseTestCase):
    """Shared ctypes values, structures and arrays across processes."""

    ALLOWED_TYPES = ('processes',)

    def setUp(self):
        # Nothing here can run without multiprocessing.sharedctypes.
        if HAS_SHAREDCTYPES:
            return
        self.skipTest("requires multiprocessing.sharedctypes")

    @classmethod
    def _double(cls, x, y, foo, arr, string):
        """Child side: double every shared object in place."""
        x.value *= 2
        y.value *= 2
        foo.x *= 2
        foo.y *= 2
        string.value *= 2
        for index in range(len(arr)):
            arr[index] *= 2

    def test_sharedctypes(self, lock=False):
        """Changes made by a child must be visible in the parent."""
        x = Value('i', 7, lock=lock)
        y = Value(c_double, 1.0/3.0, lock=lock)
        foo = Value(_Foo, 3, 2, lock=lock)
        arr = self.Array('d', list(range(10)), lock=lock)
        string = self.Array('c', 20, lock=lock)
        string.value = latin('hello')

        shared = (x, y, foo, arr, string)
        p = self.Process(target=self._double, args=shared)
        p.daemon = True
        p.start()
        p.join()

        self.assertEqual(x.value, 14)
        self.assertAlmostEqual(y.value, 2.0/3.0)
        self.assertEqual(foo.x, 6)
        self.assertAlmostEqual(foo.y, 4.0)
        for index in range(10):
            self.assertAlmostEqual(arr[index], index*2)
        self.assertEqual(string.value, latin('hellohello'))

    def test_synchronize(self):
        """Same scenario as test_sharedctypes, with lock-wrapped objects."""
        self.test_sharedctypes(lock=True)

    def test_copy(self):
        """copy() must yield a structure independent of the original."""
        original = _Foo(2, 5.0)
        duplicate = copy(original)
        original.x = 0
        original.y = 0
        self.assertEqual(duplicate.x, 2)
        self.assertAlmostEqual(duplicate.y, 5.0)
2010
2011#
2012#
2013#
2014
class _TestFinalize(BaseTestCase):
    """Ordering guarantees of util.Finalize callbacks."""

    ALLOWED_TYPES = ('processes',)

    @classmethod
    def _test_finalize(cls, conn):
        """Child side: register finalizers, report each call over *conn*."""
        class Foo(object):
            pass

        # Finalizer fired by the object being destroyed.
        a = Foo()
        util.Finalize(a, conn.send, args=('a',))
        del a           # triggers callback for a

        # Finalizer fired explicitly; later triggers must be no-ops.
        b = Foo()
        close_b = util.Finalize(b, conn.send, args=('b',))
        close_b()       # triggers callback for b
        close_b()       # does nothing because callback has already been called
        del b           # does nothing because callback has already been called

        # Registered without an exit priority.
        c = Foo()
        util.Finalize(c, conn.send, args=('c',))

        d10 = Foo()
        util.Finalize(d10, conn.send, args=('d10',), exitpriority=1)

        # Three finalizers sharing priority 0, registered in this order; the
        # objects are kept in a list so they stay alive until exit.
        same_priority = []
        for tag in ('d01', 'd02', 'd03'):
            holder = Foo()
            util.Finalize(holder, conn.send, args=(tag,), exitpriority=0)
            same_priority.append(holder)

        util.Finalize(None, conn.send, args=('e',), exitpriority=-10)

        util.Finalize(None, conn.send, args=('STOP',), exitpriority=-100)

        # call multiprocessing's cleanup function then exit process without
        # garbage collecting locals
        util._exit_function()
        conn.close()
        os._exit(0)

    def test_finalize(self):
        """Check the order in which the child's finalizers reported."""
        conn, child_conn = self.Pipe()

        p = self.Process(target=self._test_finalize, args=(child_conn,))
        p.daemon = True
        p.start()
        p.join()

        # Read messages until the 'STOP' sentinel.
        result = list(iter(conn.recv, 'STOP'))
        self.assertEqual(result, ['a', 'b', 'd10', 'd03', 'd02', 'd01', 'e'])
2067
2068#
2069# Test that from ... import * works for each module
2070#
2071
class _TestImportStar(BaseTestCase):
    """Every name a module lists in __all__ must actually exist."""

    ALLOWED_TYPES = ('processes',)

    def test_import(self):
        """Import each multiprocessing module and validate its __all__."""
        modules = [
            'multiprocessing',
            'multiprocessing.connection',
            'multiprocessing.heap',
            'multiprocessing.managers',
            'multiprocessing.pool',
            'multiprocessing.process',
            'multiprocessing.synchronize',
            'multiprocessing.util',
        ]

        if HAS_REDUCTION:
            modules.append('multiprocessing.reduction')

        if c_int is not None:
            # This module requires _ctypes
            modules.append('multiprocessing.sharedctypes')

        for name in modules:
            __import__(name)
            mod = sys.modules[name]
            exported = getattr(mod, '__all__', ())

            for attr in exported:
                message = '%r does not have attribute %r' % (mod, attr)
                self.assertTrue(hasattr(mod, attr), message)
2100
2101#
2102# Quick test that logging works -- does not test logging output
2103#
2104
class _TestLogging(BaseTestCase):
    """Quick checks of the multiprocessing logger (output is not tested)."""

    ALLOWED_TYPES = ('processes',)

    def test_enable_logging(self):
        """The package logger exists and accepts messages below its level."""
        logger = multiprocessing.get_logger()
        # Check before first use so a missing logger is reported as a test
        # failure rather than an AttributeError.
        self.assertIsNotNone(logger)
        logger.setLevel(util.SUBWARNING)
        try:
            logger.debug('this will not be printed')
            logger.info('nor will this')
        finally:
            # Always restore the module-wide level for later tests.
            logger.setLevel(LOG_LEVEL)

    @classmethod
    def _test_level(cls, conn):
        """Child side: send the logger's effective level over *conn*."""
        logger = multiprocessing.get_logger()
        conn.send(logger.getEffectiveLevel())

    def _check_child_level(self, expected, reader, writer):
        """Start a child and assert it reports *expected* as its level."""
        p = self.Process(target=self._test_level, args=(writer,))
        p.daemon = True
        p.start()
        try:
            self.assertEqual(expected, reader.recv())
        finally:
            # Reap the child instead of leaving it to daemon cleanup.
            p.join()

    def test_level(self):
        """Children inherit the logger level, own or from the root logger."""
        LEVEL1 = 32
        LEVEL2 = 37

        logger = multiprocessing.get_logger()
        root_logger = logging.getLogger()
        root_level = root_logger.level

        reader, writer = multiprocessing.Pipe(duplex=False)
        self.addCleanup(reader.close)
        self.addCleanup(writer.close)

        try:
            # Level set directly on the multiprocessing logger.
            logger.setLevel(LEVEL1)
            self._check_child_level(LEVEL1, reader, writer)

            # Level inherited from the root logger.
            logger.setLevel(logging.NOTSET)
            root_logger.setLevel(LEVEL2)
            self._check_child_level(LEVEL2, reader, writer)
        finally:
            # Restore global logging state even if an assertion failed.
            root_logger.setLevel(root_level)
            logger.setLevel(level=LOG_LEVEL)
2147
Jesse Nollerb9a49b72009-11-21 18:09:38 +00002148
Jesse Nollerf4ae35f2009-11-24 14:22:24 +00002149# class _TestLoggingProcessName(BaseTestCase):
2150#
2151# def handle(self, record):
2152# assert record.processName == multiprocessing.current_process().name
2153# self.__handled = True
2154#
2155# def test_logging(self):
2156# handler = logging.Handler()
2157# handler.handle = self.handle
2158# self.__handled = False
2159# # Bypass getLogger() and side-effects
2160# logger = logging.getLoggerClass()(
2161# 'multiprocessing.test.TestLoggingProcessName')
2162# logger.addHandler(handler)
2163# logger.propagate = False
2164#
2165# logger.warn('foo')
2166# assert self.__handled
Jesse Nollerb9a49b72009-11-21 18:09:38 +00002167
Benjamin Petersone711caf2008-06-11 16:44:04 +00002168#
Jesse Noller6214edd2009-01-19 16:23:53 +00002169# Test to verify handle verification, see issue 3321
2170#
2171
class TestInvalidHandle(unittest.TestCase):
    """Connection objects must reject invalid handles (issue 3321)."""

    @unittest.skipIf(WIN32, "skipped on Windows")
    def test_invalid_handles(self):
        bogus_handle = 44977608
        conn = _multiprocessing.Connection(bogus_handle)
        with self.assertRaises(IOError):
            conn.poll()
        with self.assertRaises(IOError):
            _multiprocessing.Connection(-1)
Florent Xiclunafd1b0932010-03-28 00:25:02 +00002179
Jesse Noller6214edd2009-01-19 16:23:53 +00002180#
Benjamin Petersone711caf2008-06-11 16:44:04 +00002181# Functions used to create test cases from the base ones in this module
2182#
2183
def get_attributes(Source, names):
    """Return a dict mapping each of *names* to that attribute of *Source*.

    Plain Python functions are wrapped in staticmethod(); every other
    value is stored unchanged.
    """
    function_type = type(get_attributes)
    attributes = {}
    for name in names:
        value = getattr(Source, name)
        if type(value) == function_type:
            value = staticmethod(value)
        attributes[name] = value
    return attributes
2192
def create_test_cases(Mixin, type):
    """Build concrete TestCase classes from the module's ``_Test*`` bases.

    For each module-level name starting with ``_Test`` whose ALLOWED_TYPES
    contains *type*, a class deriving from that base, unittest.TestCase and
    *Mixin* is created.  Returns a dict mapping each new class name
    ('With' + capitalized type + base name without its leading underscore)
    to the class.
    """
    Type = type.capitalize()
    result = {}

    # Iterate over a snapshot of the module namespace.
    for name, base in list(globals().items()):
        if not name.startswith('_Test'):
            continue
        if type not in base.ALLOWED_TYPES:
            continue

        class Temp(base, unittest.TestCase, Mixin):
            pass

        newname = 'With' + Type + name[1:]
        Temp.__name__ = newname
        Temp.__module__ = Mixin.__module__
        result[newname] = Temp
    return result
2209
2210#
2211# Create test cases
2212#
2213
class ProcessesMixin(object):
    """Mixin binding the test primitives to the multiprocessing package."""

    TYPE = 'processes'
    Process = multiprocessing.Process
    # Copy the listed names from multiprocessing into the class namespace.
    locals().update(get_attributes(multiprocessing, (
        'Queue',
        'Lock',
        'RLock',
        'Semaphore',
        'BoundedSemaphore',
        'Condition',
        'Event',
        'Value',
        'Array',
        'RawValue',
        'RawArray',
        'current_process',
        'active_children',
        'Pipe',
        'connection',
        'JoinableQueue',
        'Pool',
        )))


# Generate the process-based test classes and publish them in the module.
testcases_processes = create_test_cases(ProcessesMixin, type='processes')
globals().update(testcases_processes)
2226
2227
class ManagerMixin(object):
    """Mixin binding the test primitives to a SyncManager instance."""

    TYPE = 'manager'
    Process = multiprocessing.Process
    # Allocated without running __init__ here.
    manager = object.__new__(multiprocessing.managers.SyncManager)
    # Copy the listed names from the manager into the class namespace.
    locals().update(get_attributes(manager, (
        'Queue',
        'Lock',
        'RLock',
        'Semaphore',
        'BoundedSemaphore',
        'Condition',
        'Event',
        'Value',
        'Array',
        'list',
        'dict',
        'Namespace',
        'JoinableQueue',
        'Pool',
        )))


# Generate the manager-based test classes and publish them in the module.
testcases_manager = create_test_cases(ManagerMixin, type='manager')
globals().update(testcases_manager)
2240
2241
class ThreadsMixin(object):
    """Mixin binding the test primitives to multiprocessing.dummy."""

    TYPE = 'threads'
    Process = multiprocessing.dummy.Process
    # Copy the listed names from multiprocessing.dummy into the class
    # namespace.
    locals().update(get_attributes(multiprocessing.dummy, (
        'Queue',
        'Lock',
        'RLock',
        'Semaphore',
        'BoundedSemaphore',
        'Condition',
        'Event',
        'Value',
        'Array',
        'current_process',
        'active_children',
        'Pipe',
        'connection',
        'dict',
        'list',
        'Namespace',
        'JoinableQueue',
        'Pool',
        )))


# Generate the thread-based test classes and publish them in the module.
testcases_threads = create_test_cases(ThreadsMixin, type='threads')
globals().update(testcases_threads)
2254
class OtherTest(unittest.TestCase):
    """Authentication handshake failures in multiprocessing.connection."""

    # TODO: add more tests for deliver/answer challenge.
    def test_deliver_challenge_auth_failure(self):
        class _FakeConnection(object):
            """Peer that answers every challenge with garbage."""

            def recv_bytes(self, size):
                return b'something bogus'

            def send_bytes(self, data):
                pass

        with self.assertRaises(multiprocessing.AuthenticationError):
            multiprocessing.connection.deliver_challenge(
                _FakeConnection(), b'abc')

    def test_answer_challenge_auth_failure(self):
        class _FakeConnection(object):
            """Peer that sends the challenge marker, then garbage."""

            def __init__(self):
                self.count = 0

            def recv_bytes(self, size):
                self.count += 1
                if self.count == 1:
                    return multiprocessing.connection.CHALLENGE
                if self.count == 2:
                    return b'something bogus'
                return b''

            def send_bytes(self, data):
                pass

        with self.assertRaises(multiprocessing.AuthenticationError):
            multiprocessing.connection.answer_challenge(
                _FakeConnection(), b'abc')
2283
Benjamin Petersonf47ed4a2009-04-11 20:45:40 +00002284#
2285# Test Manager.start()/Pool.__init__() initializer feature - see issue 5585
2286#
2287
def initializer(ns):
    """Initializer callback for the tests: increment ``ns.test`` by one."""
    ns.test = ns.test + 1
2290
class TestInitializers(unittest.TestCase):
    """Initializer support in Manager.start() and Pool() (issue 5585)."""

    def setUp(self):
        self.mgr = multiprocessing.Manager()
        # Register shutdown straight away so the manager process is reaped
        # even if the rest of setUp fails (tearDown would not run then).
        self.addCleanup(self.mgr.shutdown)
        self.ns = self.mgr.Namespace()
        self.ns.test = 0

    def test_manager_initializer(self):
        """start() rejects a non-callable and runs a callable initializer."""
        m = multiprocessing.managers.SyncManager()
        self.assertRaises(TypeError, m.start, 1)
        m.start(initializer, (self.ns,))
        try:
            self.assertEqual(self.ns.test, 1)
        finally:
            # Shut the second manager down even when the assertion fails.
            m.shutdown()

    def test_pool_initializer(self):
        """Pool() rejects a non-callable and runs a callable initializer."""
        self.assertRaises(TypeError, multiprocessing.Pool, initializer=1)
        p = multiprocessing.Pool(1, initializer, (self.ns,))
        p.close()
        p.join()
        self.assertEqual(self.ns.test, 1)
2313
Alexandre Vassalottic57a84f2009-07-17 12:07:01 +00002314#
2315# Issue 5155, 5313, 5331: Test process in processes
2316# Verifies os.close(sys.stdin.fileno) vs. sys.stdin.close() behavior
2317#
2318
2319def _ThisSubProcess(q):
2320 try:
2321 item = q.get(block=False)
2322 except pyqueue.Empty:
2323 pass
2324
def _TestProcess(q):
    """Run _ThisSubProcess in a daemonic child process and wait for it.

    The *q* argument is accepted but not used; the child receives a
    freshly created queue.
    """
    child_queue = multiprocessing.Queue()
    child = multiprocessing.Process(target=_ThisSubProcess,
                                    args=(child_queue,))
    child.daemon = True
    child.start()
    child.join()
2331
2332def _afunc(x):
2333 return x*x
2334
def pool_in_process():
    """Create a pool inside the current process and run a small map().

    The pool is closed and joined before returning so its workers are not
    left behind.
    """
    pool = multiprocessing.Pool(processes=4)
    try:
        pool.map(_afunc, [1, 2, 3, 4, 5, 6, 7])
    finally:
        pool.close()
        pool.join()
2338
2339class _file_like(object):
2340 def __init__(self, delegate):
2341 self._delegate = delegate
2342 self._pid = None
2343
2344 @property
2345 def cache(self):
2346 pid = os.getpid()
2347 # There are no race conditions since fork keeps only the running thread
2348 if pid != self._pid:
2349 self._pid = pid
2350 self._cache = []
2351 return self._cache
2352
2353 def write(self, data):
2354 self.cache.append(data)
2355
2356 def flush(self):
2357 self._delegate.write(''.join(self.cache))
2358 self._cache = []
2359
class TestStdinBadfiledescriptor(unittest.TestCase):
    """Process-in-process scenarios from issues 5155, 5313 and 5331."""

    def test_queue_in_process(self):
        """A child process must be able to start its own child."""
        queue = multiprocessing.Queue()
        proc = multiprocessing.Process(target=_TestProcess, args=(queue,))
        proc.start()
        proc.join()

    def test_pool_in_process(self):
        """A child process must be able to create and use a Pool."""
        p = multiprocessing.Process(target=pool_in_process)
        p.start()
        p.join()

    def test_flushing(self):
        """flush() must forward buffered data to the delegate exactly once."""
        sio = io.StringIO()
        flike = _file_like(sio)
        flike.write('foo')
        # The process object is created but not started, as in the original
        # test.
        proc = multiprocessing.Process(target=lambda: flike.flush())
        flike.flush()
        # Use a unittest assertion: a bare assert disappears under -O.
        self.assertEqual(sio.getvalue(), 'foo')
2380
Antoine Pitrou709176f2012-04-01 17:19:09 +02002381
2382#
2383# Issue 14151: Test invalid family on invalid environment
2384#
2385
class TestInvalidFamily(unittest.TestCase):
    """Listener must reject addresses of an unsupported family."""

    @unittest.skipIf(WIN32, "skipped on Windows")
    def test_invalid_family(self):
        # A Windows named-pipe style address on a non-Windows platform.
        self.assertRaises(ValueError,
                          multiprocessing.connection.Listener, r'\\.\test')

    @unittest.skipUnless(WIN32, "skipped on non-Windows platforms")
    def test_invalid_family_win32(self):
        # A filesystem path style address on Windows.
        self.assertRaises(ValueError,
                          multiprocessing.connection.Listener,
                          '/var/test.pipe')
2397
2398
# Test cases that are not generated from the _Test* base classes.
testcases_other = [
    OtherTest,
    TestInvalidHandle,
    TestInitializers,
    TestStdinBadfiledescriptor,
    TestInvalidFamily,
]
Neal Norwitz5d6415e2008-08-25 01:53:32 +00002401
Benjamin Petersone711caf2008-06-11 16:44:04 +00002402#
2403#
2404#
2405
def test_main(run=None):
    """Build the complete suite and execute it with *run*.

    *run* is a callable taking a unittest.TestSuite; when it is None,
    test.support.run_unittest is used.  Raises unittest.SkipTest on Linux
    if an RLock cannot be created.  The shared pools and the manager are
    torn down even when *run* raises.
    """
    if sys.platform.startswith("linux"):
        try:
            multiprocessing.RLock()
        except OSError:
            raise unittest.SkipTest("OSError raises on RLock creation, see issue 3111!")

    check_enough_semaphores()

    if run is None:
        from test.support import run_unittest as run

    util.get_temp_dir()     # creates temp directory for use by all processes

    multiprocessing.get_logger().setLevel(LOG_LEVEL)

    # Shared fixtures, stored on the mixin classes.
    ProcessesMixin.pool = multiprocessing.Pool(4)
    ThreadsMixin.pool = multiprocessing.dummy.Pool(4)
    ManagerMixin.manager.__init__()
    ManagerMixin.manager.start()
    ManagerMixin.pool = ManagerMixin.manager.Pool(4)

    try:
        def by_name(tc):
            return tc.__name__

        testcases = (
            sorted(testcases_processes.values(), key=by_name) +
            sorted(testcases_threads.values(), key=by_name) +
            sorted(testcases_manager.values(), key=by_name) +
            testcases_other
            )

        loadTestsFromTestCase = unittest.defaultTestLoader.loadTestsFromTestCase
        suite = unittest.TestSuite(loadTestsFromTestCase(tc) for tc in testcases)
        run(suite)
    finally:
        # Tear the fixtures down even if the run was interrupted, so no
        # worker or manager processes are left behind.
        ThreadsMixin.pool.terminate()
        ProcessesMixin.pool.terminate()
        ManagerMixin.pool.terminate()
        ManagerMixin.manager.shutdown()

        del ProcessesMixin.pool, ThreadsMixin.pool, ManagerMixin.pool
Benjamin Petersone711caf2008-06-11 16:44:04 +00002445
def main():
    """Run the whole suite with a verbose text runner."""
    runner = unittest.TextTestRunner(verbosity=2)
    test_main(runner.run)


if __name__ == '__main__':
    main()