blob: 8edb4208640cb065a8cc366963c9c1835a40baeb [file] [log] [blame]
Benjamin Peterson90f5ba52010-03-11 22:53:45 +00001#!/usr/bin/env python3
Georg Brandl86b2fb92008-07-16 03:43:04 +00002
Benjamin Petersone711caf2008-06-11 16:44:04 +00003#
4# Unit tests for the multiprocessing package
5#
6
7import unittest
Benjamin Petersone711caf2008-06-11 16:44:04 +00008import queue as pyqueue
9import time
Alexandre Vassalottic57a84f2009-07-17 12:07:01 +000010import io
Benjamin Petersone711caf2008-06-11 16:44:04 +000011import sys
12import os
13import gc
Antoine Pitrouc824e9a2011-04-05 18:11:33 +020014import errno
Benjamin Petersone711caf2008-06-11 16:44:04 +000015import signal
16import array
Benjamin Petersone711caf2008-06-11 16:44:04 +000017import socket
18import random
19import logging
R. David Murraya21e4ca2009-03-31 23:16:50 +000020import test.support
Benjamin Petersone711caf2008-06-11 16:44:04 +000021
Benjamin Petersone5384b02008-10-04 22:00:42 +000022
R. David Murraya21e4ca2009-03-31 23:16:50 +000023# Skip tests if _multiprocessing wasn't built.
24_multiprocessing = test.support.import_module('_multiprocessing')
25# Skip tests if sem_open implementation is broken.
26test.support.import_module('multiprocessing.synchronize')
Victor Stinner45df8202010-04-28 22:31:17 +000027# import threading after _multiprocessing to raise a more revelant error
28# message: "No module named _multiprocessing". _multiprocessing is not compiled
29# without thread support.
30import threading
Benjamin Petersone5384b02008-10-04 22:00:42 +000031
Benjamin Petersone711caf2008-06-11 16:44:04 +000032import multiprocessing.dummy
33import multiprocessing.connection
34import multiprocessing.managers
35import multiprocessing.heap
Benjamin Petersone711caf2008-06-11 16:44:04 +000036import multiprocessing.pool
Benjamin Petersone711caf2008-06-11 16:44:04 +000037
Charles-François Natalie51c8da2011-09-21 18:48:21 +020038from multiprocessing import util
39
40try:
41 from multiprocessing import reduction
42 HAS_REDUCTION = True
43except ImportError:
44 HAS_REDUCTION = False
Benjamin Petersone711caf2008-06-11 16:44:04 +000045
Brian Curtinafa88b52010-10-07 01:12:19 +000046try:
47 from multiprocessing.sharedctypes import Value, copy
48 HAS_SHAREDCTYPES = True
49except ImportError:
50 HAS_SHAREDCTYPES = False
51
Antoine Pitroubcb39d42011-08-23 19:46:22 +020052try:
53 import msvcrt
54except ImportError:
55 msvcrt = None
56
Benjamin Petersone711caf2008-06-11 16:44:04 +000057#
58#
59#
60
Benjamin Peterson2bc91df2008-07-13 18:45:30 +000061def latin(s):
62 return s.encode('latin')
Benjamin Petersone711caf2008-06-11 16:44:04 +000063
Benjamin Petersone711caf2008-06-11 16:44:04 +000064#
65# Constants
66#
67
68LOG_LEVEL = util.SUBWARNING
Jesse Noller1f0b6582010-01-27 03:36:01 +000069#LOG_LEVEL = logging.DEBUG
Benjamin Petersone711caf2008-06-11 16:44:04 +000070
71DELTA = 0.1
72CHECK_TIMINGS = False # making true makes tests take a lot longer
73 # and can sometimes cause some non-serious
74 # failures because some calls block a bit
75 # longer than expected
76if CHECK_TIMINGS:
77 TIMEOUT1, TIMEOUT2, TIMEOUT3 = 0.82, 0.35, 1.4
78else:
79 TIMEOUT1, TIMEOUT2, TIMEOUT3 = 0.1, 0.1, 0.1
80
81HAVE_GETVALUE = not getattr(_multiprocessing,
82 'HAVE_BROKEN_SEM_GETVALUE', False)
83
Jesse Noller6214edd2009-01-19 16:23:53 +000084WIN32 = (sys.platform == "win32")
85
Antoine Pitroubcb39d42011-08-23 19:46:22 +020086try:
87 MAXFD = os.sysconf("SC_OPEN_MAX")
88except:
89 MAXFD = 256
90
Benjamin Petersone711caf2008-06-11 16:44:04 +000091#
Florent Xiclunafd1b0932010-03-28 00:25:02 +000092# Some tests require ctypes
93#
94
95try:
Florent Xiclunaaa171062010-08-14 15:56:42 +000096 from ctypes import Structure, c_int, c_double
Florent Xiclunafd1b0932010-03-28 00:25:02 +000097except ImportError:
98 Structure = object
99 c_int = c_double = None
100
Charles-François Natali3be00952011-11-22 18:36:39 +0100101
102def check_enough_semaphores():
103 """Check that the system supports enough semaphores to run the test."""
104 # minimum number of semaphores available according to POSIX
105 nsems_min = 256
106 try:
107 nsems = os.sysconf("SC_SEM_NSEMS_MAX")
108 except (AttributeError, ValueError):
109 # sysconf not available or setting not available
110 return
111 if nsems == -1 or nsems >= nsems_min:
112 return
113 raise unittest.SkipTest("The OS doesn't support enough semaphores "
114 "to run the test (required: %d)." % nsems_min)
115
116
Florent Xiclunafd1b0932010-03-28 00:25:02 +0000117#
Benjamin Petersone711caf2008-06-11 16:44:04 +0000118# Creates a wrapper for a function which records the time it takes to finish
119#
120
121class TimingWrapper(object):
122
123 def __init__(self, func):
124 self.func = func
125 self.elapsed = None
126
127 def __call__(self, *args, **kwds):
128 t = time.time()
129 try:
130 return self.func(*args, **kwds)
131 finally:
132 self.elapsed = time.time() - t
133
134#
135# Base class for test cases
136#
137
138class BaseTestCase(object):
139
140 ALLOWED_TYPES = ('processes', 'manager', 'threads')
141
142 def assertTimingAlmostEqual(self, a, b):
143 if CHECK_TIMINGS:
144 self.assertAlmostEqual(a, b, 1)
145
146 def assertReturnsIfImplemented(self, value, func, *args):
147 try:
148 res = func(*args)
149 except NotImplementedError:
150 pass
151 else:
152 return self.assertEqual(value, res)
153
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +0000154 # For the sanity of Windows users, rather than crashing or freezing in
155 # multiple ways.
156 def __reduce__(self, *args):
157 raise NotImplementedError("shouldn't try to pickle a test case")
158
159 __reduce_ex__ = __reduce__
160
Benjamin Petersone711caf2008-06-11 16:44:04 +0000161#
162# Return the value of a semaphore
163#
164
165def get_value(self):
166 try:
167 return self.get_value()
168 except AttributeError:
169 try:
170 return self._Semaphore__value
171 except AttributeError:
172 try:
173 return self._value
174 except AttributeError:
175 raise NotImplementedError
176
177#
178# Testcases
179#
180
181class _TestProcess(BaseTestCase):
182
183 ALLOWED_TYPES = ('processes', 'threads')
184
185 def test_current(self):
186 if self.TYPE == 'threads':
187 return
188
189 current = self.current_process()
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000190 authkey = current.authkey
Benjamin Petersone711caf2008-06-11 16:44:04 +0000191
192 self.assertTrue(current.is_alive())
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000193 self.assertTrue(not current.daemon)
Ezio Melottie9615932010-01-24 19:26:24 +0000194 self.assertIsInstance(authkey, bytes)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000195 self.assertTrue(len(authkey) > 0)
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000196 self.assertEqual(current.ident, os.getpid())
197 self.assertEqual(current.exitcode, None)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000198
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +0000199 @classmethod
200 def _test(cls, q, *args, **kwds):
201 current = cls.current_process()
Benjamin Petersone711caf2008-06-11 16:44:04 +0000202 q.put(args)
203 q.put(kwds)
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000204 q.put(current.name)
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +0000205 if cls.TYPE != 'threads':
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000206 q.put(bytes(current.authkey))
Benjamin Petersone711caf2008-06-11 16:44:04 +0000207 q.put(current.pid)
208
209 def test_process(self):
210 q = self.Queue(1)
211 e = self.Event()
212 args = (q, 1, 2)
213 kwargs = {'hello':23, 'bye':2.54}
214 name = 'SomeProcess'
215 p = self.Process(
216 target=self._test, args=args, kwargs=kwargs, name=name
217 )
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000218 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000219 current = self.current_process()
220
221 if self.TYPE != 'threads':
Ezio Melottib3aedd42010-11-20 19:04:17 +0000222 self.assertEqual(p.authkey, current.authkey)
223 self.assertEqual(p.is_alive(), False)
224 self.assertEqual(p.daemon, True)
Benjamin Peterson577473f2010-01-19 00:09:57 +0000225 self.assertNotIn(p, self.active_children())
Benjamin Petersone711caf2008-06-11 16:44:04 +0000226 self.assertTrue(type(self.active_children()) is list)
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000227 self.assertEqual(p.exitcode, None)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000228
229 p.start()
230
Ezio Melottib3aedd42010-11-20 19:04:17 +0000231 self.assertEqual(p.exitcode, None)
232 self.assertEqual(p.is_alive(), True)
Benjamin Peterson577473f2010-01-19 00:09:57 +0000233 self.assertIn(p, self.active_children())
Benjamin Petersone711caf2008-06-11 16:44:04 +0000234
Ezio Melottib3aedd42010-11-20 19:04:17 +0000235 self.assertEqual(q.get(), args[1:])
236 self.assertEqual(q.get(), kwargs)
237 self.assertEqual(q.get(), p.name)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000238 if self.TYPE != 'threads':
Ezio Melottib3aedd42010-11-20 19:04:17 +0000239 self.assertEqual(q.get(), current.authkey)
240 self.assertEqual(q.get(), p.pid)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000241
242 p.join()
243
Ezio Melottib3aedd42010-11-20 19:04:17 +0000244 self.assertEqual(p.exitcode, 0)
245 self.assertEqual(p.is_alive(), False)
Benjamin Peterson577473f2010-01-19 00:09:57 +0000246 self.assertNotIn(p, self.active_children())
Benjamin Petersone711caf2008-06-11 16:44:04 +0000247
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +0000248 @classmethod
249 def _test_terminate(cls):
Benjamin Petersone711caf2008-06-11 16:44:04 +0000250 time.sleep(1000)
251
252 def test_terminate(self):
253 if self.TYPE == 'threads':
254 return
255
256 p = self.Process(target=self._test_terminate)
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000257 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000258 p.start()
259
260 self.assertEqual(p.is_alive(), True)
Benjamin Peterson577473f2010-01-19 00:09:57 +0000261 self.assertIn(p, self.active_children())
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000262 self.assertEqual(p.exitcode, None)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000263
264 p.terminate()
265
266 join = TimingWrapper(p.join)
267 self.assertEqual(join(), None)
268 self.assertTimingAlmostEqual(join.elapsed, 0.0)
269
270 self.assertEqual(p.is_alive(), False)
Benjamin Peterson577473f2010-01-19 00:09:57 +0000271 self.assertNotIn(p, self.active_children())
Benjamin Petersone711caf2008-06-11 16:44:04 +0000272
273 p.join()
274
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000275 # XXX sometimes get p.exitcode == 0 on Windows ...
276 #self.assertEqual(p.exitcode, -signal.SIGTERM)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000277
278 def test_cpu_count(self):
279 try:
280 cpus = multiprocessing.cpu_count()
281 except NotImplementedError:
282 cpus = 1
283 self.assertTrue(type(cpus) is int)
284 self.assertTrue(cpus >= 1)
285
286 def test_active_children(self):
287 self.assertEqual(type(self.active_children()), list)
288
289 p = self.Process(target=time.sleep, args=(DELTA,))
Benjamin Peterson577473f2010-01-19 00:09:57 +0000290 self.assertNotIn(p, self.active_children())
Benjamin Petersone711caf2008-06-11 16:44:04 +0000291
Jesus Cea94f964f2011-09-09 20:26:57 +0200292 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000293 p.start()
Benjamin Peterson577473f2010-01-19 00:09:57 +0000294 self.assertIn(p, self.active_children())
Benjamin Petersone711caf2008-06-11 16:44:04 +0000295
296 p.join()
Benjamin Peterson577473f2010-01-19 00:09:57 +0000297 self.assertNotIn(p, self.active_children())
Benjamin Petersone711caf2008-06-11 16:44:04 +0000298
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +0000299 @classmethod
300 def _test_recursion(cls, wconn, id):
Benjamin Petersone711caf2008-06-11 16:44:04 +0000301 from multiprocessing import forking
302 wconn.send(id)
303 if len(id) < 2:
304 for i in range(2):
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +0000305 p = cls.Process(
306 target=cls._test_recursion, args=(wconn, id+[i])
Benjamin Petersone711caf2008-06-11 16:44:04 +0000307 )
308 p.start()
309 p.join()
310
311 def test_recursion(self):
312 rconn, wconn = self.Pipe(duplex=False)
313 self._test_recursion(wconn, [])
314
315 time.sleep(DELTA)
316 result = []
317 while rconn.poll():
318 result.append(rconn.recv())
319
320 expected = [
321 [],
322 [0],
323 [0, 0],
324 [0, 1],
325 [1],
326 [1, 0],
327 [1, 1]
328 ]
329 self.assertEqual(result, expected)
330
331#
332#
333#
334
335class _UpperCaser(multiprocessing.Process):
336
337 def __init__(self):
338 multiprocessing.Process.__init__(self)
339 self.child_conn, self.parent_conn = multiprocessing.Pipe()
340
341 def run(self):
342 self.parent_conn.close()
343 for s in iter(self.child_conn.recv, None):
344 self.child_conn.send(s.upper())
345 self.child_conn.close()
346
347 def submit(self, s):
348 assert type(s) is str
349 self.parent_conn.send(s)
350 return self.parent_conn.recv()
351
352 def stop(self):
353 self.parent_conn.send(None)
354 self.parent_conn.close()
355 self.child_conn.close()
356
357class _TestSubclassingProcess(BaseTestCase):
358
359 ALLOWED_TYPES = ('processes',)
360
361 def test_subclassing(self):
362 uppercaser = _UpperCaser()
Jesus Cea94f964f2011-09-09 20:26:57 +0200363 uppercaser.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000364 uppercaser.start()
365 self.assertEqual(uppercaser.submit('hello'), 'HELLO')
366 self.assertEqual(uppercaser.submit('world'), 'WORLD')
367 uppercaser.stop()
368 uppercaser.join()
369
Antoine Pitrou84a0fbf2012-01-27 10:52:37 +0100370 def test_stderr_flush(self):
371 # sys.stderr is flushed at process shutdown (issue #13812)
372 if self.TYPE == "threads":
373 return
374
375 testfn = test.support.TESTFN
376 self.addCleanup(test.support.unlink, testfn)
377 proc = self.Process(target=self._test_stderr_flush, args=(testfn,))
378 proc.start()
379 proc.join()
380 with open(testfn, 'r') as f:
381 err = f.read()
382 # The whole traceback was printed
383 self.assertIn("ZeroDivisionError", err)
384 self.assertIn("test_multiprocessing.py", err)
385 self.assertIn("1/0 # MARKER", err)
386
387 @classmethod
388 def _test_stderr_flush(cls, testfn):
389 sys.stderr = open(testfn, 'w')
390 1/0 # MARKER
391
392
Benjamin Petersone711caf2008-06-11 16:44:04 +0000393#
394#
395#
396
397def queue_empty(q):
398 if hasattr(q, 'empty'):
399 return q.empty()
400 else:
401 return q.qsize() == 0
402
403def queue_full(q, maxsize):
404 if hasattr(q, 'full'):
405 return q.full()
406 else:
407 return q.qsize() == maxsize
408
409
410class _TestQueue(BaseTestCase):
411
412
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +0000413 @classmethod
414 def _test_put(cls, queue, child_can_start, parent_can_continue):
Benjamin Petersone711caf2008-06-11 16:44:04 +0000415 child_can_start.wait()
416 for i in range(6):
417 queue.get()
418 parent_can_continue.set()
419
420 def test_put(self):
421 MAXSIZE = 6
422 queue = self.Queue(maxsize=MAXSIZE)
423 child_can_start = self.Event()
424 parent_can_continue = self.Event()
425
426 proc = self.Process(
427 target=self._test_put,
428 args=(queue, child_can_start, parent_can_continue)
429 )
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000430 proc.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000431 proc.start()
432
433 self.assertEqual(queue_empty(queue), True)
434 self.assertEqual(queue_full(queue, MAXSIZE), False)
435
436 queue.put(1)
437 queue.put(2, True)
438 queue.put(3, True, None)
439 queue.put(4, False)
440 queue.put(5, False, None)
441 queue.put_nowait(6)
442
443 # the values may be in buffer but not yet in pipe so sleep a bit
444 time.sleep(DELTA)
445
446 self.assertEqual(queue_empty(queue), False)
447 self.assertEqual(queue_full(queue, MAXSIZE), True)
448
449 put = TimingWrapper(queue.put)
450 put_nowait = TimingWrapper(queue.put_nowait)
451
452 self.assertRaises(pyqueue.Full, put, 7, False)
453 self.assertTimingAlmostEqual(put.elapsed, 0)
454
455 self.assertRaises(pyqueue.Full, put, 7, False, None)
456 self.assertTimingAlmostEqual(put.elapsed, 0)
457
458 self.assertRaises(pyqueue.Full, put_nowait, 7)
459 self.assertTimingAlmostEqual(put_nowait.elapsed, 0)
460
461 self.assertRaises(pyqueue.Full, put, 7, True, TIMEOUT1)
462 self.assertTimingAlmostEqual(put.elapsed, TIMEOUT1)
463
464 self.assertRaises(pyqueue.Full, put, 7, False, TIMEOUT2)
465 self.assertTimingAlmostEqual(put.elapsed, 0)
466
467 self.assertRaises(pyqueue.Full, put, 7, True, timeout=TIMEOUT3)
468 self.assertTimingAlmostEqual(put.elapsed, TIMEOUT3)
469
470 child_can_start.set()
471 parent_can_continue.wait()
472
473 self.assertEqual(queue_empty(queue), True)
474 self.assertEqual(queue_full(queue, MAXSIZE), False)
475
476 proc.join()
477
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +0000478 @classmethod
479 def _test_get(cls, queue, child_can_start, parent_can_continue):
Benjamin Petersone711caf2008-06-11 16:44:04 +0000480 child_can_start.wait()
Benjamin Petersonb29cbbc2008-06-16 20:57:14 +0000481 #queue.put(1)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000482 queue.put(2)
483 queue.put(3)
484 queue.put(4)
485 queue.put(5)
486 parent_can_continue.set()
487
488 def test_get(self):
489 queue = self.Queue()
490 child_can_start = self.Event()
491 parent_can_continue = self.Event()
492
493 proc = self.Process(
494 target=self._test_get,
495 args=(queue, child_can_start, parent_can_continue)
496 )
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000497 proc.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000498 proc.start()
499
500 self.assertEqual(queue_empty(queue), True)
501
502 child_can_start.set()
503 parent_can_continue.wait()
504
505 time.sleep(DELTA)
506 self.assertEqual(queue_empty(queue), False)
507
Benjamin Petersonb29cbbc2008-06-16 20:57:14 +0000508 # Hangs unexpectedly, remove for now
509 #self.assertEqual(queue.get(), 1)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000510 self.assertEqual(queue.get(True, None), 2)
511 self.assertEqual(queue.get(True), 3)
512 self.assertEqual(queue.get(timeout=1), 4)
513 self.assertEqual(queue.get_nowait(), 5)
514
515 self.assertEqual(queue_empty(queue), True)
516
517 get = TimingWrapper(queue.get)
518 get_nowait = TimingWrapper(queue.get_nowait)
519
520 self.assertRaises(pyqueue.Empty, get, False)
521 self.assertTimingAlmostEqual(get.elapsed, 0)
522
523 self.assertRaises(pyqueue.Empty, get, False, None)
524 self.assertTimingAlmostEqual(get.elapsed, 0)
525
526 self.assertRaises(pyqueue.Empty, get_nowait)
527 self.assertTimingAlmostEqual(get_nowait.elapsed, 0)
528
529 self.assertRaises(pyqueue.Empty, get, True, TIMEOUT1)
530 self.assertTimingAlmostEqual(get.elapsed, TIMEOUT1)
531
532 self.assertRaises(pyqueue.Empty, get, False, TIMEOUT2)
533 self.assertTimingAlmostEqual(get.elapsed, 0)
534
535 self.assertRaises(pyqueue.Empty, get, timeout=TIMEOUT3)
536 self.assertTimingAlmostEqual(get.elapsed, TIMEOUT3)
537
538 proc.join()
539
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +0000540 @classmethod
541 def _test_fork(cls, queue):
Benjamin Petersone711caf2008-06-11 16:44:04 +0000542 for i in range(10, 20):
543 queue.put(i)
544 # note that at this point the items may only be buffered, so the
545 # process cannot shutdown until the feeder thread has finished
546 # pushing items onto the pipe.
547
548 def test_fork(self):
549 # Old versions of Queue would fail to create a new feeder
550 # thread for a forked process if the original process had its
551 # own feeder thread. This test checks that this no longer
552 # happens.
553
554 queue = self.Queue()
555
556 # put items on queue so that main process starts a feeder thread
557 for i in range(10):
558 queue.put(i)
559
560 # wait to make sure thread starts before we fork a new process
561 time.sleep(DELTA)
562
563 # fork process
564 p = self.Process(target=self._test_fork, args=(queue,))
Jesus Cea94f964f2011-09-09 20:26:57 +0200565 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000566 p.start()
567
568 # check that all expected items are in the queue
569 for i in range(20):
570 self.assertEqual(queue.get(), i)
571 self.assertRaises(pyqueue.Empty, queue.get, False)
572
573 p.join()
574
575 def test_qsize(self):
576 q = self.Queue()
577 try:
578 self.assertEqual(q.qsize(), 0)
579 except NotImplementedError:
580 return
581 q.put(1)
582 self.assertEqual(q.qsize(), 1)
583 q.put(5)
584 self.assertEqual(q.qsize(), 2)
585 q.get()
586 self.assertEqual(q.qsize(), 1)
587 q.get()
588 self.assertEqual(q.qsize(), 0)
589
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +0000590 @classmethod
591 def _test_task_done(cls, q):
Benjamin Petersone711caf2008-06-11 16:44:04 +0000592 for obj in iter(q.get, None):
593 time.sleep(DELTA)
594 q.task_done()
595
596 def test_task_done(self):
597 queue = self.JoinableQueue()
598
599 if sys.version_info < (2, 5) and not hasattr(queue, 'task_done'):
Florent Xiclunafd1b0932010-03-28 00:25:02 +0000600 self.skipTest("requires 'queue.task_done()' method")
Benjamin Petersone711caf2008-06-11 16:44:04 +0000601
602 workers = [self.Process(target=self._test_task_done, args=(queue,))
603 for i in range(4)]
604
605 for p in workers:
Jesus Cea94f964f2011-09-09 20:26:57 +0200606 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000607 p.start()
608
609 for i in range(10):
610 queue.put(i)
611
612 queue.join()
613
614 for p in workers:
615 queue.put(None)
616
617 for p in workers:
618 p.join()
619
620#
621#
622#
623
624class _TestLock(BaseTestCase):
625
626 def test_lock(self):
627 lock = self.Lock()
628 self.assertEqual(lock.acquire(), True)
629 self.assertEqual(lock.acquire(False), False)
630 self.assertEqual(lock.release(), None)
631 self.assertRaises((ValueError, threading.ThreadError), lock.release)
632
633 def test_rlock(self):
634 lock = self.RLock()
635 self.assertEqual(lock.acquire(), True)
636 self.assertEqual(lock.acquire(), True)
637 self.assertEqual(lock.acquire(), True)
638 self.assertEqual(lock.release(), None)
639 self.assertEqual(lock.release(), None)
640 self.assertEqual(lock.release(), None)
641 self.assertRaises((AssertionError, RuntimeError), lock.release)
642
Jesse Nollerf8d00852009-03-31 03:25:07 +0000643 def test_lock_context(self):
644 with self.Lock():
645 pass
646
Benjamin Petersone711caf2008-06-11 16:44:04 +0000647
648class _TestSemaphore(BaseTestCase):
649
650 def _test_semaphore(self, sem):
651 self.assertReturnsIfImplemented(2, get_value, sem)
652 self.assertEqual(sem.acquire(), True)
653 self.assertReturnsIfImplemented(1, get_value, sem)
654 self.assertEqual(sem.acquire(), True)
655 self.assertReturnsIfImplemented(0, get_value, sem)
656 self.assertEqual(sem.acquire(False), False)
657 self.assertReturnsIfImplemented(0, get_value, sem)
658 self.assertEqual(sem.release(), None)
659 self.assertReturnsIfImplemented(1, get_value, sem)
660 self.assertEqual(sem.release(), None)
661 self.assertReturnsIfImplemented(2, get_value, sem)
662
663 def test_semaphore(self):
664 sem = self.Semaphore(2)
665 self._test_semaphore(sem)
666 self.assertEqual(sem.release(), None)
667 self.assertReturnsIfImplemented(3, get_value, sem)
668 self.assertEqual(sem.release(), None)
669 self.assertReturnsIfImplemented(4, get_value, sem)
670
671 def test_bounded_semaphore(self):
672 sem = self.BoundedSemaphore(2)
673 self._test_semaphore(sem)
674 # Currently fails on OS/X
675 #if HAVE_GETVALUE:
676 # self.assertRaises(ValueError, sem.release)
677 # self.assertReturnsIfImplemented(2, get_value, sem)
678
679 def test_timeout(self):
680 if self.TYPE != 'processes':
681 return
682
683 sem = self.Semaphore(0)
684 acquire = TimingWrapper(sem.acquire)
685
686 self.assertEqual(acquire(False), False)
687 self.assertTimingAlmostEqual(acquire.elapsed, 0.0)
688
689 self.assertEqual(acquire(False, None), False)
690 self.assertTimingAlmostEqual(acquire.elapsed, 0.0)
691
692 self.assertEqual(acquire(False, TIMEOUT1), False)
693 self.assertTimingAlmostEqual(acquire.elapsed, 0)
694
695 self.assertEqual(acquire(True, TIMEOUT2), False)
696 self.assertTimingAlmostEqual(acquire.elapsed, TIMEOUT2)
697
698 self.assertEqual(acquire(timeout=TIMEOUT3), False)
699 self.assertTimingAlmostEqual(acquire.elapsed, TIMEOUT3)
700
701
702class _TestCondition(BaseTestCase):
703
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +0000704 @classmethod
705 def f(cls, cond, sleeping, woken, timeout=None):
Benjamin Petersone711caf2008-06-11 16:44:04 +0000706 cond.acquire()
707 sleeping.release()
708 cond.wait(timeout)
709 woken.release()
710 cond.release()
711
712 def check_invariant(self, cond):
713 # this is only supposed to succeed when there are no sleepers
714 if self.TYPE == 'processes':
715 try:
716 sleepers = (cond._sleeping_count.get_value() -
717 cond._woken_count.get_value())
718 self.assertEqual(sleepers, 0)
719 self.assertEqual(cond._wait_semaphore.get_value(), 0)
720 except NotImplementedError:
721 pass
722
723 def test_notify(self):
724 cond = self.Condition()
725 sleeping = self.Semaphore(0)
726 woken = self.Semaphore(0)
727
728 p = self.Process(target=self.f, args=(cond, sleeping, woken))
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000729 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000730 p.start()
731
732 p = threading.Thread(target=self.f, args=(cond, sleeping, woken))
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000733 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000734 p.start()
735
736 # wait for both children to start sleeping
737 sleeping.acquire()
738 sleeping.acquire()
739
740 # check no process/thread has woken up
741 time.sleep(DELTA)
742 self.assertReturnsIfImplemented(0, get_value, woken)
743
744 # wake up one process/thread
745 cond.acquire()
746 cond.notify()
747 cond.release()
748
749 # check one process/thread has woken up
750 time.sleep(DELTA)
751 self.assertReturnsIfImplemented(1, get_value, woken)
752
753 # wake up another
754 cond.acquire()
755 cond.notify()
756 cond.release()
757
758 # check other has woken up
759 time.sleep(DELTA)
760 self.assertReturnsIfImplemented(2, get_value, woken)
761
762 # check state is not mucked up
763 self.check_invariant(cond)
764 p.join()
765
766 def test_notify_all(self):
767 cond = self.Condition()
768 sleeping = self.Semaphore(0)
769 woken = self.Semaphore(0)
770
771 # start some threads/processes which will timeout
772 for i in range(3):
773 p = self.Process(target=self.f,
774 args=(cond, sleeping, woken, TIMEOUT1))
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000775 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000776 p.start()
777
778 t = threading.Thread(target=self.f,
779 args=(cond, sleeping, woken, TIMEOUT1))
Benjamin Peterson72753702008-08-18 18:09:21 +0000780 t.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000781 t.start()
782
783 # wait for them all to sleep
784 for i in range(6):
785 sleeping.acquire()
786
787 # check they have all timed out
788 for i in range(6):
789 woken.acquire()
790 self.assertReturnsIfImplemented(0, get_value, woken)
791
792 # check state is not mucked up
793 self.check_invariant(cond)
794
795 # start some more threads/processes
796 for i in range(3):
797 p = self.Process(target=self.f, args=(cond, sleeping, woken))
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000798 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000799 p.start()
800
801 t = threading.Thread(target=self.f, args=(cond, sleeping, woken))
Benjamin Peterson72753702008-08-18 18:09:21 +0000802 t.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000803 t.start()
804
805 # wait for them to all sleep
806 for i in range(6):
807 sleeping.acquire()
808
809 # check no process/thread has woken up
810 time.sleep(DELTA)
811 self.assertReturnsIfImplemented(0, get_value, woken)
812
813 # wake them all up
814 cond.acquire()
815 cond.notify_all()
816 cond.release()
817
818 # check they have all woken
Antoine Pitrouf25a8de2011-04-16 21:02:01 +0200819 for i in range(10):
820 try:
821 if get_value(woken) == 6:
822 break
823 except NotImplementedError:
824 break
825 time.sleep(DELTA)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000826 self.assertReturnsIfImplemented(6, get_value, woken)
827
828 # check state is not mucked up
829 self.check_invariant(cond)
830
831 def test_timeout(self):
832 cond = self.Condition()
833 wait = TimingWrapper(cond.wait)
834 cond.acquire()
835 res = wait(TIMEOUT1)
836 cond.release()
Georg Brandl65ffae02010-10-28 09:24:56 +0000837 self.assertEqual(res, False)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000838 self.assertTimingAlmostEqual(wait.elapsed, TIMEOUT1)
839
840
841class _TestEvent(BaseTestCase):
842
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +0000843 @classmethod
844 def _test_event(cls, event):
Benjamin Petersone711caf2008-06-11 16:44:04 +0000845 time.sleep(TIMEOUT2)
846 event.set()
847
848 def test_event(self):
849 event = self.Event()
850 wait = TimingWrapper(event.wait)
851
Ezio Melotti13925002011-03-16 11:05:33 +0200852 # Removed temporarily, due to API shear, this does not
Benjamin Petersone711caf2008-06-11 16:44:04 +0000853 # work with threading._Event objects. is_set == isSet
Benjamin Peterson965ce872009-04-05 21:24:58 +0000854 self.assertEqual(event.is_set(), False)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000855
Benjamin Peterson965ce872009-04-05 21:24:58 +0000856 # Removed, threading.Event.wait() will return the value of the __flag
857 # instead of None. API Shear with the semaphore backed mp.Event
858 self.assertEqual(wait(0.0), False)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000859 self.assertTimingAlmostEqual(wait.elapsed, 0.0)
Benjamin Peterson965ce872009-04-05 21:24:58 +0000860 self.assertEqual(wait(TIMEOUT1), False)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000861 self.assertTimingAlmostEqual(wait.elapsed, TIMEOUT1)
862
863 event.set()
864
865 # See note above on the API differences
Benjamin Peterson965ce872009-04-05 21:24:58 +0000866 self.assertEqual(event.is_set(), True)
867 self.assertEqual(wait(), True)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000868 self.assertTimingAlmostEqual(wait.elapsed, 0.0)
Benjamin Peterson965ce872009-04-05 21:24:58 +0000869 self.assertEqual(wait(TIMEOUT1), True)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000870 self.assertTimingAlmostEqual(wait.elapsed, 0.0)
871 # self.assertEqual(event.is_set(), True)
872
873 event.clear()
874
875 #self.assertEqual(event.is_set(), False)
876
Jesus Cea94f964f2011-09-09 20:26:57 +0200877 p = self.Process(target=self._test_event, args=(event,))
878 p.daemon = True
879 p.start()
Benjamin Peterson965ce872009-04-05 21:24:58 +0000880 self.assertEqual(wait(), True)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000881
882#
883#
884#
885
886class _TestValue(BaseTestCase):
887
Florent Xiclunafd1b0932010-03-28 00:25:02 +0000888 ALLOWED_TYPES = ('processes',)
889
Benjamin Petersone711caf2008-06-11 16:44:04 +0000890 codes_values = [
891 ('i', 4343, 24234),
892 ('d', 3.625, -4.25),
893 ('h', -232, 234),
894 ('c', latin('x'), latin('y'))
895 ]
896
Antoine Pitrou7744e2a2010-11-22 16:26:21 +0000897 def setUp(self):
898 if not HAS_SHAREDCTYPES:
899 self.skipTest("requires multiprocessing.sharedctypes")
900
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +0000901 @classmethod
902 def _test(cls, values):
903 for sv, cv in zip(values, cls.codes_values):
Benjamin Petersone711caf2008-06-11 16:44:04 +0000904 sv.value = cv[2]
905
906
907 def test_value(self, raw=False):
Benjamin Petersone711caf2008-06-11 16:44:04 +0000908 if raw:
909 values = [self.RawValue(code, value)
910 for code, value, _ in self.codes_values]
911 else:
912 values = [self.Value(code, value)
913 for code, value, _ in self.codes_values]
914
915 for sv, cv in zip(values, self.codes_values):
916 self.assertEqual(sv.value, cv[1])
917
918 proc = self.Process(target=self._test, args=(values,))
Jesus Cea94f964f2011-09-09 20:26:57 +0200919 proc.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000920 proc.start()
921 proc.join()
922
923 for sv, cv in zip(values, self.codes_values):
924 self.assertEqual(sv.value, cv[2])
925
926 def test_rawvalue(self):
927 self.test_value(raw=True)
928
929 def test_getobj_getlock(self):
Benjamin Petersone711caf2008-06-11 16:44:04 +0000930 val1 = self.Value('i', 5)
931 lock1 = val1.get_lock()
932 obj1 = val1.get_obj()
933
934 val2 = self.Value('i', 5, lock=None)
935 lock2 = val2.get_lock()
936 obj2 = val2.get_obj()
937
938 lock = self.Lock()
939 val3 = self.Value('i', 5, lock=lock)
940 lock3 = val3.get_lock()
941 obj3 = val3.get_obj()
942 self.assertEqual(lock, lock3)
943
Jesse Nollerb0516a62009-01-18 03:11:38 +0000944 arr4 = self.Value('i', 5, lock=False)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000945 self.assertFalse(hasattr(arr4, 'get_lock'))
946 self.assertFalse(hasattr(arr4, 'get_obj'))
947
Jesse Nollerb0516a62009-01-18 03:11:38 +0000948 self.assertRaises(AttributeError, self.Value, 'i', 5, lock='navalue')
949
950 arr5 = self.RawValue('i', 5)
951 self.assertFalse(hasattr(arr5, 'get_lock'))
952 self.assertFalse(hasattr(arr5, 'get_obj'))
953
Benjamin Petersone711caf2008-06-11 16:44:04 +0000954
955class _TestArray(BaseTestCase):
956
Florent Xiclunafd1b0932010-03-28 00:25:02 +0000957 ALLOWED_TYPES = ('processes',)
958
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +0000959 @classmethod
960 def f(cls, seq):
Benjamin Petersone711caf2008-06-11 16:44:04 +0000961 for i in range(1, len(seq)):
962 seq[i] += seq[i-1]
963
Florent Xiclunafd1b0932010-03-28 00:25:02 +0000964 @unittest.skipIf(c_int is None, "requires _ctypes")
Benjamin Petersone711caf2008-06-11 16:44:04 +0000965 def test_array(self, raw=False):
Benjamin Petersone711caf2008-06-11 16:44:04 +0000966 seq = [680, 626, 934, 821, 150, 233, 548, 982, 714, 831]
967 if raw:
968 arr = self.RawArray('i', seq)
969 else:
970 arr = self.Array('i', seq)
971
972 self.assertEqual(len(arr), len(seq))
973 self.assertEqual(arr[3], seq[3])
974 self.assertEqual(list(arr[2:7]), list(seq[2:7]))
975
976 arr[4:8] = seq[4:8] = array.array('i', [1, 2, 3, 4])
977
978 self.assertEqual(list(arr[:]), seq)
979
980 self.f(seq)
981
982 p = self.Process(target=self.f, args=(arr,))
Jesus Cea94f964f2011-09-09 20:26:57 +0200983 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000984 p.start()
985 p.join()
986
987 self.assertEqual(list(arr[:]), seq)
988
Florent Xiclunafd1b0932010-03-28 00:25:02 +0000989 @unittest.skipIf(c_int is None, "requires _ctypes")
Mark Dickinson89461ef2011-03-26 10:19:03 +0000990 def test_array_from_size(self):
991 size = 10
992 # Test for zeroing (see issue #11675).
993 # The repetition below strengthens the test by increasing the chances
994 # of previously allocated non-zero memory being used for the new array
995 # on the 2nd and 3rd loops.
996 for _ in range(3):
997 arr = self.Array('i', size)
998 self.assertEqual(len(arr), size)
999 self.assertEqual(list(arr), [0] * size)
1000 arr[:] = range(10)
1001 self.assertEqual(list(arr), list(range(10)))
1002 del arr
1003
1004 @unittest.skipIf(c_int is None, "requires _ctypes")
Benjamin Petersone711caf2008-06-11 16:44:04 +00001005 def test_rawarray(self):
1006 self.test_array(raw=True)
1007
Florent Xiclunafd1b0932010-03-28 00:25:02 +00001008 @unittest.skipIf(c_int is None, "requires _ctypes")
Benjamin Petersone711caf2008-06-11 16:44:04 +00001009 def test_getobj_getlock_obj(self):
Benjamin Petersone711caf2008-06-11 16:44:04 +00001010 arr1 = self.Array('i', list(range(10)))
1011 lock1 = arr1.get_lock()
1012 obj1 = arr1.get_obj()
1013
1014 arr2 = self.Array('i', list(range(10)), lock=None)
1015 lock2 = arr2.get_lock()
1016 obj2 = arr2.get_obj()
1017
1018 lock = self.Lock()
1019 arr3 = self.Array('i', list(range(10)), lock=lock)
1020 lock3 = arr3.get_lock()
1021 obj3 = arr3.get_obj()
1022 self.assertEqual(lock, lock3)
1023
Jesse Nollerb0516a62009-01-18 03:11:38 +00001024 arr4 = self.Array('i', range(10), lock=False)
Benjamin Petersone711caf2008-06-11 16:44:04 +00001025 self.assertFalse(hasattr(arr4, 'get_lock'))
1026 self.assertFalse(hasattr(arr4, 'get_obj'))
Jesse Nollerb0516a62009-01-18 03:11:38 +00001027 self.assertRaises(AttributeError,
1028 self.Array, 'i', range(10), lock='notalock')
1029
1030 arr5 = self.RawArray('i', range(10))
1031 self.assertFalse(hasattr(arr5, 'get_lock'))
1032 self.assertFalse(hasattr(arr5, 'get_obj'))
Benjamin Petersone711caf2008-06-11 16:44:04 +00001033
1034#
1035#
1036#
1037
1038class _TestContainers(BaseTestCase):
1039
1040 ALLOWED_TYPES = ('manager',)
1041
1042 def test_list(self):
1043 a = self.list(list(range(10)))
1044 self.assertEqual(a[:], list(range(10)))
1045
1046 b = self.list()
1047 self.assertEqual(b[:], [])
1048
1049 b.extend(list(range(5)))
1050 self.assertEqual(b[:], list(range(5)))
1051
1052 self.assertEqual(b[2], 2)
1053 self.assertEqual(b[2:10], [2,3,4])
1054
1055 b *= 2
1056 self.assertEqual(b[:], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4])
1057
1058 self.assertEqual(b + [5, 6], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4, 5, 6])
1059
1060 self.assertEqual(a[:], list(range(10)))
1061
1062 d = [a, b]
1063 e = self.list(d)
1064 self.assertEqual(
1065 e[:],
1066 [[0, 1, 2, 3, 4, 5, 6, 7, 8, 9], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4]]
1067 )
1068
1069 f = self.list([a])
1070 a.append('hello')
1071 self.assertEqual(f[:], [[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 'hello']])
1072
1073 def test_dict(self):
1074 d = self.dict()
1075 indices = list(range(65, 70))
1076 for i in indices:
1077 d[i] = chr(i)
1078 self.assertEqual(d.copy(), dict((i, chr(i)) for i in indices))
1079 self.assertEqual(sorted(d.keys()), indices)
1080 self.assertEqual(sorted(d.values()), [chr(i) for i in indices])
1081 self.assertEqual(sorted(d.items()), [(i, chr(i)) for i in indices])
1082
1083 def test_namespace(self):
1084 n = self.Namespace()
1085 n.name = 'Bob'
1086 n.job = 'Builder'
1087 n._hidden = 'hidden'
1088 self.assertEqual((n.name, n.job), ('Bob', 'Builder'))
1089 del n.job
1090 self.assertEqual(str(n), "Namespace(name='Bob')")
1091 self.assertTrue(hasattr(n, 'name'))
1092 self.assertTrue(not hasattr(n, 'job'))
1093
1094#
1095#
1096#
1097
1098def sqr(x, wait=0.0):
1099 time.sleep(wait)
1100 return x*x
Ask Solem2afcbf22010-11-09 20:55:52 +00001101
Benjamin Petersone711caf2008-06-11 16:44:04 +00001102class _TestPool(BaseTestCase):
1103
1104 def test_apply(self):
1105 papply = self.pool.apply
1106 self.assertEqual(papply(sqr, (5,)), sqr(5))
1107 self.assertEqual(papply(sqr, (), {'x':3}), sqr(x=3))
1108
1109 def test_map(self):
1110 pmap = self.pool.map
1111 self.assertEqual(pmap(sqr, list(range(10))), list(map(sqr, list(range(10)))))
1112 self.assertEqual(pmap(sqr, list(range(100)), chunksize=20),
1113 list(map(sqr, list(range(100)))))
1114
Alexandre Vassalottie52e3782009-07-17 09:18:18 +00001115 def test_map_chunksize(self):
1116 try:
1117 self.pool.map_async(sqr, [], chunksize=1).get(timeout=TIMEOUT1)
1118 except multiprocessing.TimeoutError:
1119 self.fail("pool.map_async with chunksize stalled on null list")
1120
Benjamin Petersone711caf2008-06-11 16:44:04 +00001121 def test_async(self):
1122 res = self.pool.apply_async(sqr, (7, TIMEOUT1,))
1123 get = TimingWrapper(res.get)
1124 self.assertEqual(get(), 49)
1125 self.assertTimingAlmostEqual(get.elapsed, TIMEOUT1)
1126
1127 def test_async_timeout(self):
1128 res = self.pool.apply_async(sqr, (6, TIMEOUT2 + 0.2))
1129 get = TimingWrapper(res.get)
1130 self.assertRaises(multiprocessing.TimeoutError, get, timeout=TIMEOUT2)
1131 self.assertTimingAlmostEqual(get.elapsed, TIMEOUT2)
1132
1133 def test_imap(self):
1134 it = self.pool.imap(sqr, list(range(10)))
1135 self.assertEqual(list(it), list(map(sqr, list(range(10)))))
1136
1137 it = self.pool.imap(sqr, list(range(10)))
1138 for i in range(10):
1139 self.assertEqual(next(it), i*i)
1140 self.assertRaises(StopIteration, it.__next__)
1141
1142 it = self.pool.imap(sqr, list(range(1000)), chunksize=100)
1143 for i in range(1000):
1144 self.assertEqual(next(it), i*i)
1145 self.assertRaises(StopIteration, it.__next__)
1146
1147 def test_imap_unordered(self):
1148 it = self.pool.imap_unordered(sqr, list(range(1000)))
1149 self.assertEqual(sorted(it), list(map(sqr, list(range(1000)))))
1150
1151 it = self.pool.imap_unordered(sqr, list(range(1000)), chunksize=53)
1152 self.assertEqual(sorted(it), list(map(sqr, list(range(1000)))))
1153
1154 def test_make_pool(self):
Victor Stinner2fae27b2011-06-20 17:53:35 +02001155 self.assertRaises(ValueError, multiprocessing.Pool, -1)
1156 self.assertRaises(ValueError, multiprocessing.Pool, 0)
1157
Benjamin Petersone711caf2008-06-11 16:44:04 +00001158 p = multiprocessing.Pool(3)
1159 self.assertEqual(3, len(p._pool))
1160 p.close()
1161 p.join()
1162
1163 def test_terminate(self):
1164 if self.TYPE == 'manager':
1165 # On Unix a forked process increfs each shared object to
1166 # which its parent process held a reference. If the
1167 # forked process gets terminated then there is likely to
1168 # be a reference leak. So to prevent
1169 # _TestZZZNumberOfObjects from failing we skip this test
1170 # when using a manager.
1171 return
1172
1173 result = self.pool.map_async(
1174 time.sleep, [0.1 for i in range(10000)], chunksize=1
1175 )
1176 self.pool.terminate()
1177 join = TimingWrapper(self.pool.join)
1178 join()
Victor Stinner900189b2011-03-24 16:39:07 +01001179 self.assertLess(join.elapsed, 0.5)
Jesse Noller1f0b6582010-01-27 03:36:01 +00001180
Ask Solem2afcbf22010-11-09 20:55:52 +00001181def raising():
1182 raise KeyError("key")
Jesse Noller1f0b6582010-01-27 03:36:01 +00001183
Ask Solem2afcbf22010-11-09 20:55:52 +00001184def unpickleable_result():
1185 return lambda: 42
1186
1187class _TestPoolWorkerErrors(BaseTestCase):
Jesse Noller1f0b6582010-01-27 03:36:01 +00001188 ALLOWED_TYPES = ('processes', )
Ask Solem2afcbf22010-11-09 20:55:52 +00001189
1190 def test_async_error_callback(self):
1191 p = multiprocessing.Pool(2)
1192
1193 scratchpad = [None]
1194 def errback(exc):
1195 scratchpad[0] = exc
1196
1197 res = p.apply_async(raising, error_callback=errback)
1198 self.assertRaises(KeyError, res.get)
1199 self.assertTrue(scratchpad[0])
1200 self.assertIsInstance(scratchpad[0], KeyError)
1201
1202 p.close()
1203 p.join()
1204
1205 def test_unpickleable_result(self):
1206 from multiprocessing.pool import MaybeEncodingError
1207 p = multiprocessing.Pool(2)
1208
1209 # Make sure we don't lose pool processes because of encoding errors.
1210 for iteration in range(20):
1211
1212 scratchpad = [None]
1213 def errback(exc):
1214 scratchpad[0] = exc
1215
1216 res = p.apply_async(unpickleable_result, error_callback=errback)
1217 self.assertRaises(MaybeEncodingError, res.get)
1218 wrapped = scratchpad[0]
1219 self.assertTrue(wrapped)
1220 self.assertIsInstance(scratchpad[0], MaybeEncodingError)
1221 self.assertIsNotNone(wrapped.exc)
1222 self.assertIsNotNone(wrapped.value)
1223
1224 p.close()
1225 p.join()
1226
1227class _TestPoolWorkerLifetime(BaseTestCase):
1228 ALLOWED_TYPES = ('processes', )
1229
Jesse Noller1f0b6582010-01-27 03:36:01 +00001230 def test_pool_worker_lifetime(self):
1231 p = multiprocessing.Pool(3, maxtasksperchild=10)
1232 self.assertEqual(3, len(p._pool))
1233 origworkerpids = [w.pid for w in p._pool]
1234 # Run many tasks so each worker gets replaced (hopefully)
1235 results = []
1236 for i in range(100):
1237 results.append(p.apply_async(sqr, (i, )))
1238 # Fetch the results and verify we got the right answers,
1239 # also ensuring all the tasks have completed.
1240 for (j, res) in enumerate(results):
1241 self.assertEqual(res.get(), sqr(j))
1242 # Refill the pool
1243 p._repopulate_pool()
Florent Xiclunafb190f62010-03-04 16:10:10 +00001244 # Wait until all workers are alive
Antoine Pitrou540ab062011-04-06 22:51:17 +02001245 # (countdown * DELTA = 5 seconds max startup process time)
1246 countdown = 50
Florent Xiclunafb190f62010-03-04 16:10:10 +00001247 while countdown and not all(w.is_alive() for w in p._pool):
1248 countdown -= 1
1249 time.sleep(DELTA)
Jesse Noller1f0b6582010-01-27 03:36:01 +00001250 finalworkerpids = [w.pid for w in p._pool]
Florent Xiclunafb190f62010-03-04 16:10:10 +00001251 # All pids should be assigned. See issue #7805.
1252 self.assertNotIn(None, origworkerpids)
1253 self.assertNotIn(None, finalworkerpids)
1254 # Finally, check that the worker pids have changed
Jesse Noller1f0b6582010-01-27 03:36:01 +00001255 self.assertNotEqual(sorted(origworkerpids), sorted(finalworkerpids))
1256 p.close()
1257 p.join()
1258
Charles-François Natalif8859e12011-10-24 18:45:29 +02001259 def test_pool_worker_lifetime_early_close(self):
1260 # Issue #10332: closing a pool whose workers have limited lifetimes
1261 # before all the tasks completed would make join() hang.
1262 p = multiprocessing.Pool(3, maxtasksperchild=1)
1263 results = []
1264 for i in range(6):
1265 results.append(p.apply_async(sqr, (i, 0.3)))
1266 p.close()
1267 p.join()
1268 # check the results
1269 for (j, res) in enumerate(results):
1270 self.assertEqual(res.get(), sqr(j))
1271
1272
Benjamin Petersone711caf2008-06-11 16:44:04 +00001273#
1274# Test that manager has expected number of shared objects left
1275#
1276
1277class _TestZZZNumberOfObjects(BaseTestCase):
1278 # Because test cases are sorted alphabetically, this one will get
1279 # run after all the other tests for the manager. It tests that
1280 # there have been no "reference leaks" for the manager's shared
1281 # objects. Note the comment in _TestPool.test_terminate().
1282 ALLOWED_TYPES = ('manager',)
1283
1284 def test_number_of_objects(self):
1285 EXPECTED_NUMBER = 1 # the pool object is still alive
1286 multiprocessing.active_children() # discard dead process objs
1287 gc.collect() # do garbage collection
1288 refs = self.manager._number_of_objects()
Jesse Noller63b3a972009-01-21 02:15:48 +00001289 debug_info = self.manager._debug_info()
Benjamin Petersone711caf2008-06-11 16:44:04 +00001290 if refs != EXPECTED_NUMBER:
Georg Brandl3dbca812008-07-23 16:10:53 +00001291 print(self.manager._debug_info())
Jesse Noller63b3a972009-01-21 02:15:48 +00001292 print(debug_info)
Benjamin Petersone711caf2008-06-11 16:44:04 +00001293
1294 self.assertEqual(refs, EXPECTED_NUMBER)
1295
1296#
1297# Test of creating a customized manager class
1298#
1299
1300from multiprocessing.managers import BaseManager, BaseProxy, RemoteError
1301
1302class FooBar(object):
1303 def f(self):
1304 return 'f()'
1305 def g(self):
1306 raise ValueError
1307 def _h(self):
1308 return '_h()'
1309
1310def baz():
1311 for i in range(10):
1312 yield i*i
1313
1314class IteratorProxy(BaseProxy):
Florent Xiclunaaa171062010-08-14 15:56:42 +00001315 _exposed_ = ('__next__',)
Benjamin Petersone711caf2008-06-11 16:44:04 +00001316 def __iter__(self):
1317 return self
1318 def __next__(self):
Benjamin Petersone711caf2008-06-11 16:44:04 +00001319 return self._callmethod('__next__')
1320
1321class MyManager(BaseManager):
1322 pass
1323
1324MyManager.register('Foo', callable=FooBar)
1325MyManager.register('Bar', callable=FooBar, exposed=('f', '_h'))
1326MyManager.register('baz', callable=baz, proxytype=IteratorProxy)
1327
1328
1329class _TestMyManager(BaseTestCase):
1330
1331 ALLOWED_TYPES = ('manager',)
1332
1333 def test_mymanager(self):
1334 manager = MyManager()
1335 manager.start()
1336
1337 foo = manager.Foo()
1338 bar = manager.Bar()
1339 baz = manager.baz()
1340
1341 foo_methods = [name for name in ('f', 'g', '_h') if hasattr(foo, name)]
1342 bar_methods = [name for name in ('f', 'g', '_h') if hasattr(bar, name)]
1343
1344 self.assertEqual(foo_methods, ['f', 'g'])
1345 self.assertEqual(bar_methods, ['f', '_h'])
1346
1347 self.assertEqual(foo.f(), 'f()')
1348 self.assertRaises(ValueError, foo.g)
1349 self.assertEqual(foo._callmethod('f'), 'f()')
1350 self.assertRaises(RemoteError, foo._callmethod, '_h')
1351
1352 self.assertEqual(bar.f(), 'f()')
1353 self.assertEqual(bar._h(), '_h()')
1354 self.assertEqual(bar._callmethod('f'), 'f()')
1355 self.assertEqual(bar._callmethod('_h'), '_h()')
1356
1357 self.assertEqual(list(baz), [i*i for i in range(10)])
1358
1359 manager.shutdown()
1360
1361#
1362# Test of connecting to a remote server and using xmlrpclib for serialization
1363#
1364
1365_queue = pyqueue.Queue()
1366def get_queue():
1367 return _queue
1368
1369class QueueManager(BaseManager):
1370 '''manager class used by server process'''
1371QueueManager.register('get_queue', callable=get_queue)
1372
1373class QueueManager2(BaseManager):
1374 '''manager class which specifies the same interface as QueueManager'''
1375QueueManager2.register('get_queue')
1376
1377
1378SERIALIZER = 'xmlrpclib'
1379
1380class _TestRemoteManager(BaseTestCase):
1381
1382 ALLOWED_TYPES = ('manager',)
1383
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +00001384 @classmethod
1385 def _putter(cls, address, authkey):
Benjamin Petersone711caf2008-06-11 16:44:04 +00001386 manager = QueueManager2(
1387 address=address, authkey=authkey, serializer=SERIALIZER
1388 )
1389 manager.connect()
1390 queue = manager.get_queue()
1391 queue.put(('hello world', None, True, 2.25))
1392
1393 def test_remote(self):
1394 authkey = os.urandom(32)
1395
1396 manager = QueueManager(
1397 address=('localhost', 0), authkey=authkey, serializer=SERIALIZER
1398 )
1399 manager.start()
1400
1401 p = self.Process(target=self._putter, args=(manager.address, authkey))
Jesus Cea94f964f2011-09-09 20:26:57 +02001402 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +00001403 p.start()
1404
1405 manager2 = QueueManager2(
1406 address=manager.address, authkey=authkey, serializer=SERIALIZER
1407 )
1408 manager2.connect()
1409 queue = manager2.get_queue()
1410
1411 # Note that xmlrpclib will deserialize object as a list not a tuple
1412 self.assertEqual(queue.get(), ['hello world', None, True, 2.25])
1413
1414 # Because we are using xmlrpclib for serialization instead of
1415 # pickle this will cause a serialization error.
1416 self.assertRaises(Exception, queue.put, time.sleep)
1417
1418 # Make queue finalizer run before the server is stopped
1419 del queue
1420 manager.shutdown()
1421
Jesse Nollerc5d28a02009-03-30 16:37:36 +00001422class _TestManagerRestart(BaseTestCase):
1423
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +00001424 @classmethod
1425 def _putter(cls, address, authkey):
Jesse Nollerc5d28a02009-03-30 16:37:36 +00001426 manager = QueueManager(
1427 address=address, authkey=authkey, serializer=SERIALIZER)
1428 manager.connect()
1429 queue = manager.get_queue()
1430 queue.put('hello world')
1431
1432 def test_rapid_restart(self):
1433 authkey = os.urandom(32)
1434 manager = QueueManager(
Antoine Pitrou043bad02010-04-30 23:20:15 +00001435 address=('localhost', 0), authkey=authkey, serializer=SERIALIZER)
Brian Curtin50be1ca2010-11-01 05:10:44 +00001436 srvr = manager.get_server()
1437 addr = srvr.address
1438 # Close the connection.Listener socket which gets opened as a part
1439 # of manager.get_server(). It's not needed for the test.
1440 srvr.listener.close()
Jesse Nollerc5d28a02009-03-30 16:37:36 +00001441 manager.start()
1442
1443 p = self.Process(target=self._putter, args=(manager.address, authkey))
Jesus Cea94f964f2011-09-09 20:26:57 +02001444 p.daemon = True
Jesse Nollerc5d28a02009-03-30 16:37:36 +00001445 p.start()
1446 queue = manager.get_queue()
1447 self.assertEqual(queue.get(), 'hello world')
Jesse Noller35d1f002009-03-30 22:59:27 +00001448 del queue
Jesse Nollerc5d28a02009-03-30 16:37:36 +00001449 manager.shutdown()
1450 manager = QueueManager(
Antoine Pitrou043bad02010-04-30 23:20:15 +00001451 address=addr, authkey=authkey, serializer=SERIALIZER)
Antoine Pitrouc824e9a2011-04-05 18:11:33 +02001452 try:
1453 manager.start()
1454 except IOError as e:
1455 if e.errno != errno.EADDRINUSE:
1456 raise
1457 # Retry after some time, in case the old socket was lingering
1458 # (sporadic failure on buildbots)
1459 time.sleep(1.0)
1460 manager = QueueManager(
1461 address=addr, authkey=authkey, serializer=SERIALIZER)
Jesse Noller35d1f002009-03-30 22:59:27 +00001462 manager.shutdown()
Jesse Nollerc5d28a02009-03-30 16:37:36 +00001463
Benjamin Petersone711caf2008-06-11 16:44:04 +00001464#
1465#
1466#
1467
1468SENTINEL = latin('')
1469
1470class _TestConnection(BaseTestCase):
1471
1472 ALLOWED_TYPES = ('processes', 'threads')
1473
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +00001474 @classmethod
1475 def _echo(cls, conn):
Benjamin Petersone711caf2008-06-11 16:44:04 +00001476 for msg in iter(conn.recv_bytes, SENTINEL):
1477 conn.send_bytes(msg)
1478 conn.close()
1479
1480 def test_connection(self):
1481 conn, child_conn = self.Pipe()
1482
1483 p = self.Process(target=self._echo, args=(child_conn,))
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +00001484 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +00001485 p.start()
1486
1487 seq = [1, 2.25, None]
1488 msg = latin('hello world')
1489 longmsg = msg * 10
1490 arr = array.array('i', list(range(4)))
1491
1492 if self.TYPE == 'processes':
1493 self.assertEqual(type(conn.fileno()), int)
1494
1495 self.assertEqual(conn.send(seq), None)
1496 self.assertEqual(conn.recv(), seq)
1497
1498 self.assertEqual(conn.send_bytes(msg), None)
1499 self.assertEqual(conn.recv_bytes(), msg)
1500
1501 if self.TYPE == 'processes':
1502 buffer = array.array('i', [0]*10)
1503 expected = list(arr) + [0] * (10 - len(arr))
1504 self.assertEqual(conn.send_bytes(arr), None)
1505 self.assertEqual(conn.recv_bytes_into(buffer),
1506 len(arr) * buffer.itemsize)
1507 self.assertEqual(list(buffer), expected)
1508
1509 buffer = array.array('i', [0]*10)
1510 expected = [0] * 3 + list(arr) + [0] * (10 - 3 - len(arr))
1511 self.assertEqual(conn.send_bytes(arr), None)
1512 self.assertEqual(conn.recv_bytes_into(buffer, 3 * buffer.itemsize),
1513 len(arr) * buffer.itemsize)
1514 self.assertEqual(list(buffer), expected)
1515
1516 buffer = bytearray(latin(' ' * 40))
1517 self.assertEqual(conn.send_bytes(longmsg), None)
1518 try:
1519 res = conn.recv_bytes_into(buffer)
1520 except multiprocessing.BufferTooShort as e:
1521 self.assertEqual(e.args, (longmsg,))
1522 else:
1523 self.fail('expected BufferTooShort, got %s' % res)
1524
1525 poll = TimingWrapper(conn.poll)
1526
1527 self.assertEqual(poll(), False)
1528 self.assertTimingAlmostEqual(poll.elapsed, 0)
1529
1530 self.assertEqual(poll(TIMEOUT1), False)
1531 self.assertTimingAlmostEqual(poll.elapsed, TIMEOUT1)
1532
1533 conn.send(None)
1534
1535 self.assertEqual(poll(TIMEOUT1), True)
1536 self.assertTimingAlmostEqual(poll.elapsed, 0)
1537
1538 self.assertEqual(conn.recv(), None)
1539
1540 really_big_msg = latin('X') * (1024 * 1024 * 16) # 16Mb
1541 conn.send_bytes(really_big_msg)
1542 self.assertEqual(conn.recv_bytes(), really_big_msg)
1543
1544 conn.send_bytes(SENTINEL) # tell child to quit
1545 child_conn.close()
1546
1547 if self.TYPE == 'processes':
1548 self.assertEqual(conn.readable, True)
1549 self.assertEqual(conn.writable, True)
1550 self.assertRaises(EOFError, conn.recv)
1551 self.assertRaises(EOFError, conn.recv_bytes)
1552
1553 p.join()
1554
1555 def test_duplex_false(self):
1556 reader, writer = self.Pipe(duplex=False)
1557 self.assertEqual(writer.send(1), None)
1558 self.assertEqual(reader.recv(), 1)
1559 if self.TYPE == 'processes':
1560 self.assertEqual(reader.readable, True)
1561 self.assertEqual(reader.writable, False)
1562 self.assertEqual(writer.readable, False)
1563 self.assertEqual(writer.writable, True)
1564 self.assertRaises(IOError, reader.send, 2)
1565 self.assertRaises(IOError, writer.recv)
1566 self.assertRaises(IOError, writer.poll)
1567
1568 def test_spawn_close(self):
1569 # We test that a pipe connection can be closed by parent
1570 # process immediately after child is spawned. On Windows this
1571 # would have sometimes failed on old versions because
1572 # child_conn would be closed before the child got a chance to
1573 # duplicate it.
1574 conn, child_conn = self.Pipe()
1575
1576 p = self.Process(target=self._echo, args=(child_conn,))
Jesus Cea94f964f2011-09-09 20:26:57 +02001577 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +00001578 p.start()
1579 child_conn.close() # this might complete before child initializes
1580
1581 msg = latin('hello')
1582 conn.send_bytes(msg)
1583 self.assertEqual(conn.recv_bytes(), msg)
1584
1585 conn.send_bytes(SENTINEL)
1586 conn.close()
1587 p.join()
1588
1589 def test_sendbytes(self):
1590 if self.TYPE != 'processes':
1591 return
1592
1593 msg = latin('abcdefghijklmnopqrstuvwxyz')
1594 a, b = self.Pipe()
1595
1596 a.send_bytes(msg)
1597 self.assertEqual(b.recv_bytes(), msg)
1598
1599 a.send_bytes(msg, 5)
1600 self.assertEqual(b.recv_bytes(), msg[5:])
1601
1602 a.send_bytes(msg, 7, 8)
1603 self.assertEqual(b.recv_bytes(), msg[7:7+8])
1604
1605 a.send_bytes(msg, 26)
1606 self.assertEqual(b.recv_bytes(), latin(''))
1607
1608 a.send_bytes(msg, 26, 0)
1609 self.assertEqual(b.recv_bytes(), latin(''))
1610
1611 self.assertRaises(ValueError, a.send_bytes, msg, 27)
1612
1613 self.assertRaises(ValueError, a.send_bytes, msg, 22, 5)
1614
1615 self.assertRaises(ValueError, a.send_bytes, msg, 26, 1)
1616
1617 self.assertRaises(ValueError, a.send_bytes, msg, -1)
1618
1619 self.assertRaises(ValueError, a.send_bytes, msg, 4, -1)
1620
Antoine Pitroubcb39d42011-08-23 19:46:22 +02001621 @classmethod
1622 def _is_fd_assigned(cls, fd):
1623 try:
1624 os.fstat(fd)
1625 except OSError as e:
1626 if e.errno == errno.EBADF:
1627 return False
1628 raise
1629 else:
1630 return True
1631
1632 @classmethod
1633 def _writefd(cls, conn, data, create_dummy_fds=False):
1634 if create_dummy_fds:
1635 for i in range(0, 256):
1636 if not cls._is_fd_assigned(i):
1637 os.dup2(conn.fileno(), i)
1638 fd = reduction.recv_handle(conn)
1639 if msvcrt:
1640 fd = msvcrt.open_osfhandle(fd, os.O_WRONLY)
1641 os.write(fd, data)
1642 os.close(fd)
1643
Charles-François Natalie51c8da2011-09-21 18:48:21 +02001644 @unittest.skipUnless(HAS_REDUCTION, "test needs multiprocessing.reduction")
Antoine Pitroubcb39d42011-08-23 19:46:22 +02001645 def test_fd_transfer(self):
1646 if self.TYPE != 'processes':
1647 self.skipTest("only makes sense with processes")
1648 conn, child_conn = self.Pipe(duplex=True)
1649
1650 p = self.Process(target=self._writefd, args=(child_conn, b"foo"))
Jesus Cea94f964f2011-09-09 20:26:57 +02001651 p.daemon = True
Antoine Pitroubcb39d42011-08-23 19:46:22 +02001652 p.start()
Victor Stinnerd0b10a62011-09-21 01:10:29 +02001653 self.addCleanup(test.support.unlink, test.support.TESTFN)
Antoine Pitroubcb39d42011-08-23 19:46:22 +02001654 with open(test.support.TESTFN, "wb") as f:
1655 fd = f.fileno()
1656 if msvcrt:
1657 fd = msvcrt.get_osfhandle(fd)
1658 reduction.send_handle(conn, fd, p.pid)
1659 p.join()
1660 with open(test.support.TESTFN, "rb") as f:
1661 self.assertEqual(f.read(), b"foo")
1662
Charles-François Natalie51c8da2011-09-21 18:48:21 +02001663 @unittest.skipUnless(HAS_REDUCTION, "test needs multiprocessing.reduction")
Antoine Pitroubcb39d42011-08-23 19:46:22 +02001664 @unittest.skipIf(sys.platform == "win32",
1665 "test semantics don't make sense on Windows")
1666 @unittest.skipIf(MAXFD <= 256,
1667 "largest assignable fd number is too small")
1668 @unittest.skipUnless(hasattr(os, "dup2"),
1669 "test needs os.dup2()")
1670 def test_large_fd_transfer(self):
1671 # With fd > 256 (issue #11657)
1672 if self.TYPE != 'processes':
1673 self.skipTest("only makes sense with processes")
1674 conn, child_conn = self.Pipe(duplex=True)
1675
1676 p = self.Process(target=self._writefd, args=(child_conn, b"bar", True))
Jesus Cea94f964f2011-09-09 20:26:57 +02001677 p.daemon = True
Antoine Pitroubcb39d42011-08-23 19:46:22 +02001678 p.start()
Victor Stinnerd0b10a62011-09-21 01:10:29 +02001679 self.addCleanup(test.support.unlink, test.support.TESTFN)
Antoine Pitroubcb39d42011-08-23 19:46:22 +02001680 with open(test.support.TESTFN, "wb") as f:
1681 fd = f.fileno()
1682 for newfd in range(256, MAXFD):
1683 if not self._is_fd_assigned(newfd):
1684 break
1685 else:
1686 self.fail("could not find an unassigned large file descriptor")
1687 os.dup2(fd, newfd)
1688 try:
1689 reduction.send_handle(conn, newfd, p.pid)
1690 finally:
1691 os.close(newfd)
1692 p.join()
1693 with open(test.support.TESTFN, "rb") as f:
1694 self.assertEqual(f.read(), b"bar")
1695
Jesus Cea4507e642011-09-21 03:53:25 +02001696 @classmethod
1697 def _send_data_without_fd(self, conn):
1698 os.write(conn.fileno(), b"\0")
1699
Charles-François Natalie51c8da2011-09-21 18:48:21 +02001700 @unittest.skipUnless(HAS_REDUCTION, "test needs multiprocessing.reduction")
Jesus Cea4507e642011-09-21 03:53:25 +02001701 @unittest.skipIf(sys.platform == "win32", "doesn't make sense on Windows")
1702 def test_missing_fd_transfer(self):
1703 # Check that exception is raised when received data is not
1704 # accompanied by a file descriptor in ancillary data.
1705 if self.TYPE != 'processes':
1706 self.skipTest("only makes sense with processes")
1707 conn, child_conn = self.Pipe(duplex=True)
1708
1709 p = self.Process(target=self._send_data_without_fd, args=(child_conn,))
1710 p.daemon = True
1711 p.start()
1712 self.assertRaises(RuntimeError, reduction.recv_handle, conn)
1713 p.join()
Antoine Pitroubcb39d42011-08-23 19:46:22 +02001714
Benjamin Petersone711caf2008-06-11 16:44:04 +00001715class _TestListenerClient(BaseTestCase):
1716
1717 ALLOWED_TYPES = ('processes', 'threads')
1718
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +00001719 @classmethod
1720 def _test(cls, address):
1721 conn = cls.connection.Client(address)
Benjamin Petersone711caf2008-06-11 16:44:04 +00001722 conn.send('hello')
1723 conn.close()
1724
1725 def test_listener_client(self):
1726 for family in self.connection.families:
1727 l = self.connection.Listener(family=family)
1728 p = self.Process(target=self._test, args=(l.address,))
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +00001729 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +00001730 p.start()
1731 conn = l.accept()
1732 self.assertEqual(conn.recv(), 'hello')
1733 p.join()
1734 l.close()
Benjamin Petersone711caf2008-06-11 16:44:04 +00001735#
1736# Test of sending connection and socket objects between processes
1737#
Benjamin Petersonb29cbbc2008-06-16 20:57:14 +00001738"""
Benjamin Petersone711caf2008-06-11 16:44:04 +00001739class _TestPicklingConnections(BaseTestCase):
1740
1741 ALLOWED_TYPES = ('processes',)
1742
1743 def _listener(self, conn, families):
1744 for fam in families:
1745 l = self.connection.Listener(family=fam)
1746 conn.send(l.address)
1747 new_conn = l.accept()
1748 conn.send(new_conn)
1749
1750 if self.TYPE == 'processes':
1751 l = socket.socket()
1752 l.bind(('localhost', 0))
1753 conn.send(l.getsockname())
1754 l.listen(1)
1755 new_conn, addr = l.accept()
1756 conn.send(new_conn)
1757
1758 conn.recv()
1759
1760 def _remote(self, conn):
1761 for (address, msg) in iter(conn.recv, None):
1762 client = self.connection.Client(address)
1763 client.send(msg.upper())
1764 client.close()
1765
1766 if self.TYPE == 'processes':
1767 address, msg = conn.recv()
1768 client = socket.socket()
1769 client.connect(address)
1770 client.sendall(msg.upper())
1771 client.close()
1772
1773 conn.close()
1774
1775 def test_pickling(self):
1776 try:
1777 multiprocessing.allow_connection_pickling()
1778 except ImportError:
1779 return
1780
1781 families = self.connection.families
1782
1783 lconn, lconn0 = self.Pipe()
1784 lp = self.Process(target=self._listener, args=(lconn0, families))
Jesus Cea94f964f2011-09-09 20:26:57 +02001785 lp.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +00001786 lp.start()
1787 lconn0.close()
1788
1789 rconn, rconn0 = self.Pipe()
1790 rp = self.Process(target=self._remote, args=(rconn0,))
Jesus Cea94f964f2011-09-09 20:26:57 +02001791 rp.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +00001792 rp.start()
1793 rconn0.close()
1794
1795 for fam in families:
1796 msg = ('This connection uses family %s' % fam).encode('ascii')
1797 address = lconn.recv()
1798 rconn.send((address, msg))
1799 new_conn = lconn.recv()
1800 self.assertEqual(new_conn.recv(), msg.upper())
1801
1802 rconn.send(None)
1803
1804 if self.TYPE == 'processes':
1805 msg = latin('This connection uses a normal socket')
1806 address = lconn.recv()
1807 rconn.send((address, msg))
1808 if hasattr(socket, 'fromfd'):
1809 new_conn = lconn.recv()
1810 self.assertEqual(new_conn.recv(100), msg.upper())
1811 else:
1812 # XXX On Windows with Py2.6 need to backport fromfd()
1813 discard = lconn.recv_bytes()
1814
1815 lconn.send(None)
1816
1817 rconn.close()
1818 lconn.close()
1819
1820 lp.join()
1821 rp.join()
Benjamin Petersonb29cbbc2008-06-16 20:57:14 +00001822"""
Benjamin Petersone711caf2008-06-11 16:44:04 +00001823#
1824#
1825#
1826
1827class _TestHeap(BaseTestCase):
1828
1829 ALLOWED_TYPES = ('processes',)
1830
1831 def test_heap(self):
1832 iterations = 5000
1833 maxblocks = 50
1834 blocks = []
1835
1836 # create and destroy lots of blocks of different sizes
1837 for i in range(iterations):
1838 size = int(random.lognormvariate(0, 1) * 1000)
1839 b = multiprocessing.heap.BufferWrapper(size)
1840 blocks.append(b)
1841 if len(blocks) > maxblocks:
1842 i = random.randrange(maxblocks)
1843 del blocks[i]
1844
1845 # get the heap object
1846 heap = multiprocessing.heap.BufferWrapper._heap
1847
1848 # verify the state of the heap
1849 all = []
1850 occupied = 0
Charles-François Natali778db492011-07-02 14:35:49 +02001851 heap._lock.acquire()
1852 self.addCleanup(heap._lock.release)
Benjamin Petersone711caf2008-06-11 16:44:04 +00001853 for L in list(heap._len_to_seq.values()):
1854 for arena, start, stop in L:
1855 all.append((heap._arenas.index(arena), start, stop,
1856 stop-start, 'free'))
1857 for arena, start, stop in heap._allocated_blocks:
1858 all.append((heap._arenas.index(arena), start, stop,
1859 stop-start, 'occupied'))
1860 occupied += (stop-start)
1861
1862 all.sort()
1863
1864 for i in range(len(all)-1):
1865 (arena, start, stop) = all[i][:3]
1866 (narena, nstart, nstop) = all[i+1][:3]
1867 self.assertTrue((arena != narena and nstart == 0) or
1868 (stop == nstart))
1869
Charles-François Natali778db492011-07-02 14:35:49 +02001870 def test_free_from_gc(self):
1871 # Check that freeing of blocks by the garbage collector doesn't deadlock
1872 # (issue #12352).
1873 # Make sure the GC is enabled, and set lower collection thresholds to
1874 # make collections more frequent (and increase the probability of
1875 # deadlock).
1876 if not gc.isenabled():
1877 gc.enable()
1878 self.addCleanup(gc.disable)
1879 thresholds = gc.get_threshold()
1880 self.addCleanup(gc.set_threshold, *thresholds)
1881 gc.set_threshold(10)
1882
1883 # perform numerous block allocations, with cyclic references to make
1884 # sure objects are collected asynchronously by the gc
1885 for i in range(5000):
1886 a = multiprocessing.heap.BufferWrapper(1)
1887 b = multiprocessing.heap.BufferWrapper(1)
1888 # circular references
1889 a.buddy = b
1890 b.buddy = a
1891
Benjamin Petersone711caf2008-06-11 16:44:04 +00001892#
1893#
1894#
1895
Benjamin Petersone711caf2008-06-11 16:44:04 +00001896class _Foo(Structure):
1897 _fields_ = [
1898 ('x', c_int),
1899 ('y', c_double)
1900 ]
1901
1902class _TestSharedCTypes(BaseTestCase):
1903
1904 ALLOWED_TYPES = ('processes',)
1905
Antoine Pitrou7744e2a2010-11-22 16:26:21 +00001906 def setUp(self):
1907 if not HAS_SHAREDCTYPES:
1908 self.skipTest("requires multiprocessing.sharedctypes")
1909
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +00001910 @classmethod
1911 def _double(cls, x, y, foo, arr, string):
Benjamin Petersone711caf2008-06-11 16:44:04 +00001912 x.value *= 2
1913 y.value *= 2
1914 foo.x *= 2
1915 foo.y *= 2
1916 string.value *= 2
1917 for i in range(len(arr)):
1918 arr[i] *= 2
1919
1920 def test_sharedctypes(self, lock=False):
Benjamin Petersone711caf2008-06-11 16:44:04 +00001921 x = Value('i', 7, lock=lock)
Georg Brandl89fad142010-03-14 10:23:39 +00001922 y = Value(c_double, 1.0/3.0, lock=lock)
Benjamin Petersone711caf2008-06-11 16:44:04 +00001923 foo = Value(_Foo, 3, 2, lock=lock)
Georg Brandl89fad142010-03-14 10:23:39 +00001924 arr = self.Array('d', list(range(10)), lock=lock)
1925 string = self.Array('c', 20, lock=lock)
Brian Curtinafa88b52010-10-07 01:12:19 +00001926 string.value = latin('hello')
Benjamin Petersone711caf2008-06-11 16:44:04 +00001927
1928 p = self.Process(target=self._double, args=(x, y, foo, arr, string))
Jesus Cea94f964f2011-09-09 20:26:57 +02001929 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +00001930 p.start()
1931 p.join()
1932
1933 self.assertEqual(x.value, 14)
1934 self.assertAlmostEqual(y.value, 2.0/3.0)
1935 self.assertEqual(foo.x, 6)
1936 self.assertAlmostEqual(foo.y, 4.0)
1937 for i in range(10):
1938 self.assertAlmostEqual(arr[i], i*2)
1939 self.assertEqual(string.value, latin('hellohello'))
1940
1941 def test_synchronize(self):
1942 self.test_sharedctypes(lock=True)
1943
1944 def test_copy(self):
Benjamin Petersone711caf2008-06-11 16:44:04 +00001945 foo = _Foo(2, 5.0)
Brian Curtinafa88b52010-10-07 01:12:19 +00001946 bar = copy(foo)
Benjamin Petersone711caf2008-06-11 16:44:04 +00001947 foo.x = 0
1948 foo.y = 0
1949 self.assertEqual(bar.x, 2)
1950 self.assertAlmostEqual(bar.y, 5.0)
1951
1952#
1953#
1954#
1955
1956class _TestFinalize(BaseTestCase):
1957
1958 ALLOWED_TYPES = ('processes',)
1959
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +00001960 @classmethod
1961 def _test_finalize(cls, conn):
Benjamin Petersone711caf2008-06-11 16:44:04 +00001962 class Foo(object):
1963 pass
1964
1965 a = Foo()
1966 util.Finalize(a, conn.send, args=('a',))
1967 del a # triggers callback for a
1968
1969 b = Foo()
1970 close_b = util.Finalize(b, conn.send, args=('b',))
1971 close_b() # triggers callback for b
1972 close_b() # does nothing because callback has already been called
1973 del b # does nothing because callback has already been called
1974
1975 c = Foo()
1976 util.Finalize(c, conn.send, args=('c',))
1977
1978 d10 = Foo()
1979 util.Finalize(d10, conn.send, args=('d10',), exitpriority=1)
1980
1981 d01 = Foo()
1982 util.Finalize(d01, conn.send, args=('d01',), exitpriority=0)
1983 d02 = Foo()
1984 util.Finalize(d02, conn.send, args=('d02',), exitpriority=0)
1985 d03 = Foo()
1986 util.Finalize(d03, conn.send, args=('d03',), exitpriority=0)
1987
1988 util.Finalize(None, conn.send, args=('e',), exitpriority=-10)
1989
1990 util.Finalize(None, conn.send, args=('STOP',), exitpriority=-100)
1991
Ezio Melotti13925002011-03-16 11:05:33 +02001992 # call multiprocessing's cleanup function then exit process without
Benjamin Petersone711caf2008-06-11 16:44:04 +00001993 # garbage collecting locals
1994 util._exit_function()
1995 conn.close()
1996 os._exit(0)
1997
1998 def test_finalize(self):
1999 conn, child_conn = self.Pipe()
2000
2001 p = self.Process(target=self._test_finalize, args=(child_conn,))
Jesus Cea94f964f2011-09-09 20:26:57 +02002002 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +00002003 p.start()
2004 p.join()
2005
2006 result = [obj for obj in iter(conn.recv, 'STOP')]
2007 self.assertEqual(result, ['a', 'b', 'd10', 'd03', 'd02', 'd01', 'e'])
2008
2009#
2010# Test that from ... import * works for each module
2011#
2012
2013class _TestImportStar(BaseTestCase):
2014
2015 ALLOWED_TYPES = ('processes',)
2016
2017 def test_import(self):
Florent Xiclunafd1b0932010-03-28 00:25:02 +00002018 modules = [
Benjamin Petersone711caf2008-06-11 16:44:04 +00002019 'multiprocessing', 'multiprocessing.connection',
2020 'multiprocessing.heap', 'multiprocessing.managers',
2021 'multiprocessing.pool', 'multiprocessing.process',
Benjamin Petersone711caf2008-06-11 16:44:04 +00002022 'multiprocessing.synchronize', 'multiprocessing.util'
Florent Xiclunafd1b0932010-03-28 00:25:02 +00002023 ]
2024
Charles-François Natalie51c8da2011-09-21 18:48:21 +02002025 if HAS_REDUCTION:
2026 modules.append('multiprocessing.reduction')
2027
Florent Xiclunafd1b0932010-03-28 00:25:02 +00002028 if c_int is not None:
2029 # This module requires _ctypes
2030 modules.append('multiprocessing.sharedctypes')
Benjamin Petersone711caf2008-06-11 16:44:04 +00002031
2032 for name in modules:
2033 __import__(name)
2034 mod = sys.modules[name]
2035
2036 for attr in getattr(mod, '__all__', ()):
2037 self.assertTrue(
2038 hasattr(mod, attr),
2039 '%r does not have attribute %r' % (mod, attr)
2040 )
2041
2042#
2043# Quick test that logging works -- does not test logging output
2044#
2045
2046class _TestLogging(BaseTestCase):
2047
2048 ALLOWED_TYPES = ('processes',)
2049
2050 def test_enable_logging(self):
2051 logger = multiprocessing.get_logger()
2052 logger.setLevel(util.SUBWARNING)
2053 self.assertTrue(logger is not None)
2054 logger.debug('this will not be printed')
2055 logger.info('nor will this')
2056 logger.setLevel(LOG_LEVEL)
2057
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +00002058 @classmethod
2059 def _test_level(cls, conn):
Benjamin Petersone711caf2008-06-11 16:44:04 +00002060 logger = multiprocessing.get_logger()
2061 conn.send(logger.getEffectiveLevel())
2062
2063 def test_level(self):
2064 LEVEL1 = 32
2065 LEVEL2 = 37
2066
2067 logger = multiprocessing.get_logger()
2068 root_logger = logging.getLogger()
2069 root_level = root_logger.level
2070
2071 reader, writer = multiprocessing.Pipe(duplex=False)
2072
2073 logger.setLevel(LEVEL1)
Jesus Cea94f964f2011-09-09 20:26:57 +02002074 p = self.Process(target=self._test_level, args=(writer,))
2075 p.daemon = True
2076 p.start()
Benjamin Petersone711caf2008-06-11 16:44:04 +00002077 self.assertEqual(LEVEL1, reader.recv())
2078
2079 logger.setLevel(logging.NOTSET)
2080 root_logger.setLevel(LEVEL2)
Jesus Cea94f964f2011-09-09 20:26:57 +02002081 p = self.Process(target=self._test_level, args=(writer,))
2082 p.daemon = True
2083 p.start()
Benjamin Petersone711caf2008-06-11 16:44:04 +00002084 self.assertEqual(LEVEL2, reader.recv())
2085
2086 root_logger.setLevel(root_level)
2087 logger.setLevel(level=LOG_LEVEL)
2088
Jesse Nollerb9a49b72009-11-21 18:09:38 +00002089
Jesse Nollerf4ae35f2009-11-24 14:22:24 +00002090# class _TestLoggingProcessName(BaseTestCase):
2091#
2092# def handle(self, record):
2093# assert record.processName == multiprocessing.current_process().name
2094# self.__handled = True
2095#
2096# def test_logging(self):
2097# handler = logging.Handler()
2098# handler.handle = self.handle
2099# self.__handled = False
2100# # Bypass getLogger() and side-effects
2101# logger = logging.getLoggerClass()(
2102# 'multiprocessing.test.TestLoggingProcessName')
2103# logger.addHandler(handler)
2104# logger.propagate = False
2105#
2106# logger.warn('foo')
2107# assert self.__handled
Jesse Nollerb9a49b72009-11-21 18:09:38 +00002108
Benjamin Petersone711caf2008-06-11 16:44:04 +00002109#
Jesse Noller6214edd2009-01-19 16:23:53 +00002110# Test to verify handle verification, see issue 3321
2111#
2112
2113class TestInvalidHandle(unittest.TestCase):
2114
Florent Xiclunafd1b0932010-03-28 00:25:02 +00002115 @unittest.skipIf(WIN32, "skipped on Windows")
Jesse Noller6214edd2009-01-19 16:23:53 +00002116 def test_invalid_handles(self):
Jesse Noller6214edd2009-01-19 16:23:53 +00002117 conn = _multiprocessing.Connection(44977608)
2118 self.assertRaises(IOError, conn.poll)
2119 self.assertRaises(IOError, _multiprocessing.Connection, -1)
Florent Xiclunafd1b0932010-03-28 00:25:02 +00002120
Jesse Noller6214edd2009-01-19 16:23:53 +00002121#
Benjamin Petersone711caf2008-06-11 16:44:04 +00002122# Functions used to create test cases from the base ones in this module
2123#
2124
2125def get_attributes(Source, names):
2126 d = {}
2127 for name in names:
2128 obj = getattr(Source, name)
2129 if type(obj) == type(get_attributes):
2130 obj = staticmethod(obj)
2131 d[name] = obj
2132 return d
2133
2134def create_test_cases(Mixin, type):
2135 result = {}
2136 glob = globals()
Florent Xiclunafd1b0932010-03-28 00:25:02 +00002137 Type = type.capitalize()
Benjamin Petersone711caf2008-06-11 16:44:04 +00002138
2139 for name in list(glob.keys()):
2140 if name.startswith('_Test'):
2141 base = glob[name]
2142 if type in base.ALLOWED_TYPES:
2143 newname = 'With' + Type + name[1:]
2144 class Temp(base, unittest.TestCase, Mixin):
2145 pass
2146 result[newname] = Temp
2147 Temp.__name__ = newname
2148 Temp.__module__ = Mixin.__module__
2149 return result
2150
2151#
2152# Create test cases
2153#
2154
2155class ProcessesMixin(object):
2156 TYPE = 'processes'
2157 Process = multiprocessing.Process
2158 locals().update(get_attributes(multiprocessing, (
2159 'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
2160 'Condition', 'Event', 'Value', 'Array', 'RawValue',
2161 'RawArray', 'current_process', 'active_children', 'Pipe',
2162 'connection', 'JoinableQueue'
2163 )))
2164
2165testcases_processes = create_test_cases(ProcessesMixin, type='processes')
2166globals().update(testcases_processes)
2167
2168
2169class ManagerMixin(object):
2170 TYPE = 'manager'
2171 Process = multiprocessing.Process
2172 manager = object.__new__(multiprocessing.managers.SyncManager)
2173 locals().update(get_attributes(manager, (
2174 'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
2175 'Condition', 'Event', 'Value', 'Array', 'list', 'dict',
2176 'Namespace', 'JoinableQueue'
2177 )))
2178
2179testcases_manager = create_test_cases(ManagerMixin, type='manager')
2180globals().update(testcases_manager)
2181
2182
2183class ThreadsMixin(object):
2184 TYPE = 'threads'
2185 Process = multiprocessing.dummy.Process
2186 locals().update(get_attributes(multiprocessing.dummy, (
2187 'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
2188 'Condition', 'Event', 'Value', 'Array', 'current_process',
2189 'active_children', 'Pipe', 'connection', 'dict', 'list',
2190 'Namespace', 'JoinableQueue'
2191 )))
2192
2193testcases_threads = create_test_cases(ThreadsMixin, type='threads')
2194globals().update(testcases_threads)
2195
Neal Norwitz5d6415e2008-08-25 01:53:32 +00002196class OtherTest(unittest.TestCase):
2197 # TODO: add more tests for deliver/answer challenge.
2198 def test_deliver_challenge_auth_failure(self):
2199 class _FakeConnection(object):
2200 def recv_bytes(self, size):
Neal Norwitzec105ad2008-08-25 03:05:54 +00002201 return b'something bogus'
Neal Norwitz5d6415e2008-08-25 01:53:32 +00002202 def send_bytes(self, data):
2203 pass
2204 self.assertRaises(multiprocessing.AuthenticationError,
2205 multiprocessing.connection.deliver_challenge,
2206 _FakeConnection(), b'abc')
2207
2208 def test_answer_challenge_auth_failure(self):
2209 class _FakeConnection(object):
2210 def __init__(self):
2211 self.count = 0
2212 def recv_bytes(self, size):
2213 self.count += 1
2214 if self.count == 1:
2215 return multiprocessing.connection.CHALLENGE
2216 elif self.count == 2:
Neal Norwitzec105ad2008-08-25 03:05:54 +00002217 return b'something bogus'
2218 return b''
Neal Norwitz5d6415e2008-08-25 01:53:32 +00002219 def send_bytes(self, data):
2220 pass
2221 self.assertRaises(multiprocessing.AuthenticationError,
2222 multiprocessing.connection.answer_challenge,
2223 _FakeConnection(), b'abc')
2224
Benjamin Petersonf47ed4a2009-04-11 20:45:40 +00002225#
2226# Test Manager.start()/Pool.__init__() initializer feature - see issue 5585
2227#
2228
2229def initializer(ns):
2230 ns.test += 1
2231
2232class TestInitializers(unittest.TestCase):
2233 def setUp(self):
2234 self.mgr = multiprocessing.Manager()
2235 self.ns = self.mgr.Namespace()
2236 self.ns.test = 0
2237
2238 def tearDown(self):
2239 self.mgr.shutdown()
2240
2241 def test_manager_initializer(self):
2242 m = multiprocessing.managers.SyncManager()
2243 self.assertRaises(TypeError, m.start, 1)
2244 m.start(initializer, (self.ns,))
2245 self.assertEqual(self.ns.test, 1)
2246 m.shutdown()
2247
2248 def test_pool_initializer(self):
2249 self.assertRaises(TypeError, multiprocessing.Pool, initializer=1)
2250 p = multiprocessing.Pool(1, initializer, (self.ns,))
2251 p.close()
2252 p.join()
2253 self.assertEqual(self.ns.test, 1)
2254
Alexandre Vassalottic57a84f2009-07-17 12:07:01 +00002255#
2256# Issue 5155, 5313, 5331: Test process in processes
2257# Verifies os.close(sys.stdin.fileno) vs. sys.stdin.close() behavior
2258#
2259
2260def _ThisSubProcess(q):
2261 try:
2262 item = q.get(block=False)
2263 except pyqueue.Empty:
2264 pass
2265
2266def _TestProcess(q):
2267 queue = multiprocessing.Queue()
2268 subProc = multiprocessing.Process(target=_ThisSubProcess, args=(queue,))
Jesus Cea94f964f2011-09-09 20:26:57 +02002269 subProc.daemon = True
Alexandre Vassalottic57a84f2009-07-17 12:07:01 +00002270 subProc.start()
2271 subProc.join()
2272
2273def _afunc(x):
2274 return x*x
2275
2276def pool_in_process():
2277 pool = multiprocessing.Pool(processes=4)
2278 x = pool.map(_afunc, [1, 2, 3, 4, 5, 6, 7])
2279
2280class _file_like(object):
2281 def __init__(self, delegate):
2282 self._delegate = delegate
2283 self._pid = None
2284
2285 @property
2286 def cache(self):
2287 pid = os.getpid()
2288 # There are no race conditions since fork keeps only the running thread
2289 if pid != self._pid:
2290 self._pid = pid
2291 self._cache = []
2292 return self._cache
2293
2294 def write(self, data):
2295 self.cache.append(data)
2296
2297 def flush(self):
2298 self._delegate.write(''.join(self.cache))
2299 self._cache = []
2300
2301class TestStdinBadfiledescriptor(unittest.TestCase):
2302
2303 def test_queue_in_process(self):
2304 queue = multiprocessing.Queue()
2305 proc = multiprocessing.Process(target=_TestProcess, args=(queue,))
2306 proc.start()
2307 proc.join()
2308
2309 def test_pool_in_process(self):
2310 p = multiprocessing.Process(target=pool_in_process)
2311 p.start()
2312 p.join()
2313
2314 def test_flushing(self):
2315 sio = io.StringIO()
2316 flike = _file_like(sio)
2317 flike.write('foo')
2318 proc = multiprocessing.Process(target=lambda: flike.flush())
2319 flike.flush()
2320 assert sio.getvalue() == 'foo'
2321
2322testcases_other = [OtherTest, TestInvalidHandle, TestInitializers,
2323 TestStdinBadfiledescriptor]
Neal Norwitz5d6415e2008-08-25 01:53:32 +00002324
Benjamin Petersone711caf2008-06-11 16:44:04 +00002325#
2326#
2327#
2328
2329def test_main(run=None):
Jesse Nollerd00df3c2008-06-18 14:22:48 +00002330 if sys.platform.startswith("linux"):
2331 try:
2332 lock = multiprocessing.RLock()
2333 except OSError:
Benjamin Petersone549ead2009-03-28 21:42:05 +00002334 raise unittest.SkipTest("OSError raises on RLock creation, see issue 3111!")
Benjamin Peterson3c0dd062008-06-17 22:43:48 +00002335
Charles-François Natali3be00952011-11-22 18:36:39 +01002336 check_enough_semaphores()
2337
Benjamin Petersone711caf2008-06-11 16:44:04 +00002338 if run is None:
2339 from test.support import run_unittest as run
2340
2341 util.get_temp_dir() # creates temp directory for use by all processes
2342
2343 multiprocessing.get_logger().setLevel(LOG_LEVEL)
2344
Benjamin Peterson41181742008-07-02 20:22:54 +00002345 ProcessesMixin.pool = multiprocessing.Pool(4)
2346 ThreadsMixin.pool = multiprocessing.dummy.Pool(4)
2347 ManagerMixin.manager.__init__()
2348 ManagerMixin.manager.start()
2349 ManagerMixin.pool = ManagerMixin.manager.Pool(4)
Benjamin Petersone711caf2008-06-11 16:44:04 +00002350
2351 testcases = (
Benjamin Peterson41181742008-07-02 20:22:54 +00002352 sorted(testcases_processes.values(), key=lambda tc:tc.__name__) +
2353 sorted(testcases_threads.values(), key=lambda tc:tc.__name__) +
Neal Norwitz5d6415e2008-08-25 01:53:32 +00002354 sorted(testcases_manager.values(), key=lambda tc:tc.__name__) +
2355 testcases_other
Benjamin Petersone711caf2008-06-11 16:44:04 +00002356 )
2357
2358 loadTestsFromTestCase = unittest.defaultTestLoader.loadTestsFromTestCase
2359 suite = unittest.TestSuite(loadTestsFromTestCase(tc) for tc in testcases)
2360 run(suite)
2361
Benjamin Peterson41181742008-07-02 20:22:54 +00002362 ThreadsMixin.pool.terminate()
2363 ProcessesMixin.pool.terminate()
2364 ManagerMixin.pool.terminate()
2365 ManagerMixin.manager.shutdown()
Benjamin Petersone711caf2008-06-11 16:44:04 +00002366
Benjamin Peterson41181742008-07-02 20:22:54 +00002367 del ProcessesMixin.pool, ThreadsMixin.pool, ManagerMixin.pool
Benjamin Petersone711caf2008-06-11 16:44:04 +00002368
2369def main():
2370 test_main(unittest.TextTestRunner(verbosity=2).run)
2371
2372if __name__ == '__main__':
2373 main()