blob: de894f3087b254bcf6405d75bb4303e4216ba888 [file] [log] [blame]
Benjamin Peterson90f5ba52010-03-11 22:53:45 +00001#!/usr/bin/env python3
Georg Brandl86b2fb92008-07-16 03:43:04 +00002
Benjamin Petersone711caf2008-06-11 16:44:04 +00003#
4# Unit tests for the multiprocessing package
5#
6
7import unittest
Benjamin Petersone711caf2008-06-11 16:44:04 +00008import queue as pyqueue
9import time
Alexandre Vassalottic57a84f2009-07-17 12:07:01 +000010import io
Benjamin Petersone711caf2008-06-11 16:44:04 +000011import sys
12import os
13import gc
Antoine Pitrouc824e9a2011-04-05 18:11:33 +020014import errno
Benjamin Petersone711caf2008-06-11 16:44:04 +000015import signal
16import array
Benjamin Petersone711caf2008-06-11 16:44:04 +000017import socket
18import random
19import logging
R. David Murraya21e4ca2009-03-31 23:16:50 +000020import test.support
Benjamin Petersone711caf2008-06-11 16:44:04 +000021
Benjamin Petersone5384b02008-10-04 22:00:42 +000022
R. David Murraya21e4ca2009-03-31 23:16:50 +000023# Skip tests if _multiprocessing wasn't built.
24_multiprocessing = test.support.import_module('_multiprocessing')
25# Skip tests if sem_open implementation is broken.
26test.support.import_module('multiprocessing.synchronize')
Victor Stinner45df8202010-04-28 22:31:17 +000027# import threading after _multiprocessing to raise a more revelant error
28# message: "No module named _multiprocessing". _multiprocessing is not compiled
29# without thread support.
30import threading
Benjamin Petersone5384b02008-10-04 22:00:42 +000031
Benjamin Petersone711caf2008-06-11 16:44:04 +000032import multiprocessing.dummy
33import multiprocessing.connection
34import multiprocessing.managers
35import multiprocessing.heap
Benjamin Petersone711caf2008-06-11 16:44:04 +000036import multiprocessing.pool
Benjamin Petersone711caf2008-06-11 16:44:04 +000037
Charles-François Natalie51c8da2011-09-21 18:48:21 +020038from multiprocessing import util
39
40try:
41 from multiprocessing import reduction
42 HAS_REDUCTION = True
43except ImportError:
44 HAS_REDUCTION = False
Benjamin Petersone711caf2008-06-11 16:44:04 +000045
Brian Curtinafa88b52010-10-07 01:12:19 +000046try:
47 from multiprocessing.sharedctypes import Value, copy
48 HAS_SHAREDCTYPES = True
49except ImportError:
50 HAS_SHAREDCTYPES = False
51
Antoine Pitroubcb39d42011-08-23 19:46:22 +020052try:
53 import msvcrt
54except ImportError:
55 msvcrt = None
56
Benjamin Petersone711caf2008-06-11 16:44:04 +000057#
58#
59#
60
Benjamin Peterson2bc91df2008-07-13 18:45:30 +000061def latin(s):
62 return s.encode('latin')
Benjamin Petersone711caf2008-06-11 16:44:04 +000063
Benjamin Petersone711caf2008-06-11 16:44:04 +000064#
65# Constants
66#
67
68LOG_LEVEL = util.SUBWARNING
Jesse Noller1f0b6582010-01-27 03:36:01 +000069#LOG_LEVEL = logging.DEBUG
Benjamin Petersone711caf2008-06-11 16:44:04 +000070
71DELTA = 0.1
72CHECK_TIMINGS = False # making true makes tests take a lot longer
73 # and can sometimes cause some non-serious
74 # failures because some calls block a bit
75 # longer than expected
76if CHECK_TIMINGS:
77 TIMEOUT1, TIMEOUT2, TIMEOUT3 = 0.82, 0.35, 1.4
78else:
79 TIMEOUT1, TIMEOUT2, TIMEOUT3 = 0.1, 0.1, 0.1
80
81HAVE_GETVALUE = not getattr(_multiprocessing,
82 'HAVE_BROKEN_SEM_GETVALUE', False)
83
Jesse Noller6214edd2009-01-19 16:23:53 +000084WIN32 = (sys.platform == "win32")
85
Antoine Pitroubcb39d42011-08-23 19:46:22 +020086try:
87 MAXFD = os.sysconf("SC_OPEN_MAX")
88except:
89 MAXFD = 256
90
Benjamin Petersone711caf2008-06-11 16:44:04 +000091#
Florent Xiclunafd1b0932010-03-28 00:25:02 +000092# Some tests require ctypes
93#
94
95try:
Florent Xiclunaaa171062010-08-14 15:56:42 +000096 from ctypes import Structure, c_int, c_double
Florent Xiclunafd1b0932010-03-28 00:25:02 +000097except ImportError:
98 Structure = object
99 c_int = c_double = None
100
Charles-François Natali3be00952011-11-22 18:36:39 +0100101
102def check_enough_semaphores():
103 """Check that the system supports enough semaphores to run the test."""
104 # minimum number of semaphores available according to POSIX
105 nsems_min = 256
106 try:
107 nsems = os.sysconf("SC_SEM_NSEMS_MAX")
108 except (AttributeError, ValueError):
109 # sysconf not available or setting not available
110 return
111 if nsems == -1 or nsems >= nsems_min:
112 return
113 raise unittest.SkipTest("The OS doesn't support enough semaphores "
114 "to run the test (required: %d)." % nsems_min)
115
116
Florent Xiclunafd1b0932010-03-28 00:25:02 +0000117#
Benjamin Petersone711caf2008-06-11 16:44:04 +0000118# Creates a wrapper for a function which records the time it takes to finish
119#
120
121class TimingWrapper(object):
122
123 def __init__(self, func):
124 self.func = func
125 self.elapsed = None
126
127 def __call__(self, *args, **kwds):
128 t = time.time()
129 try:
130 return self.func(*args, **kwds)
131 finally:
132 self.elapsed = time.time() - t
133
134#
135# Base class for test cases
136#
137
138class BaseTestCase(object):
139
140 ALLOWED_TYPES = ('processes', 'manager', 'threads')
141
142 def assertTimingAlmostEqual(self, a, b):
143 if CHECK_TIMINGS:
144 self.assertAlmostEqual(a, b, 1)
145
146 def assertReturnsIfImplemented(self, value, func, *args):
147 try:
148 res = func(*args)
149 except NotImplementedError:
150 pass
151 else:
152 return self.assertEqual(value, res)
153
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +0000154 # For the sanity of Windows users, rather than crashing or freezing in
155 # multiple ways.
156 def __reduce__(self, *args):
157 raise NotImplementedError("shouldn't try to pickle a test case")
158
159 __reduce_ex__ = __reduce__
160
Benjamin Petersone711caf2008-06-11 16:44:04 +0000161#
162# Return the value of a semaphore
163#
164
165def get_value(self):
166 try:
167 return self.get_value()
168 except AttributeError:
169 try:
170 return self._Semaphore__value
171 except AttributeError:
172 try:
173 return self._value
174 except AttributeError:
175 raise NotImplementedError
176
177#
178# Testcases
179#
180
181class _TestProcess(BaseTestCase):
182
183 ALLOWED_TYPES = ('processes', 'threads')
184
185 def test_current(self):
186 if self.TYPE == 'threads':
187 return
188
189 current = self.current_process()
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000190 authkey = current.authkey
Benjamin Petersone711caf2008-06-11 16:44:04 +0000191
192 self.assertTrue(current.is_alive())
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000193 self.assertTrue(not current.daemon)
Ezio Melottie9615932010-01-24 19:26:24 +0000194 self.assertIsInstance(authkey, bytes)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000195 self.assertTrue(len(authkey) > 0)
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000196 self.assertEqual(current.ident, os.getpid())
197 self.assertEqual(current.exitcode, None)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000198
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +0000199 @classmethod
200 def _test(cls, q, *args, **kwds):
201 current = cls.current_process()
Benjamin Petersone711caf2008-06-11 16:44:04 +0000202 q.put(args)
203 q.put(kwds)
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000204 q.put(current.name)
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +0000205 if cls.TYPE != 'threads':
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000206 q.put(bytes(current.authkey))
Benjamin Petersone711caf2008-06-11 16:44:04 +0000207 q.put(current.pid)
208
209 def test_process(self):
210 q = self.Queue(1)
211 e = self.Event()
212 args = (q, 1, 2)
213 kwargs = {'hello':23, 'bye':2.54}
214 name = 'SomeProcess'
215 p = self.Process(
216 target=self._test, args=args, kwargs=kwargs, name=name
217 )
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000218 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000219 current = self.current_process()
220
221 if self.TYPE != 'threads':
Ezio Melottib3aedd42010-11-20 19:04:17 +0000222 self.assertEqual(p.authkey, current.authkey)
223 self.assertEqual(p.is_alive(), False)
224 self.assertEqual(p.daemon, True)
Benjamin Peterson577473f2010-01-19 00:09:57 +0000225 self.assertNotIn(p, self.active_children())
Benjamin Petersone711caf2008-06-11 16:44:04 +0000226 self.assertTrue(type(self.active_children()) is list)
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000227 self.assertEqual(p.exitcode, None)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000228
229 p.start()
230
Ezio Melottib3aedd42010-11-20 19:04:17 +0000231 self.assertEqual(p.exitcode, None)
232 self.assertEqual(p.is_alive(), True)
Benjamin Peterson577473f2010-01-19 00:09:57 +0000233 self.assertIn(p, self.active_children())
Benjamin Petersone711caf2008-06-11 16:44:04 +0000234
Ezio Melottib3aedd42010-11-20 19:04:17 +0000235 self.assertEqual(q.get(), args[1:])
236 self.assertEqual(q.get(), kwargs)
237 self.assertEqual(q.get(), p.name)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000238 if self.TYPE != 'threads':
Ezio Melottib3aedd42010-11-20 19:04:17 +0000239 self.assertEqual(q.get(), current.authkey)
240 self.assertEqual(q.get(), p.pid)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000241
242 p.join()
243
Ezio Melottib3aedd42010-11-20 19:04:17 +0000244 self.assertEqual(p.exitcode, 0)
245 self.assertEqual(p.is_alive(), False)
Benjamin Peterson577473f2010-01-19 00:09:57 +0000246 self.assertNotIn(p, self.active_children())
Benjamin Petersone711caf2008-06-11 16:44:04 +0000247
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +0000248 @classmethod
249 def _test_terminate(cls):
Benjamin Petersone711caf2008-06-11 16:44:04 +0000250 time.sleep(1000)
251
252 def test_terminate(self):
253 if self.TYPE == 'threads':
254 return
255
256 p = self.Process(target=self._test_terminate)
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000257 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000258 p.start()
259
260 self.assertEqual(p.is_alive(), True)
Benjamin Peterson577473f2010-01-19 00:09:57 +0000261 self.assertIn(p, self.active_children())
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000262 self.assertEqual(p.exitcode, None)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000263
264 p.terminate()
265
266 join = TimingWrapper(p.join)
267 self.assertEqual(join(), None)
268 self.assertTimingAlmostEqual(join.elapsed, 0.0)
269
270 self.assertEqual(p.is_alive(), False)
Benjamin Peterson577473f2010-01-19 00:09:57 +0000271 self.assertNotIn(p, self.active_children())
Benjamin Petersone711caf2008-06-11 16:44:04 +0000272
273 p.join()
274
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000275 # XXX sometimes get p.exitcode == 0 on Windows ...
276 #self.assertEqual(p.exitcode, -signal.SIGTERM)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000277
278 def test_cpu_count(self):
279 try:
280 cpus = multiprocessing.cpu_count()
281 except NotImplementedError:
282 cpus = 1
283 self.assertTrue(type(cpus) is int)
284 self.assertTrue(cpus >= 1)
285
286 def test_active_children(self):
287 self.assertEqual(type(self.active_children()), list)
288
289 p = self.Process(target=time.sleep, args=(DELTA,))
Benjamin Peterson577473f2010-01-19 00:09:57 +0000290 self.assertNotIn(p, self.active_children())
Benjamin Petersone711caf2008-06-11 16:44:04 +0000291
Jesus Cea94f964f2011-09-09 20:26:57 +0200292 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000293 p.start()
Benjamin Peterson577473f2010-01-19 00:09:57 +0000294 self.assertIn(p, self.active_children())
Benjamin Petersone711caf2008-06-11 16:44:04 +0000295
296 p.join()
Benjamin Peterson577473f2010-01-19 00:09:57 +0000297 self.assertNotIn(p, self.active_children())
Benjamin Petersone711caf2008-06-11 16:44:04 +0000298
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +0000299 @classmethod
300 def _test_recursion(cls, wconn, id):
Benjamin Petersone711caf2008-06-11 16:44:04 +0000301 from multiprocessing import forking
302 wconn.send(id)
303 if len(id) < 2:
304 for i in range(2):
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +0000305 p = cls.Process(
306 target=cls._test_recursion, args=(wconn, id+[i])
Benjamin Petersone711caf2008-06-11 16:44:04 +0000307 )
308 p.start()
309 p.join()
310
311 def test_recursion(self):
312 rconn, wconn = self.Pipe(duplex=False)
313 self._test_recursion(wconn, [])
314
315 time.sleep(DELTA)
316 result = []
317 while rconn.poll():
318 result.append(rconn.recv())
319
320 expected = [
321 [],
322 [0],
323 [0, 0],
324 [0, 1],
325 [1],
326 [1, 0],
327 [1, 1]
328 ]
329 self.assertEqual(result, expected)
330
331#
332#
333#
334
335class _UpperCaser(multiprocessing.Process):
336
337 def __init__(self):
338 multiprocessing.Process.__init__(self)
339 self.child_conn, self.parent_conn = multiprocessing.Pipe()
340
341 def run(self):
342 self.parent_conn.close()
343 for s in iter(self.child_conn.recv, None):
344 self.child_conn.send(s.upper())
345 self.child_conn.close()
346
347 def submit(self, s):
348 assert type(s) is str
349 self.parent_conn.send(s)
350 return self.parent_conn.recv()
351
352 def stop(self):
353 self.parent_conn.send(None)
354 self.parent_conn.close()
355 self.child_conn.close()
356
357class _TestSubclassingProcess(BaseTestCase):
358
359 ALLOWED_TYPES = ('processes',)
360
361 def test_subclassing(self):
362 uppercaser = _UpperCaser()
Jesus Cea94f964f2011-09-09 20:26:57 +0200363 uppercaser.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000364 uppercaser.start()
365 self.assertEqual(uppercaser.submit('hello'), 'HELLO')
366 self.assertEqual(uppercaser.submit('world'), 'WORLD')
367 uppercaser.stop()
368 uppercaser.join()
369
370#
371#
372#
373
374def queue_empty(q):
375 if hasattr(q, 'empty'):
376 return q.empty()
377 else:
378 return q.qsize() == 0
379
380def queue_full(q, maxsize):
381 if hasattr(q, 'full'):
382 return q.full()
383 else:
384 return q.qsize() == maxsize
385
386
387class _TestQueue(BaseTestCase):
388
389
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +0000390 @classmethod
391 def _test_put(cls, queue, child_can_start, parent_can_continue):
Benjamin Petersone711caf2008-06-11 16:44:04 +0000392 child_can_start.wait()
393 for i in range(6):
394 queue.get()
395 parent_can_continue.set()
396
397 def test_put(self):
398 MAXSIZE = 6
399 queue = self.Queue(maxsize=MAXSIZE)
400 child_can_start = self.Event()
401 parent_can_continue = self.Event()
402
403 proc = self.Process(
404 target=self._test_put,
405 args=(queue, child_can_start, parent_can_continue)
406 )
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000407 proc.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000408 proc.start()
409
410 self.assertEqual(queue_empty(queue), True)
411 self.assertEqual(queue_full(queue, MAXSIZE), False)
412
413 queue.put(1)
414 queue.put(2, True)
415 queue.put(3, True, None)
416 queue.put(4, False)
417 queue.put(5, False, None)
418 queue.put_nowait(6)
419
420 # the values may be in buffer but not yet in pipe so sleep a bit
421 time.sleep(DELTA)
422
423 self.assertEqual(queue_empty(queue), False)
424 self.assertEqual(queue_full(queue, MAXSIZE), True)
425
426 put = TimingWrapper(queue.put)
427 put_nowait = TimingWrapper(queue.put_nowait)
428
429 self.assertRaises(pyqueue.Full, put, 7, False)
430 self.assertTimingAlmostEqual(put.elapsed, 0)
431
432 self.assertRaises(pyqueue.Full, put, 7, False, None)
433 self.assertTimingAlmostEqual(put.elapsed, 0)
434
435 self.assertRaises(pyqueue.Full, put_nowait, 7)
436 self.assertTimingAlmostEqual(put_nowait.elapsed, 0)
437
438 self.assertRaises(pyqueue.Full, put, 7, True, TIMEOUT1)
439 self.assertTimingAlmostEqual(put.elapsed, TIMEOUT1)
440
441 self.assertRaises(pyqueue.Full, put, 7, False, TIMEOUT2)
442 self.assertTimingAlmostEqual(put.elapsed, 0)
443
444 self.assertRaises(pyqueue.Full, put, 7, True, timeout=TIMEOUT3)
445 self.assertTimingAlmostEqual(put.elapsed, TIMEOUT3)
446
447 child_can_start.set()
448 parent_can_continue.wait()
449
450 self.assertEqual(queue_empty(queue), True)
451 self.assertEqual(queue_full(queue, MAXSIZE), False)
452
453 proc.join()
454
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +0000455 @classmethod
456 def _test_get(cls, queue, child_can_start, parent_can_continue):
Benjamin Petersone711caf2008-06-11 16:44:04 +0000457 child_can_start.wait()
Benjamin Petersonb29cbbc2008-06-16 20:57:14 +0000458 #queue.put(1)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000459 queue.put(2)
460 queue.put(3)
461 queue.put(4)
462 queue.put(5)
463 parent_can_continue.set()
464
465 def test_get(self):
466 queue = self.Queue()
467 child_can_start = self.Event()
468 parent_can_continue = self.Event()
469
470 proc = self.Process(
471 target=self._test_get,
472 args=(queue, child_can_start, parent_can_continue)
473 )
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000474 proc.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000475 proc.start()
476
477 self.assertEqual(queue_empty(queue), True)
478
479 child_can_start.set()
480 parent_can_continue.wait()
481
482 time.sleep(DELTA)
483 self.assertEqual(queue_empty(queue), False)
484
Benjamin Petersonb29cbbc2008-06-16 20:57:14 +0000485 # Hangs unexpectedly, remove for now
486 #self.assertEqual(queue.get(), 1)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000487 self.assertEqual(queue.get(True, None), 2)
488 self.assertEqual(queue.get(True), 3)
489 self.assertEqual(queue.get(timeout=1), 4)
490 self.assertEqual(queue.get_nowait(), 5)
491
492 self.assertEqual(queue_empty(queue), True)
493
494 get = TimingWrapper(queue.get)
495 get_nowait = TimingWrapper(queue.get_nowait)
496
497 self.assertRaises(pyqueue.Empty, get, False)
498 self.assertTimingAlmostEqual(get.elapsed, 0)
499
500 self.assertRaises(pyqueue.Empty, get, False, None)
501 self.assertTimingAlmostEqual(get.elapsed, 0)
502
503 self.assertRaises(pyqueue.Empty, get_nowait)
504 self.assertTimingAlmostEqual(get_nowait.elapsed, 0)
505
506 self.assertRaises(pyqueue.Empty, get, True, TIMEOUT1)
507 self.assertTimingAlmostEqual(get.elapsed, TIMEOUT1)
508
509 self.assertRaises(pyqueue.Empty, get, False, TIMEOUT2)
510 self.assertTimingAlmostEqual(get.elapsed, 0)
511
512 self.assertRaises(pyqueue.Empty, get, timeout=TIMEOUT3)
513 self.assertTimingAlmostEqual(get.elapsed, TIMEOUT3)
514
515 proc.join()
516
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +0000517 @classmethod
518 def _test_fork(cls, queue):
Benjamin Petersone711caf2008-06-11 16:44:04 +0000519 for i in range(10, 20):
520 queue.put(i)
521 # note that at this point the items may only be buffered, so the
522 # process cannot shutdown until the feeder thread has finished
523 # pushing items onto the pipe.
524
525 def test_fork(self):
526 # Old versions of Queue would fail to create a new feeder
527 # thread for a forked process if the original process had its
528 # own feeder thread. This test checks that this no longer
529 # happens.
530
531 queue = self.Queue()
532
533 # put items on queue so that main process starts a feeder thread
534 for i in range(10):
535 queue.put(i)
536
537 # wait to make sure thread starts before we fork a new process
538 time.sleep(DELTA)
539
540 # fork process
541 p = self.Process(target=self._test_fork, args=(queue,))
Jesus Cea94f964f2011-09-09 20:26:57 +0200542 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000543 p.start()
544
545 # check that all expected items are in the queue
546 for i in range(20):
547 self.assertEqual(queue.get(), i)
548 self.assertRaises(pyqueue.Empty, queue.get, False)
549
550 p.join()
551
552 def test_qsize(self):
553 q = self.Queue()
554 try:
555 self.assertEqual(q.qsize(), 0)
556 except NotImplementedError:
557 return
558 q.put(1)
559 self.assertEqual(q.qsize(), 1)
560 q.put(5)
561 self.assertEqual(q.qsize(), 2)
562 q.get()
563 self.assertEqual(q.qsize(), 1)
564 q.get()
565 self.assertEqual(q.qsize(), 0)
566
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +0000567 @classmethod
568 def _test_task_done(cls, q):
Benjamin Petersone711caf2008-06-11 16:44:04 +0000569 for obj in iter(q.get, None):
570 time.sleep(DELTA)
571 q.task_done()
572
573 def test_task_done(self):
574 queue = self.JoinableQueue()
575
576 if sys.version_info < (2, 5) and not hasattr(queue, 'task_done'):
Florent Xiclunafd1b0932010-03-28 00:25:02 +0000577 self.skipTest("requires 'queue.task_done()' method")
Benjamin Petersone711caf2008-06-11 16:44:04 +0000578
579 workers = [self.Process(target=self._test_task_done, args=(queue,))
580 for i in range(4)]
581
582 for p in workers:
Jesus Cea94f964f2011-09-09 20:26:57 +0200583 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000584 p.start()
585
586 for i in range(10):
587 queue.put(i)
588
589 queue.join()
590
591 for p in workers:
592 queue.put(None)
593
594 for p in workers:
595 p.join()
596
597#
598#
599#
600
601class _TestLock(BaseTestCase):
602
603 def test_lock(self):
604 lock = self.Lock()
605 self.assertEqual(lock.acquire(), True)
606 self.assertEqual(lock.acquire(False), False)
607 self.assertEqual(lock.release(), None)
608 self.assertRaises((ValueError, threading.ThreadError), lock.release)
609
610 def test_rlock(self):
611 lock = self.RLock()
612 self.assertEqual(lock.acquire(), True)
613 self.assertEqual(lock.acquire(), True)
614 self.assertEqual(lock.acquire(), True)
615 self.assertEqual(lock.release(), None)
616 self.assertEqual(lock.release(), None)
617 self.assertEqual(lock.release(), None)
618 self.assertRaises((AssertionError, RuntimeError), lock.release)
619
Jesse Nollerf8d00852009-03-31 03:25:07 +0000620 def test_lock_context(self):
621 with self.Lock():
622 pass
623
Benjamin Petersone711caf2008-06-11 16:44:04 +0000624
625class _TestSemaphore(BaseTestCase):
626
627 def _test_semaphore(self, sem):
628 self.assertReturnsIfImplemented(2, get_value, sem)
629 self.assertEqual(sem.acquire(), True)
630 self.assertReturnsIfImplemented(1, get_value, sem)
631 self.assertEqual(sem.acquire(), True)
632 self.assertReturnsIfImplemented(0, get_value, sem)
633 self.assertEqual(sem.acquire(False), False)
634 self.assertReturnsIfImplemented(0, get_value, sem)
635 self.assertEqual(sem.release(), None)
636 self.assertReturnsIfImplemented(1, get_value, sem)
637 self.assertEqual(sem.release(), None)
638 self.assertReturnsIfImplemented(2, get_value, sem)
639
640 def test_semaphore(self):
641 sem = self.Semaphore(2)
642 self._test_semaphore(sem)
643 self.assertEqual(sem.release(), None)
644 self.assertReturnsIfImplemented(3, get_value, sem)
645 self.assertEqual(sem.release(), None)
646 self.assertReturnsIfImplemented(4, get_value, sem)
647
648 def test_bounded_semaphore(self):
649 sem = self.BoundedSemaphore(2)
650 self._test_semaphore(sem)
651 # Currently fails on OS/X
652 #if HAVE_GETVALUE:
653 # self.assertRaises(ValueError, sem.release)
654 # self.assertReturnsIfImplemented(2, get_value, sem)
655
656 def test_timeout(self):
657 if self.TYPE != 'processes':
658 return
659
660 sem = self.Semaphore(0)
661 acquire = TimingWrapper(sem.acquire)
662
663 self.assertEqual(acquire(False), False)
664 self.assertTimingAlmostEqual(acquire.elapsed, 0.0)
665
666 self.assertEqual(acquire(False, None), False)
667 self.assertTimingAlmostEqual(acquire.elapsed, 0.0)
668
669 self.assertEqual(acquire(False, TIMEOUT1), False)
670 self.assertTimingAlmostEqual(acquire.elapsed, 0)
671
672 self.assertEqual(acquire(True, TIMEOUT2), False)
673 self.assertTimingAlmostEqual(acquire.elapsed, TIMEOUT2)
674
675 self.assertEqual(acquire(timeout=TIMEOUT3), False)
676 self.assertTimingAlmostEqual(acquire.elapsed, TIMEOUT3)
677
678
679class _TestCondition(BaseTestCase):
680
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +0000681 @classmethod
682 def f(cls, cond, sleeping, woken, timeout=None):
Benjamin Petersone711caf2008-06-11 16:44:04 +0000683 cond.acquire()
684 sleeping.release()
685 cond.wait(timeout)
686 woken.release()
687 cond.release()
688
689 def check_invariant(self, cond):
690 # this is only supposed to succeed when there are no sleepers
691 if self.TYPE == 'processes':
692 try:
693 sleepers = (cond._sleeping_count.get_value() -
694 cond._woken_count.get_value())
695 self.assertEqual(sleepers, 0)
696 self.assertEqual(cond._wait_semaphore.get_value(), 0)
697 except NotImplementedError:
698 pass
699
700 def test_notify(self):
701 cond = self.Condition()
702 sleeping = self.Semaphore(0)
703 woken = self.Semaphore(0)
704
705 p = self.Process(target=self.f, args=(cond, sleeping, woken))
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000706 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000707 p.start()
708
709 p = threading.Thread(target=self.f, args=(cond, sleeping, woken))
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000710 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000711 p.start()
712
713 # wait for both children to start sleeping
714 sleeping.acquire()
715 sleeping.acquire()
716
717 # check no process/thread has woken up
718 time.sleep(DELTA)
719 self.assertReturnsIfImplemented(0, get_value, woken)
720
721 # wake up one process/thread
722 cond.acquire()
723 cond.notify()
724 cond.release()
725
726 # check one process/thread has woken up
727 time.sleep(DELTA)
728 self.assertReturnsIfImplemented(1, get_value, woken)
729
730 # wake up another
731 cond.acquire()
732 cond.notify()
733 cond.release()
734
735 # check other has woken up
736 time.sleep(DELTA)
737 self.assertReturnsIfImplemented(2, get_value, woken)
738
739 # check state is not mucked up
740 self.check_invariant(cond)
741 p.join()
742
743 def test_notify_all(self):
744 cond = self.Condition()
745 sleeping = self.Semaphore(0)
746 woken = self.Semaphore(0)
747
748 # start some threads/processes which will timeout
749 for i in range(3):
750 p = self.Process(target=self.f,
751 args=(cond, sleeping, woken, TIMEOUT1))
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000752 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000753 p.start()
754
755 t = threading.Thread(target=self.f,
756 args=(cond, sleeping, woken, TIMEOUT1))
Benjamin Peterson72753702008-08-18 18:09:21 +0000757 t.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000758 t.start()
759
760 # wait for them all to sleep
761 for i in range(6):
762 sleeping.acquire()
763
764 # check they have all timed out
765 for i in range(6):
766 woken.acquire()
767 self.assertReturnsIfImplemented(0, get_value, woken)
768
769 # check state is not mucked up
770 self.check_invariant(cond)
771
772 # start some more threads/processes
773 for i in range(3):
774 p = self.Process(target=self.f, args=(cond, sleeping, woken))
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000775 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000776 p.start()
777
778 t = threading.Thread(target=self.f, args=(cond, sleeping, woken))
Benjamin Peterson72753702008-08-18 18:09:21 +0000779 t.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000780 t.start()
781
782 # wait for them to all sleep
783 for i in range(6):
784 sleeping.acquire()
785
786 # check no process/thread has woken up
787 time.sleep(DELTA)
788 self.assertReturnsIfImplemented(0, get_value, woken)
789
790 # wake them all up
791 cond.acquire()
792 cond.notify_all()
793 cond.release()
794
795 # check they have all woken
Antoine Pitrouf25a8de2011-04-16 21:02:01 +0200796 for i in range(10):
797 try:
798 if get_value(woken) == 6:
799 break
800 except NotImplementedError:
801 break
802 time.sleep(DELTA)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000803 self.assertReturnsIfImplemented(6, get_value, woken)
804
805 # check state is not mucked up
806 self.check_invariant(cond)
807
808 def test_timeout(self):
809 cond = self.Condition()
810 wait = TimingWrapper(cond.wait)
811 cond.acquire()
812 res = wait(TIMEOUT1)
813 cond.release()
Georg Brandl65ffae02010-10-28 09:24:56 +0000814 self.assertEqual(res, False)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000815 self.assertTimingAlmostEqual(wait.elapsed, TIMEOUT1)
816
817
818class _TestEvent(BaseTestCase):
819
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +0000820 @classmethod
821 def _test_event(cls, event):
Benjamin Petersone711caf2008-06-11 16:44:04 +0000822 time.sleep(TIMEOUT2)
823 event.set()
824
825 def test_event(self):
826 event = self.Event()
827 wait = TimingWrapper(event.wait)
828
Ezio Melotti13925002011-03-16 11:05:33 +0200829 # Removed temporarily, due to API shear, this does not
Benjamin Petersone711caf2008-06-11 16:44:04 +0000830 # work with threading._Event objects. is_set == isSet
Benjamin Peterson965ce872009-04-05 21:24:58 +0000831 self.assertEqual(event.is_set(), False)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000832
Benjamin Peterson965ce872009-04-05 21:24:58 +0000833 # Removed, threading.Event.wait() will return the value of the __flag
834 # instead of None. API Shear with the semaphore backed mp.Event
835 self.assertEqual(wait(0.0), False)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000836 self.assertTimingAlmostEqual(wait.elapsed, 0.0)
Benjamin Peterson965ce872009-04-05 21:24:58 +0000837 self.assertEqual(wait(TIMEOUT1), False)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000838 self.assertTimingAlmostEqual(wait.elapsed, TIMEOUT1)
839
840 event.set()
841
842 # See note above on the API differences
Benjamin Peterson965ce872009-04-05 21:24:58 +0000843 self.assertEqual(event.is_set(), True)
844 self.assertEqual(wait(), True)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000845 self.assertTimingAlmostEqual(wait.elapsed, 0.0)
Benjamin Peterson965ce872009-04-05 21:24:58 +0000846 self.assertEqual(wait(TIMEOUT1), True)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000847 self.assertTimingAlmostEqual(wait.elapsed, 0.0)
848 # self.assertEqual(event.is_set(), True)
849
850 event.clear()
851
852 #self.assertEqual(event.is_set(), False)
853
Jesus Cea94f964f2011-09-09 20:26:57 +0200854 p = self.Process(target=self._test_event, args=(event,))
855 p.daemon = True
856 p.start()
Benjamin Peterson965ce872009-04-05 21:24:58 +0000857 self.assertEqual(wait(), True)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000858
859#
860#
861#
862
863class _TestValue(BaseTestCase):
864
Florent Xiclunafd1b0932010-03-28 00:25:02 +0000865 ALLOWED_TYPES = ('processes',)
866
Benjamin Petersone711caf2008-06-11 16:44:04 +0000867 codes_values = [
868 ('i', 4343, 24234),
869 ('d', 3.625, -4.25),
870 ('h', -232, 234),
871 ('c', latin('x'), latin('y'))
872 ]
873
Antoine Pitrou7744e2a2010-11-22 16:26:21 +0000874 def setUp(self):
875 if not HAS_SHAREDCTYPES:
876 self.skipTest("requires multiprocessing.sharedctypes")
877
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +0000878 @classmethod
879 def _test(cls, values):
880 for sv, cv in zip(values, cls.codes_values):
Benjamin Petersone711caf2008-06-11 16:44:04 +0000881 sv.value = cv[2]
882
883
884 def test_value(self, raw=False):
Benjamin Petersone711caf2008-06-11 16:44:04 +0000885 if raw:
886 values = [self.RawValue(code, value)
887 for code, value, _ in self.codes_values]
888 else:
889 values = [self.Value(code, value)
890 for code, value, _ in self.codes_values]
891
892 for sv, cv in zip(values, self.codes_values):
893 self.assertEqual(sv.value, cv[1])
894
895 proc = self.Process(target=self._test, args=(values,))
Jesus Cea94f964f2011-09-09 20:26:57 +0200896 proc.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000897 proc.start()
898 proc.join()
899
900 for sv, cv in zip(values, self.codes_values):
901 self.assertEqual(sv.value, cv[2])
902
903 def test_rawvalue(self):
904 self.test_value(raw=True)
905
906 def test_getobj_getlock(self):
Benjamin Petersone711caf2008-06-11 16:44:04 +0000907 val1 = self.Value('i', 5)
908 lock1 = val1.get_lock()
909 obj1 = val1.get_obj()
910
911 val2 = self.Value('i', 5, lock=None)
912 lock2 = val2.get_lock()
913 obj2 = val2.get_obj()
914
915 lock = self.Lock()
916 val3 = self.Value('i', 5, lock=lock)
917 lock3 = val3.get_lock()
918 obj3 = val3.get_obj()
919 self.assertEqual(lock, lock3)
920
Jesse Nollerb0516a62009-01-18 03:11:38 +0000921 arr4 = self.Value('i', 5, lock=False)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000922 self.assertFalse(hasattr(arr4, 'get_lock'))
923 self.assertFalse(hasattr(arr4, 'get_obj'))
924
Jesse Nollerb0516a62009-01-18 03:11:38 +0000925 self.assertRaises(AttributeError, self.Value, 'i', 5, lock='navalue')
926
927 arr5 = self.RawValue('i', 5)
928 self.assertFalse(hasattr(arr5, 'get_lock'))
929 self.assertFalse(hasattr(arr5, 'get_obj'))
930
Benjamin Petersone711caf2008-06-11 16:44:04 +0000931
932class _TestArray(BaseTestCase):
933
Florent Xiclunafd1b0932010-03-28 00:25:02 +0000934 ALLOWED_TYPES = ('processes',)
935
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +0000936 @classmethod
937 def f(cls, seq):
Benjamin Petersone711caf2008-06-11 16:44:04 +0000938 for i in range(1, len(seq)):
939 seq[i] += seq[i-1]
940
Florent Xiclunafd1b0932010-03-28 00:25:02 +0000941 @unittest.skipIf(c_int is None, "requires _ctypes")
Benjamin Petersone711caf2008-06-11 16:44:04 +0000942 def test_array(self, raw=False):
Benjamin Petersone711caf2008-06-11 16:44:04 +0000943 seq = [680, 626, 934, 821, 150, 233, 548, 982, 714, 831]
944 if raw:
945 arr = self.RawArray('i', seq)
946 else:
947 arr = self.Array('i', seq)
948
949 self.assertEqual(len(arr), len(seq))
950 self.assertEqual(arr[3], seq[3])
951 self.assertEqual(list(arr[2:7]), list(seq[2:7]))
952
953 arr[4:8] = seq[4:8] = array.array('i', [1, 2, 3, 4])
954
955 self.assertEqual(list(arr[:]), seq)
956
957 self.f(seq)
958
959 p = self.Process(target=self.f, args=(arr,))
Jesus Cea94f964f2011-09-09 20:26:57 +0200960 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000961 p.start()
962 p.join()
963
964 self.assertEqual(list(arr[:]), seq)
965
Florent Xiclunafd1b0932010-03-28 00:25:02 +0000966 @unittest.skipIf(c_int is None, "requires _ctypes")
Mark Dickinson89461ef2011-03-26 10:19:03 +0000967 def test_array_from_size(self):
968 size = 10
969 # Test for zeroing (see issue #11675).
970 # The repetition below strengthens the test by increasing the chances
971 # of previously allocated non-zero memory being used for the new array
972 # on the 2nd and 3rd loops.
973 for _ in range(3):
974 arr = self.Array('i', size)
975 self.assertEqual(len(arr), size)
976 self.assertEqual(list(arr), [0] * size)
977 arr[:] = range(10)
978 self.assertEqual(list(arr), list(range(10)))
979 del arr
980
981 @unittest.skipIf(c_int is None, "requires _ctypes")
Benjamin Petersone711caf2008-06-11 16:44:04 +0000982 def test_rawarray(self):
983 self.test_array(raw=True)
984
Florent Xiclunafd1b0932010-03-28 00:25:02 +0000985 @unittest.skipIf(c_int is None, "requires _ctypes")
Benjamin Petersone711caf2008-06-11 16:44:04 +0000986 def test_getobj_getlock_obj(self):
Benjamin Petersone711caf2008-06-11 16:44:04 +0000987 arr1 = self.Array('i', list(range(10)))
988 lock1 = arr1.get_lock()
989 obj1 = arr1.get_obj()
990
991 arr2 = self.Array('i', list(range(10)), lock=None)
992 lock2 = arr2.get_lock()
993 obj2 = arr2.get_obj()
994
995 lock = self.Lock()
996 arr3 = self.Array('i', list(range(10)), lock=lock)
997 lock3 = arr3.get_lock()
998 obj3 = arr3.get_obj()
999 self.assertEqual(lock, lock3)
1000
Jesse Nollerb0516a62009-01-18 03:11:38 +00001001 arr4 = self.Array('i', range(10), lock=False)
Benjamin Petersone711caf2008-06-11 16:44:04 +00001002 self.assertFalse(hasattr(arr4, 'get_lock'))
1003 self.assertFalse(hasattr(arr4, 'get_obj'))
Jesse Nollerb0516a62009-01-18 03:11:38 +00001004 self.assertRaises(AttributeError,
1005 self.Array, 'i', range(10), lock='notalock')
1006
1007 arr5 = self.RawArray('i', range(10))
1008 self.assertFalse(hasattr(arr5, 'get_lock'))
1009 self.assertFalse(hasattr(arr5, 'get_obj'))
Benjamin Petersone711caf2008-06-11 16:44:04 +00001010
1011#
1012#
1013#
1014
1015class _TestContainers(BaseTestCase):
1016
1017 ALLOWED_TYPES = ('manager',)
1018
1019 def test_list(self):
1020 a = self.list(list(range(10)))
1021 self.assertEqual(a[:], list(range(10)))
1022
1023 b = self.list()
1024 self.assertEqual(b[:], [])
1025
1026 b.extend(list(range(5)))
1027 self.assertEqual(b[:], list(range(5)))
1028
1029 self.assertEqual(b[2], 2)
1030 self.assertEqual(b[2:10], [2,3,4])
1031
1032 b *= 2
1033 self.assertEqual(b[:], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4])
1034
1035 self.assertEqual(b + [5, 6], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4, 5, 6])
1036
1037 self.assertEqual(a[:], list(range(10)))
1038
1039 d = [a, b]
1040 e = self.list(d)
1041 self.assertEqual(
1042 e[:],
1043 [[0, 1, 2, 3, 4, 5, 6, 7, 8, 9], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4]]
1044 )
1045
1046 f = self.list([a])
1047 a.append('hello')
1048 self.assertEqual(f[:], [[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 'hello']])
1049
1050 def test_dict(self):
1051 d = self.dict()
1052 indices = list(range(65, 70))
1053 for i in indices:
1054 d[i] = chr(i)
1055 self.assertEqual(d.copy(), dict((i, chr(i)) for i in indices))
1056 self.assertEqual(sorted(d.keys()), indices)
1057 self.assertEqual(sorted(d.values()), [chr(i) for i in indices])
1058 self.assertEqual(sorted(d.items()), [(i, chr(i)) for i in indices])
1059
1060 def test_namespace(self):
1061 n = self.Namespace()
1062 n.name = 'Bob'
1063 n.job = 'Builder'
1064 n._hidden = 'hidden'
1065 self.assertEqual((n.name, n.job), ('Bob', 'Builder'))
1066 del n.job
1067 self.assertEqual(str(n), "Namespace(name='Bob')")
1068 self.assertTrue(hasattr(n, 'name'))
1069 self.assertTrue(not hasattr(n, 'job'))
1070
1071#
1072#
1073#
1074
1075def sqr(x, wait=0.0):
1076 time.sleep(wait)
1077 return x*x
Ask Solem2afcbf22010-11-09 20:55:52 +00001078
Benjamin Petersone711caf2008-06-11 16:44:04 +00001079class _TestPool(BaseTestCase):
1080
1081 def test_apply(self):
1082 papply = self.pool.apply
1083 self.assertEqual(papply(sqr, (5,)), sqr(5))
1084 self.assertEqual(papply(sqr, (), {'x':3}), sqr(x=3))
1085
1086 def test_map(self):
1087 pmap = self.pool.map
1088 self.assertEqual(pmap(sqr, list(range(10))), list(map(sqr, list(range(10)))))
1089 self.assertEqual(pmap(sqr, list(range(100)), chunksize=20),
1090 list(map(sqr, list(range(100)))))
1091
Alexandre Vassalottie52e3782009-07-17 09:18:18 +00001092 def test_map_chunksize(self):
1093 try:
1094 self.pool.map_async(sqr, [], chunksize=1).get(timeout=TIMEOUT1)
1095 except multiprocessing.TimeoutError:
1096 self.fail("pool.map_async with chunksize stalled on null list")
1097
Benjamin Petersone711caf2008-06-11 16:44:04 +00001098 def test_async(self):
1099 res = self.pool.apply_async(sqr, (7, TIMEOUT1,))
1100 get = TimingWrapper(res.get)
1101 self.assertEqual(get(), 49)
1102 self.assertTimingAlmostEqual(get.elapsed, TIMEOUT1)
1103
1104 def test_async_timeout(self):
1105 res = self.pool.apply_async(sqr, (6, TIMEOUT2 + 0.2))
1106 get = TimingWrapper(res.get)
1107 self.assertRaises(multiprocessing.TimeoutError, get, timeout=TIMEOUT2)
1108 self.assertTimingAlmostEqual(get.elapsed, TIMEOUT2)
1109
1110 def test_imap(self):
1111 it = self.pool.imap(sqr, list(range(10)))
1112 self.assertEqual(list(it), list(map(sqr, list(range(10)))))
1113
1114 it = self.pool.imap(sqr, list(range(10)))
1115 for i in range(10):
1116 self.assertEqual(next(it), i*i)
1117 self.assertRaises(StopIteration, it.__next__)
1118
1119 it = self.pool.imap(sqr, list(range(1000)), chunksize=100)
1120 for i in range(1000):
1121 self.assertEqual(next(it), i*i)
1122 self.assertRaises(StopIteration, it.__next__)
1123
1124 def test_imap_unordered(self):
1125 it = self.pool.imap_unordered(sqr, list(range(1000)))
1126 self.assertEqual(sorted(it), list(map(sqr, list(range(1000)))))
1127
1128 it = self.pool.imap_unordered(sqr, list(range(1000)), chunksize=53)
1129 self.assertEqual(sorted(it), list(map(sqr, list(range(1000)))))
1130
1131 def test_make_pool(self):
Victor Stinner2fae27b2011-06-20 17:53:35 +02001132 self.assertRaises(ValueError, multiprocessing.Pool, -1)
1133 self.assertRaises(ValueError, multiprocessing.Pool, 0)
1134
Benjamin Petersone711caf2008-06-11 16:44:04 +00001135 p = multiprocessing.Pool(3)
1136 self.assertEqual(3, len(p._pool))
1137 p.close()
1138 p.join()
1139
1140 def test_terminate(self):
1141 if self.TYPE == 'manager':
1142 # On Unix a forked process increfs each shared object to
1143 # which its parent process held a reference. If the
1144 # forked process gets terminated then there is likely to
1145 # be a reference leak. So to prevent
1146 # _TestZZZNumberOfObjects from failing we skip this test
1147 # when using a manager.
1148 return
1149
1150 result = self.pool.map_async(
1151 time.sleep, [0.1 for i in range(10000)], chunksize=1
1152 )
1153 self.pool.terminate()
1154 join = TimingWrapper(self.pool.join)
1155 join()
Victor Stinner900189b2011-03-24 16:39:07 +01001156 self.assertLess(join.elapsed, 0.5)
Jesse Noller1f0b6582010-01-27 03:36:01 +00001157
Ask Solem2afcbf22010-11-09 20:55:52 +00001158def raising():
1159 raise KeyError("key")
Jesse Noller1f0b6582010-01-27 03:36:01 +00001160
Ask Solem2afcbf22010-11-09 20:55:52 +00001161def unpickleable_result():
1162 return lambda: 42
1163
1164class _TestPoolWorkerErrors(BaseTestCase):
Jesse Noller1f0b6582010-01-27 03:36:01 +00001165 ALLOWED_TYPES = ('processes', )
Ask Solem2afcbf22010-11-09 20:55:52 +00001166
1167 def test_async_error_callback(self):
1168 p = multiprocessing.Pool(2)
1169
1170 scratchpad = [None]
1171 def errback(exc):
1172 scratchpad[0] = exc
1173
1174 res = p.apply_async(raising, error_callback=errback)
1175 self.assertRaises(KeyError, res.get)
1176 self.assertTrue(scratchpad[0])
1177 self.assertIsInstance(scratchpad[0], KeyError)
1178
1179 p.close()
1180 p.join()
1181
1182 def test_unpickleable_result(self):
1183 from multiprocessing.pool import MaybeEncodingError
1184 p = multiprocessing.Pool(2)
1185
1186 # Make sure we don't lose pool processes because of encoding errors.
1187 for iteration in range(20):
1188
1189 scratchpad = [None]
1190 def errback(exc):
1191 scratchpad[0] = exc
1192
1193 res = p.apply_async(unpickleable_result, error_callback=errback)
1194 self.assertRaises(MaybeEncodingError, res.get)
1195 wrapped = scratchpad[0]
1196 self.assertTrue(wrapped)
1197 self.assertIsInstance(scratchpad[0], MaybeEncodingError)
1198 self.assertIsNotNone(wrapped.exc)
1199 self.assertIsNotNone(wrapped.value)
1200
1201 p.close()
1202 p.join()
1203
1204class _TestPoolWorkerLifetime(BaseTestCase):
1205 ALLOWED_TYPES = ('processes', )
1206
Jesse Noller1f0b6582010-01-27 03:36:01 +00001207 def test_pool_worker_lifetime(self):
1208 p = multiprocessing.Pool(3, maxtasksperchild=10)
1209 self.assertEqual(3, len(p._pool))
1210 origworkerpids = [w.pid for w in p._pool]
1211 # Run many tasks so each worker gets replaced (hopefully)
1212 results = []
1213 for i in range(100):
1214 results.append(p.apply_async(sqr, (i, )))
1215 # Fetch the results and verify we got the right answers,
1216 # also ensuring all the tasks have completed.
1217 for (j, res) in enumerate(results):
1218 self.assertEqual(res.get(), sqr(j))
1219 # Refill the pool
1220 p._repopulate_pool()
Florent Xiclunafb190f62010-03-04 16:10:10 +00001221 # Wait until all workers are alive
Antoine Pitrou540ab062011-04-06 22:51:17 +02001222 # (countdown * DELTA = 5 seconds max startup process time)
1223 countdown = 50
Florent Xiclunafb190f62010-03-04 16:10:10 +00001224 while countdown and not all(w.is_alive() for w in p._pool):
1225 countdown -= 1
1226 time.sleep(DELTA)
Jesse Noller1f0b6582010-01-27 03:36:01 +00001227 finalworkerpids = [w.pid for w in p._pool]
Florent Xiclunafb190f62010-03-04 16:10:10 +00001228 # All pids should be assigned. See issue #7805.
1229 self.assertNotIn(None, origworkerpids)
1230 self.assertNotIn(None, finalworkerpids)
1231 # Finally, check that the worker pids have changed
Jesse Noller1f0b6582010-01-27 03:36:01 +00001232 self.assertNotEqual(sorted(origworkerpids), sorted(finalworkerpids))
1233 p.close()
1234 p.join()
1235
Charles-François Natalif8859e12011-10-24 18:45:29 +02001236 def test_pool_worker_lifetime_early_close(self):
1237 # Issue #10332: closing a pool whose workers have limited lifetimes
1238 # before all the tasks completed would make join() hang.
1239 p = multiprocessing.Pool(3, maxtasksperchild=1)
1240 results = []
1241 for i in range(6):
1242 results.append(p.apply_async(sqr, (i, 0.3)))
1243 p.close()
1244 p.join()
1245 # check the results
1246 for (j, res) in enumerate(results):
1247 self.assertEqual(res.get(), sqr(j))
1248
1249
Benjamin Petersone711caf2008-06-11 16:44:04 +00001250#
1251# Test that manager has expected number of shared objects left
1252#
1253
1254class _TestZZZNumberOfObjects(BaseTestCase):
1255 # Because test cases are sorted alphabetically, this one will get
1256 # run after all the other tests for the manager. It tests that
1257 # there have been no "reference leaks" for the manager's shared
1258 # objects. Note the comment in _TestPool.test_terminate().
1259 ALLOWED_TYPES = ('manager',)
1260
1261 def test_number_of_objects(self):
1262 EXPECTED_NUMBER = 1 # the pool object is still alive
1263 multiprocessing.active_children() # discard dead process objs
1264 gc.collect() # do garbage collection
1265 refs = self.manager._number_of_objects()
Jesse Noller63b3a972009-01-21 02:15:48 +00001266 debug_info = self.manager._debug_info()
Benjamin Petersone711caf2008-06-11 16:44:04 +00001267 if refs != EXPECTED_NUMBER:
Georg Brandl3dbca812008-07-23 16:10:53 +00001268 print(self.manager._debug_info())
Jesse Noller63b3a972009-01-21 02:15:48 +00001269 print(debug_info)
Benjamin Petersone711caf2008-06-11 16:44:04 +00001270
1271 self.assertEqual(refs, EXPECTED_NUMBER)
1272
1273#
1274# Test of creating a customized manager class
1275#
1276
1277from multiprocessing.managers import BaseManager, BaseProxy, RemoteError
1278
1279class FooBar(object):
1280 def f(self):
1281 return 'f()'
1282 def g(self):
1283 raise ValueError
1284 def _h(self):
1285 return '_h()'
1286
1287def baz():
1288 for i in range(10):
1289 yield i*i
1290
1291class IteratorProxy(BaseProxy):
Florent Xiclunaaa171062010-08-14 15:56:42 +00001292 _exposed_ = ('__next__',)
Benjamin Petersone711caf2008-06-11 16:44:04 +00001293 def __iter__(self):
1294 return self
1295 def __next__(self):
Benjamin Petersone711caf2008-06-11 16:44:04 +00001296 return self._callmethod('__next__')
1297
1298class MyManager(BaseManager):
1299 pass
1300
1301MyManager.register('Foo', callable=FooBar)
1302MyManager.register('Bar', callable=FooBar, exposed=('f', '_h'))
1303MyManager.register('baz', callable=baz, proxytype=IteratorProxy)
1304
1305
1306class _TestMyManager(BaseTestCase):
1307
1308 ALLOWED_TYPES = ('manager',)
1309
1310 def test_mymanager(self):
1311 manager = MyManager()
1312 manager.start()
1313
1314 foo = manager.Foo()
1315 bar = manager.Bar()
1316 baz = manager.baz()
1317
1318 foo_methods = [name for name in ('f', 'g', '_h') if hasattr(foo, name)]
1319 bar_methods = [name for name in ('f', 'g', '_h') if hasattr(bar, name)]
1320
1321 self.assertEqual(foo_methods, ['f', 'g'])
1322 self.assertEqual(bar_methods, ['f', '_h'])
1323
1324 self.assertEqual(foo.f(), 'f()')
1325 self.assertRaises(ValueError, foo.g)
1326 self.assertEqual(foo._callmethod('f'), 'f()')
1327 self.assertRaises(RemoteError, foo._callmethod, '_h')
1328
1329 self.assertEqual(bar.f(), 'f()')
1330 self.assertEqual(bar._h(), '_h()')
1331 self.assertEqual(bar._callmethod('f'), 'f()')
1332 self.assertEqual(bar._callmethod('_h'), '_h()')
1333
1334 self.assertEqual(list(baz), [i*i for i in range(10)])
1335
1336 manager.shutdown()
1337
1338#
1339# Test of connecting to a remote server and using xmlrpclib for serialization
1340#
1341
1342_queue = pyqueue.Queue()
1343def get_queue():
1344 return _queue
1345
1346class QueueManager(BaseManager):
1347 '''manager class used by server process'''
1348QueueManager.register('get_queue', callable=get_queue)
1349
1350class QueueManager2(BaseManager):
1351 '''manager class which specifies the same interface as QueueManager'''
1352QueueManager2.register('get_queue')
1353
1354
1355SERIALIZER = 'xmlrpclib'
1356
1357class _TestRemoteManager(BaseTestCase):
1358
1359 ALLOWED_TYPES = ('manager',)
1360
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +00001361 @classmethod
1362 def _putter(cls, address, authkey):
Benjamin Petersone711caf2008-06-11 16:44:04 +00001363 manager = QueueManager2(
1364 address=address, authkey=authkey, serializer=SERIALIZER
1365 )
1366 manager.connect()
1367 queue = manager.get_queue()
1368 queue.put(('hello world', None, True, 2.25))
1369
1370 def test_remote(self):
1371 authkey = os.urandom(32)
1372
1373 manager = QueueManager(
1374 address=('localhost', 0), authkey=authkey, serializer=SERIALIZER
1375 )
1376 manager.start()
1377
1378 p = self.Process(target=self._putter, args=(manager.address, authkey))
Jesus Cea94f964f2011-09-09 20:26:57 +02001379 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +00001380 p.start()
1381
1382 manager2 = QueueManager2(
1383 address=manager.address, authkey=authkey, serializer=SERIALIZER
1384 )
1385 manager2.connect()
1386 queue = manager2.get_queue()
1387
1388 # Note that xmlrpclib will deserialize object as a list not a tuple
1389 self.assertEqual(queue.get(), ['hello world', None, True, 2.25])
1390
1391 # Because we are using xmlrpclib for serialization instead of
1392 # pickle this will cause a serialization error.
1393 self.assertRaises(Exception, queue.put, time.sleep)
1394
1395 # Make queue finalizer run before the server is stopped
1396 del queue
1397 manager.shutdown()
1398
Jesse Nollerc5d28a02009-03-30 16:37:36 +00001399class _TestManagerRestart(BaseTestCase):
1400
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +00001401 @classmethod
1402 def _putter(cls, address, authkey):
Jesse Nollerc5d28a02009-03-30 16:37:36 +00001403 manager = QueueManager(
1404 address=address, authkey=authkey, serializer=SERIALIZER)
1405 manager.connect()
1406 queue = manager.get_queue()
1407 queue.put('hello world')
1408
1409 def test_rapid_restart(self):
1410 authkey = os.urandom(32)
1411 manager = QueueManager(
Antoine Pitrou043bad02010-04-30 23:20:15 +00001412 address=('localhost', 0), authkey=authkey, serializer=SERIALIZER)
Brian Curtin50be1ca2010-11-01 05:10:44 +00001413 srvr = manager.get_server()
1414 addr = srvr.address
1415 # Close the connection.Listener socket which gets opened as a part
1416 # of manager.get_server(). It's not needed for the test.
1417 srvr.listener.close()
Jesse Nollerc5d28a02009-03-30 16:37:36 +00001418 manager.start()
1419
1420 p = self.Process(target=self._putter, args=(manager.address, authkey))
Jesus Cea94f964f2011-09-09 20:26:57 +02001421 p.daemon = True
Jesse Nollerc5d28a02009-03-30 16:37:36 +00001422 p.start()
1423 queue = manager.get_queue()
1424 self.assertEqual(queue.get(), 'hello world')
Jesse Noller35d1f002009-03-30 22:59:27 +00001425 del queue
Jesse Nollerc5d28a02009-03-30 16:37:36 +00001426 manager.shutdown()
1427 manager = QueueManager(
Antoine Pitrou043bad02010-04-30 23:20:15 +00001428 address=addr, authkey=authkey, serializer=SERIALIZER)
Antoine Pitrouc824e9a2011-04-05 18:11:33 +02001429 try:
1430 manager.start()
1431 except IOError as e:
1432 if e.errno != errno.EADDRINUSE:
1433 raise
1434 # Retry after some time, in case the old socket was lingering
1435 # (sporadic failure on buildbots)
1436 time.sleep(1.0)
1437 manager = QueueManager(
1438 address=addr, authkey=authkey, serializer=SERIALIZER)
Jesse Noller35d1f002009-03-30 22:59:27 +00001439 manager.shutdown()
Jesse Nollerc5d28a02009-03-30 16:37:36 +00001440
Benjamin Petersone711caf2008-06-11 16:44:04 +00001441#
1442#
1443#
1444
1445SENTINEL = latin('')
1446
1447class _TestConnection(BaseTestCase):
1448
1449 ALLOWED_TYPES = ('processes', 'threads')
1450
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +00001451 @classmethod
1452 def _echo(cls, conn):
Benjamin Petersone711caf2008-06-11 16:44:04 +00001453 for msg in iter(conn.recv_bytes, SENTINEL):
1454 conn.send_bytes(msg)
1455 conn.close()
1456
1457 def test_connection(self):
1458 conn, child_conn = self.Pipe()
1459
1460 p = self.Process(target=self._echo, args=(child_conn,))
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +00001461 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +00001462 p.start()
1463
1464 seq = [1, 2.25, None]
1465 msg = latin('hello world')
1466 longmsg = msg * 10
1467 arr = array.array('i', list(range(4)))
1468
1469 if self.TYPE == 'processes':
1470 self.assertEqual(type(conn.fileno()), int)
1471
1472 self.assertEqual(conn.send(seq), None)
1473 self.assertEqual(conn.recv(), seq)
1474
1475 self.assertEqual(conn.send_bytes(msg), None)
1476 self.assertEqual(conn.recv_bytes(), msg)
1477
1478 if self.TYPE == 'processes':
1479 buffer = array.array('i', [0]*10)
1480 expected = list(arr) + [0] * (10 - len(arr))
1481 self.assertEqual(conn.send_bytes(arr), None)
1482 self.assertEqual(conn.recv_bytes_into(buffer),
1483 len(arr) * buffer.itemsize)
1484 self.assertEqual(list(buffer), expected)
1485
1486 buffer = array.array('i', [0]*10)
1487 expected = [0] * 3 + list(arr) + [0] * (10 - 3 - len(arr))
1488 self.assertEqual(conn.send_bytes(arr), None)
1489 self.assertEqual(conn.recv_bytes_into(buffer, 3 * buffer.itemsize),
1490 len(arr) * buffer.itemsize)
1491 self.assertEqual(list(buffer), expected)
1492
1493 buffer = bytearray(latin(' ' * 40))
1494 self.assertEqual(conn.send_bytes(longmsg), None)
1495 try:
1496 res = conn.recv_bytes_into(buffer)
1497 except multiprocessing.BufferTooShort as e:
1498 self.assertEqual(e.args, (longmsg,))
1499 else:
1500 self.fail('expected BufferTooShort, got %s' % res)
1501
1502 poll = TimingWrapper(conn.poll)
1503
1504 self.assertEqual(poll(), False)
1505 self.assertTimingAlmostEqual(poll.elapsed, 0)
1506
1507 self.assertEqual(poll(TIMEOUT1), False)
1508 self.assertTimingAlmostEqual(poll.elapsed, TIMEOUT1)
1509
1510 conn.send(None)
1511
1512 self.assertEqual(poll(TIMEOUT1), True)
1513 self.assertTimingAlmostEqual(poll.elapsed, 0)
1514
1515 self.assertEqual(conn.recv(), None)
1516
1517 really_big_msg = latin('X') * (1024 * 1024 * 16) # 16Mb
1518 conn.send_bytes(really_big_msg)
1519 self.assertEqual(conn.recv_bytes(), really_big_msg)
1520
1521 conn.send_bytes(SENTINEL) # tell child to quit
1522 child_conn.close()
1523
1524 if self.TYPE == 'processes':
1525 self.assertEqual(conn.readable, True)
1526 self.assertEqual(conn.writable, True)
1527 self.assertRaises(EOFError, conn.recv)
1528 self.assertRaises(EOFError, conn.recv_bytes)
1529
1530 p.join()
1531
1532 def test_duplex_false(self):
1533 reader, writer = self.Pipe(duplex=False)
1534 self.assertEqual(writer.send(1), None)
1535 self.assertEqual(reader.recv(), 1)
1536 if self.TYPE == 'processes':
1537 self.assertEqual(reader.readable, True)
1538 self.assertEqual(reader.writable, False)
1539 self.assertEqual(writer.readable, False)
1540 self.assertEqual(writer.writable, True)
1541 self.assertRaises(IOError, reader.send, 2)
1542 self.assertRaises(IOError, writer.recv)
1543 self.assertRaises(IOError, writer.poll)
1544
1545 def test_spawn_close(self):
1546 # We test that a pipe connection can be closed by parent
1547 # process immediately after child is spawned. On Windows this
1548 # would have sometimes failed on old versions because
1549 # child_conn would be closed before the child got a chance to
1550 # duplicate it.
1551 conn, child_conn = self.Pipe()
1552
1553 p = self.Process(target=self._echo, args=(child_conn,))
Jesus Cea94f964f2011-09-09 20:26:57 +02001554 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +00001555 p.start()
1556 child_conn.close() # this might complete before child initializes
1557
1558 msg = latin('hello')
1559 conn.send_bytes(msg)
1560 self.assertEqual(conn.recv_bytes(), msg)
1561
1562 conn.send_bytes(SENTINEL)
1563 conn.close()
1564 p.join()
1565
1566 def test_sendbytes(self):
1567 if self.TYPE != 'processes':
1568 return
1569
1570 msg = latin('abcdefghijklmnopqrstuvwxyz')
1571 a, b = self.Pipe()
1572
1573 a.send_bytes(msg)
1574 self.assertEqual(b.recv_bytes(), msg)
1575
1576 a.send_bytes(msg, 5)
1577 self.assertEqual(b.recv_bytes(), msg[5:])
1578
1579 a.send_bytes(msg, 7, 8)
1580 self.assertEqual(b.recv_bytes(), msg[7:7+8])
1581
1582 a.send_bytes(msg, 26)
1583 self.assertEqual(b.recv_bytes(), latin(''))
1584
1585 a.send_bytes(msg, 26, 0)
1586 self.assertEqual(b.recv_bytes(), latin(''))
1587
1588 self.assertRaises(ValueError, a.send_bytes, msg, 27)
1589
1590 self.assertRaises(ValueError, a.send_bytes, msg, 22, 5)
1591
1592 self.assertRaises(ValueError, a.send_bytes, msg, 26, 1)
1593
1594 self.assertRaises(ValueError, a.send_bytes, msg, -1)
1595
1596 self.assertRaises(ValueError, a.send_bytes, msg, 4, -1)
1597
Antoine Pitroubcb39d42011-08-23 19:46:22 +02001598 @classmethod
1599 def _is_fd_assigned(cls, fd):
1600 try:
1601 os.fstat(fd)
1602 except OSError as e:
1603 if e.errno == errno.EBADF:
1604 return False
1605 raise
1606 else:
1607 return True
1608
1609 @classmethod
1610 def _writefd(cls, conn, data, create_dummy_fds=False):
1611 if create_dummy_fds:
1612 for i in range(0, 256):
1613 if not cls._is_fd_assigned(i):
1614 os.dup2(conn.fileno(), i)
1615 fd = reduction.recv_handle(conn)
1616 if msvcrt:
1617 fd = msvcrt.open_osfhandle(fd, os.O_WRONLY)
1618 os.write(fd, data)
1619 os.close(fd)
1620
Charles-François Natalie51c8da2011-09-21 18:48:21 +02001621 @unittest.skipUnless(HAS_REDUCTION, "test needs multiprocessing.reduction")
Antoine Pitroubcb39d42011-08-23 19:46:22 +02001622 def test_fd_transfer(self):
1623 if self.TYPE != 'processes':
1624 self.skipTest("only makes sense with processes")
1625 conn, child_conn = self.Pipe(duplex=True)
1626
1627 p = self.Process(target=self._writefd, args=(child_conn, b"foo"))
Jesus Cea94f964f2011-09-09 20:26:57 +02001628 p.daemon = True
Antoine Pitroubcb39d42011-08-23 19:46:22 +02001629 p.start()
Victor Stinnerd0b10a62011-09-21 01:10:29 +02001630 self.addCleanup(test.support.unlink, test.support.TESTFN)
Antoine Pitroubcb39d42011-08-23 19:46:22 +02001631 with open(test.support.TESTFN, "wb") as f:
1632 fd = f.fileno()
1633 if msvcrt:
1634 fd = msvcrt.get_osfhandle(fd)
1635 reduction.send_handle(conn, fd, p.pid)
1636 p.join()
1637 with open(test.support.TESTFN, "rb") as f:
1638 self.assertEqual(f.read(), b"foo")
1639
Charles-François Natalie51c8da2011-09-21 18:48:21 +02001640 @unittest.skipUnless(HAS_REDUCTION, "test needs multiprocessing.reduction")
Antoine Pitroubcb39d42011-08-23 19:46:22 +02001641 @unittest.skipIf(sys.platform == "win32",
1642 "test semantics don't make sense on Windows")
1643 @unittest.skipIf(MAXFD <= 256,
1644 "largest assignable fd number is too small")
1645 @unittest.skipUnless(hasattr(os, "dup2"),
1646 "test needs os.dup2()")
1647 def test_large_fd_transfer(self):
1648 # With fd > 256 (issue #11657)
1649 if self.TYPE != 'processes':
1650 self.skipTest("only makes sense with processes")
1651 conn, child_conn = self.Pipe(duplex=True)
1652
1653 p = self.Process(target=self._writefd, args=(child_conn, b"bar", True))
Jesus Cea94f964f2011-09-09 20:26:57 +02001654 p.daemon = True
Antoine Pitroubcb39d42011-08-23 19:46:22 +02001655 p.start()
Victor Stinnerd0b10a62011-09-21 01:10:29 +02001656 self.addCleanup(test.support.unlink, test.support.TESTFN)
Antoine Pitroubcb39d42011-08-23 19:46:22 +02001657 with open(test.support.TESTFN, "wb") as f:
1658 fd = f.fileno()
1659 for newfd in range(256, MAXFD):
1660 if not self._is_fd_assigned(newfd):
1661 break
1662 else:
1663 self.fail("could not find an unassigned large file descriptor")
1664 os.dup2(fd, newfd)
1665 try:
1666 reduction.send_handle(conn, newfd, p.pid)
1667 finally:
1668 os.close(newfd)
1669 p.join()
1670 with open(test.support.TESTFN, "rb") as f:
1671 self.assertEqual(f.read(), b"bar")
1672
Jesus Cea4507e642011-09-21 03:53:25 +02001673 @classmethod
1674 def _send_data_without_fd(self, conn):
1675 os.write(conn.fileno(), b"\0")
1676
Charles-François Natalie51c8da2011-09-21 18:48:21 +02001677 @unittest.skipUnless(HAS_REDUCTION, "test needs multiprocessing.reduction")
Jesus Cea4507e642011-09-21 03:53:25 +02001678 @unittest.skipIf(sys.platform == "win32", "doesn't make sense on Windows")
1679 def test_missing_fd_transfer(self):
1680 # Check that exception is raised when received data is not
1681 # accompanied by a file descriptor in ancillary data.
1682 if self.TYPE != 'processes':
1683 self.skipTest("only makes sense with processes")
1684 conn, child_conn = self.Pipe(duplex=True)
1685
1686 p = self.Process(target=self._send_data_without_fd, args=(child_conn,))
1687 p.daemon = True
1688 p.start()
1689 self.assertRaises(RuntimeError, reduction.recv_handle, conn)
1690 p.join()
Antoine Pitroubcb39d42011-08-23 19:46:22 +02001691
Benjamin Petersone711caf2008-06-11 16:44:04 +00001692class _TestListenerClient(BaseTestCase):
1693
1694 ALLOWED_TYPES = ('processes', 'threads')
1695
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +00001696 @classmethod
1697 def _test(cls, address):
1698 conn = cls.connection.Client(address)
Benjamin Petersone711caf2008-06-11 16:44:04 +00001699 conn.send('hello')
1700 conn.close()
1701
1702 def test_listener_client(self):
1703 for family in self.connection.families:
1704 l = self.connection.Listener(family=family)
1705 p = self.Process(target=self._test, args=(l.address,))
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +00001706 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +00001707 p.start()
1708 conn = l.accept()
1709 self.assertEqual(conn.recv(), 'hello')
1710 p.join()
1711 l.close()
Benjamin Petersone711caf2008-06-11 16:44:04 +00001712#
1713# Test of sending connection and socket objects between processes
1714#
Benjamin Petersonb29cbbc2008-06-16 20:57:14 +00001715"""
Benjamin Petersone711caf2008-06-11 16:44:04 +00001716class _TestPicklingConnections(BaseTestCase):
1717
1718 ALLOWED_TYPES = ('processes',)
1719
1720 def _listener(self, conn, families):
1721 for fam in families:
1722 l = self.connection.Listener(family=fam)
1723 conn.send(l.address)
1724 new_conn = l.accept()
1725 conn.send(new_conn)
1726
1727 if self.TYPE == 'processes':
1728 l = socket.socket()
1729 l.bind(('localhost', 0))
1730 conn.send(l.getsockname())
1731 l.listen(1)
1732 new_conn, addr = l.accept()
1733 conn.send(new_conn)
1734
1735 conn.recv()
1736
1737 def _remote(self, conn):
1738 for (address, msg) in iter(conn.recv, None):
1739 client = self.connection.Client(address)
1740 client.send(msg.upper())
1741 client.close()
1742
1743 if self.TYPE == 'processes':
1744 address, msg = conn.recv()
1745 client = socket.socket()
1746 client.connect(address)
1747 client.sendall(msg.upper())
1748 client.close()
1749
1750 conn.close()
1751
1752 def test_pickling(self):
1753 try:
1754 multiprocessing.allow_connection_pickling()
1755 except ImportError:
1756 return
1757
1758 families = self.connection.families
1759
1760 lconn, lconn0 = self.Pipe()
1761 lp = self.Process(target=self._listener, args=(lconn0, families))
Jesus Cea94f964f2011-09-09 20:26:57 +02001762 lp.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +00001763 lp.start()
1764 lconn0.close()
1765
1766 rconn, rconn0 = self.Pipe()
1767 rp = self.Process(target=self._remote, args=(rconn0,))
Jesus Cea94f964f2011-09-09 20:26:57 +02001768 rp.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +00001769 rp.start()
1770 rconn0.close()
1771
1772 for fam in families:
1773 msg = ('This connection uses family %s' % fam).encode('ascii')
1774 address = lconn.recv()
1775 rconn.send((address, msg))
1776 new_conn = lconn.recv()
1777 self.assertEqual(new_conn.recv(), msg.upper())
1778
1779 rconn.send(None)
1780
1781 if self.TYPE == 'processes':
1782 msg = latin('This connection uses a normal socket')
1783 address = lconn.recv()
1784 rconn.send((address, msg))
1785 if hasattr(socket, 'fromfd'):
1786 new_conn = lconn.recv()
1787 self.assertEqual(new_conn.recv(100), msg.upper())
1788 else:
1789 # XXX On Windows with Py2.6 need to backport fromfd()
1790 discard = lconn.recv_bytes()
1791
1792 lconn.send(None)
1793
1794 rconn.close()
1795 lconn.close()
1796
1797 lp.join()
1798 rp.join()
Benjamin Petersonb29cbbc2008-06-16 20:57:14 +00001799"""
Benjamin Petersone711caf2008-06-11 16:44:04 +00001800#
1801#
1802#
1803
1804class _TestHeap(BaseTestCase):
1805
1806 ALLOWED_TYPES = ('processes',)
1807
1808 def test_heap(self):
1809 iterations = 5000
1810 maxblocks = 50
1811 blocks = []
1812
1813 # create and destroy lots of blocks of different sizes
1814 for i in range(iterations):
1815 size = int(random.lognormvariate(0, 1) * 1000)
1816 b = multiprocessing.heap.BufferWrapper(size)
1817 blocks.append(b)
1818 if len(blocks) > maxblocks:
1819 i = random.randrange(maxblocks)
1820 del blocks[i]
1821
1822 # get the heap object
1823 heap = multiprocessing.heap.BufferWrapper._heap
1824
1825 # verify the state of the heap
1826 all = []
1827 occupied = 0
Charles-François Natali778db492011-07-02 14:35:49 +02001828 heap._lock.acquire()
1829 self.addCleanup(heap._lock.release)
Benjamin Petersone711caf2008-06-11 16:44:04 +00001830 for L in list(heap._len_to_seq.values()):
1831 for arena, start, stop in L:
1832 all.append((heap._arenas.index(arena), start, stop,
1833 stop-start, 'free'))
1834 for arena, start, stop in heap._allocated_blocks:
1835 all.append((heap._arenas.index(arena), start, stop,
1836 stop-start, 'occupied'))
1837 occupied += (stop-start)
1838
1839 all.sort()
1840
1841 for i in range(len(all)-1):
1842 (arena, start, stop) = all[i][:3]
1843 (narena, nstart, nstop) = all[i+1][:3]
1844 self.assertTrue((arena != narena and nstart == 0) or
1845 (stop == nstart))
1846
Charles-François Natali778db492011-07-02 14:35:49 +02001847 def test_free_from_gc(self):
1848 # Check that freeing of blocks by the garbage collector doesn't deadlock
1849 # (issue #12352).
1850 # Make sure the GC is enabled, and set lower collection thresholds to
1851 # make collections more frequent (and increase the probability of
1852 # deadlock).
1853 if not gc.isenabled():
1854 gc.enable()
1855 self.addCleanup(gc.disable)
1856 thresholds = gc.get_threshold()
1857 self.addCleanup(gc.set_threshold, *thresholds)
1858 gc.set_threshold(10)
1859
1860 # perform numerous block allocations, with cyclic references to make
1861 # sure objects are collected asynchronously by the gc
1862 for i in range(5000):
1863 a = multiprocessing.heap.BufferWrapper(1)
1864 b = multiprocessing.heap.BufferWrapper(1)
1865 # circular references
1866 a.buddy = b
1867 b.buddy = a
1868
Benjamin Petersone711caf2008-06-11 16:44:04 +00001869#
1870#
1871#
1872
Benjamin Petersone711caf2008-06-11 16:44:04 +00001873class _Foo(Structure):
1874 _fields_ = [
1875 ('x', c_int),
1876 ('y', c_double)
1877 ]
1878
1879class _TestSharedCTypes(BaseTestCase):
1880
1881 ALLOWED_TYPES = ('processes',)
1882
Antoine Pitrou7744e2a2010-11-22 16:26:21 +00001883 def setUp(self):
1884 if not HAS_SHAREDCTYPES:
1885 self.skipTest("requires multiprocessing.sharedctypes")
1886
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +00001887 @classmethod
1888 def _double(cls, x, y, foo, arr, string):
Benjamin Petersone711caf2008-06-11 16:44:04 +00001889 x.value *= 2
1890 y.value *= 2
1891 foo.x *= 2
1892 foo.y *= 2
1893 string.value *= 2
1894 for i in range(len(arr)):
1895 arr[i] *= 2
1896
1897 def test_sharedctypes(self, lock=False):
Benjamin Petersone711caf2008-06-11 16:44:04 +00001898 x = Value('i', 7, lock=lock)
Georg Brandl89fad142010-03-14 10:23:39 +00001899 y = Value(c_double, 1.0/3.0, lock=lock)
Benjamin Petersone711caf2008-06-11 16:44:04 +00001900 foo = Value(_Foo, 3, 2, lock=lock)
Georg Brandl89fad142010-03-14 10:23:39 +00001901 arr = self.Array('d', list(range(10)), lock=lock)
1902 string = self.Array('c', 20, lock=lock)
Brian Curtinafa88b52010-10-07 01:12:19 +00001903 string.value = latin('hello')
Benjamin Petersone711caf2008-06-11 16:44:04 +00001904
1905 p = self.Process(target=self._double, args=(x, y, foo, arr, string))
Jesus Cea94f964f2011-09-09 20:26:57 +02001906 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +00001907 p.start()
1908 p.join()
1909
1910 self.assertEqual(x.value, 14)
1911 self.assertAlmostEqual(y.value, 2.0/3.0)
1912 self.assertEqual(foo.x, 6)
1913 self.assertAlmostEqual(foo.y, 4.0)
1914 for i in range(10):
1915 self.assertAlmostEqual(arr[i], i*2)
1916 self.assertEqual(string.value, latin('hellohello'))
1917
1918 def test_synchronize(self):
1919 self.test_sharedctypes(lock=True)
1920
1921 def test_copy(self):
Benjamin Petersone711caf2008-06-11 16:44:04 +00001922 foo = _Foo(2, 5.0)
Brian Curtinafa88b52010-10-07 01:12:19 +00001923 bar = copy(foo)
Benjamin Petersone711caf2008-06-11 16:44:04 +00001924 foo.x = 0
1925 foo.y = 0
1926 self.assertEqual(bar.x, 2)
1927 self.assertAlmostEqual(bar.y, 5.0)
1928
1929#
1930#
1931#
1932
1933class _TestFinalize(BaseTestCase):
1934
1935 ALLOWED_TYPES = ('processes',)
1936
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +00001937 @classmethod
1938 def _test_finalize(cls, conn):
Benjamin Petersone711caf2008-06-11 16:44:04 +00001939 class Foo(object):
1940 pass
1941
1942 a = Foo()
1943 util.Finalize(a, conn.send, args=('a',))
1944 del a # triggers callback for a
1945
1946 b = Foo()
1947 close_b = util.Finalize(b, conn.send, args=('b',))
1948 close_b() # triggers callback for b
1949 close_b() # does nothing because callback has already been called
1950 del b # does nothing because callback has already been called
1951
1952 c = Foo()
1953 util.Finalize(c, conn.send, args=('c',))
1954
1955 d10 = Foo()
1956 util.Finalize(d10, conn.send, args=('d10',), exitpriority=1)
1957
1958 d01 = Foo()
1959 util.Finalize(d01, conn.send, args=('d01',), exitpriority=0)
1960 d02 = Foo()
1961 util.Finalize(d02, conn.send, args=('d02',), exitpriority=0)
1962 d03 = Foo()
1963 util.Finalize(d03, conn.send, args=('d03',), exitpriority=0)
1964
1965 util.Finalize(None, conn.send, args=('e',), exitpriority=-10)
1966
1967 util.Finalize(None, conn.send, args=('STOP',), exitpriority=-100)
1968
Ezio Melotti13925002011-03-16 11:05:33 +02001969 # call multiprocessing's cleanup function then exit process without
Benjamin Petersone711caf2008-06-11 16:44:04 +00001970 # garbage collecting locals
1971 util._exit_function()
1972 conn.close()
1973 os._exit(0)
1974
1975 def test_finalize(self):
1976 conn, child_conn = self.Pipe()
1977
1978 p = self.Process(target=self._test_finalize, args=(child_conn,))
Jesus Cea94f964f2011-09-09 20:26:57 +02001979 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +00001980 p.start()
1981 p.join()
1982
1983 result = [obj for obj in iter(conn.recv, 'STOP')]
1984 self.assertEqual(result, ['a', 'b', 'd10', 'd03', 'd02', 'd01', 'e'])
1985
1986#
1987# Test that from ... import * works for each module
1988#
1989
1990class _TestImportStar(BaseTestCase):
1991
1992 ALLOWED_TYPES = ('processes',)
1993
1994 def test_import(self):
Florent Xiclunafd1b0932010-03-28 00:25:02 +00001995 modules = [
Benjamin Petersone711caf2008-06-11 16:44:04 +00001996 'multiprocessing', 'multiprocessing.connection',
1997 'multiprocessing.heap', 'multiprocessing.managers',
1998 'multiprocessing.pool', 'multiprocessing.process',
Benjamin Petersone711caf2008-06-11 16:44:04 +00001999 'multiprocessing.synchronize', 'multiprocessing.util'
Florent Xiclunafd1b0932010-03-28 00:25:02 +00002000 ]
2001
Charles-François Natalie51c8da2011-09-21 18:48:21 +02002002 if HAS_REDUCTION:
2003 modules.append('multiprocessing.reduction')
2004
Florent Xiclunafd1b0932010-03-28 00:25:02 +00002005 if c_int is not None:
2006 # This module requires _ctypes
2007 modules.append('multiprocessing.sharedctypes')
Benjamin Petersone711caf2008-06-11 16:44:04 +00002008
2009 for name in modules:
2010 __import__(name)
2011 mod = sys.modules[name]
2012
2013 for attr in getattr(mod, '__all__', ()):
2014 self.assertTrue(
2015 hasattr(mod, attr),
2016 '%r does not have attribute %r' % (mod, attr)
2017 )
2018
2019#
2020# Quick test that logging works -- does not test logging output
2021#
2022
2023class _TestLogging(BaseTestCase):
2024
2025 ALLOWED_TYPES = ('processes',)
2026
2027 def test_enable_logging(self):
2028 logger = multiprocessing.get_logger()
2029 logger.setLevel(util.SUBWARNING)
2030 self.assertTrue(logger is not None)
2031 logger.debug('this will not be printed')
2032 logger.info('nor will this')
2033 logger.setLevel(LOG_LEVEL)
2034
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +00002035 @classmethod
2036 def _test_level(cls, conn):
Benjamin Petersone711caf2008-06-11 16:44:04 +00002037 logger = multiprocessing.get_logger()
2038 conn.send(logger.getEffectiveLevel())
2039
2040 def test_level(self):
2041 LEVEL1 = 32
2042 LEVEL2 = 37
2043
2044 logger = multiprocessing.get_logger()
2045 root_logger = logging.getLogger()
2046 root_level = root_logger.level
2047
2048 reader, writer = multiprocessing.Pipe(duplex=False)
2049
2050 logger.setLevel(LEVEL1)
Jesus Cea94f964f2011-09-09 20:26:57 +02002051 p = self.Process(target=self._test_level, args=(writer,))
2052 p.daemon = True
2053 p.start()
Benjamin Petersone711caf2008-06-11 16:44:04 +00002054 self.assertEqual(LEVEL1, reader.recv())
2055
2056 logger.setLevel(logging.NOTSET)
2057 root_logger.setLevel(LEVEL2)
Jesus Cea94f964f2011-09-09 20:26:57 +02002058 p = self.Process(target=self._test_level, args=(writer,))
2059 p.daemon = True
2060 p.start()
Benjamin Petersone711caf2008-06-11 16:44:04 +00002061 self.assertEqual(LEVEL2, reader.recv())
2062
2063 root_logger.setLevel(root_level)
2064 logger.setLevel(level=LOG_LEVEL)
2065
Jesse Nollerb9a49b72009-11-21 18:09:38 +00002066
Jesse Nollerf4ae35f2009-11-24 14:22:24 +00002067# class _TestLoggingProcessName(BaseTestCase):
2068#
2069# def handle(self, record):
2070# assert record.processName == multiprocessing.current_process().name
2071# self.__handled = True
2072#
2073# def test_logging(self):
2074# handler = logging.Handler()
2075# handler.handle = self.handle
2076# self.__handled = False
2077# # Bypass getLogger() and side-effects
2078# logger = logging.getLoggerClass()(
2079# 'multiprocessing.test.TestLoggingProcessName')
2080# logger.addHandler(handler)
2081# logger.propagate = False
2082#
2083# logger.warn('foo')
2084# assert self.__handled
Jesse Nollerb9a49b72009-11-21 18:09:38 +00002085
Benjamin Petersone711caf2008-06-11 16:44:04 +00002086#
Jesse Noller6214edd2009-01-19 16:23:53 +00002087# Test to verify handle verification, see issue 3321
2088#
2089
2090class TestInvalidHandle(unittest.TestCase):
2091
Florent Xiclunafd1b0932010-03-28 00:25:02 +00002092 @unittest.skipIf(WIN32, "skipped on Windows")
Jesse Noller6214edd2009-01-19 16:23:53 +00002093 def test_invalid_handles(self):
Jesse Noller6214edd2009-01-19 16:23:53 +00002094 conn = _multiprocessing.Connection(44977608)
2095 self.assertRaises(IOError, conn.poll)
2096 self.assertRaises(IOError, _multiprocessing.Connection, -1)
Florent Xiclunafd1b0932010-03-28 00:25:02 +00002097
Jesse Noller6214edd2009-01-19 16:23:53 +00002098#
Benjamin Petersone711caf2008-06-11 16:44:04 +00002099# Functions used to create test cases from the base ones in this module
2100#
2101
2102def get_attributes(Source, names):
2103 d = {}
2104 for name in names:
2105 obj = getattr(Source, name)
2106 if type(obj) == type(get_attributes):
2107 obj = staticmethod(obj)
2108 d[name] = obj
2109 return d
2110
2111def create_test_cases(Mixin, type):
2112 result = {}
2113 glob = globals()
Florent Xiclunafd1b0932010-03-28 00:25:02 +00002114 Type = type.capitalize()
Benjamin Petersone711caf2008-06-11 16:44:04 +00002115
2116 for name in list(glob.keys()):
2117 if name.startswith('_Test'):
2118 base = glob[name]
2119 if type in base.ALLOWED_TYPES:
2120 newname = 'With' + Type + name[1:]
2121 class Temp(base, unittest.TestCase, Mixin):
2122 pass
2123 result[newname] = Temp
2124 Temp.__name__ = newname
2125 Temp.__module__ = Mixin.__module__
2126 return result
2127
2128#
2129# Create test cases
2130#
2131
2132class ProcessesMixin(object):
2133 TYPE = 'processes'
2134 Process = multiprocessing.Process
2135 locals().update(get_attributes(multiprocessing, (
2136 'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
2137 'Condition', 'Event', 'Value', 'Array', 'RawValue',
2138 'RawArray', 'current_process', 'active_children', 'Pipe',
2139 'connection', 'JoinableQueue'
2140 )))
2141
2142testcases_processes = create_test_cases(ProcessesMixin, type='processes')
2143globals().update(testcases_processes)
2144
2145
2146class ManagerMixin(object):
2147 TYPE = 'manager'
2148 Process = multiprocessing.Process
2149 manager = object.__new__(multiprocessing.managers.SyncManager)
2150 locals().update(get_attributes(manager, (
2151 'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
2152 'Condition', 'Event', 'Value', 'Array', 'list', 'dict',
2153 'Namespace', 'JoinableQueue'
2154 )))
2155
2156testcases_manager = create_test_cases(ManagerMixin, type='manager')
2157globals().update(testcases_manager)
2158
2159
2160class ThreadsMixin(object):
2161 TYPE = 'threads'
2162 Process = multiprocessing.dummy.Process
2163 locals().update(get_attributes(multiprocessing.dummy, (
2164 'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
2165 'Condition', 'Event', 'Value', 'Array', 'current_process',
2166 'active_children', 'Pipe', 'connection', 'dict', 'list',
2167 'Namespace', 'JoinableQueue'
2168 )))
2169
2170testcases_threads = create_test_cases(ThreadsMixin, type='threads')
2171globals().update(testcases_threads)
2172
Neal Norwitz5d6415e2008-08-25 01:53:32 +00002173class OtherTest(unittest.TestCase):
2174 # TODO: add more tests for deliver/answer challenge.
2175 def test_deliver_challenge_auth_failure(self):
2176 class _FakeConnection(object):
2177 def recv_bytes(self, size):
Neal Norwitzec105ad2008-08-25 03:05:54 +00002178 return b'something bogus'
Neal Norwitz5d6415e2008-08-25 01:53:32 +00002179 def send_bytes(self, data):
2180 pass
2181 self.assertRaises(multiprocessing.AuthenticationError,
2182 multiprocessing.connection.deliver_challenge,
2183 _FakeConnection(), b'abc')
2184
2185 def test_answer_challenge_auth_failure(self):
2186 class _FakeConnection(object):
2187 def __init__(self):
2188 self.count = 0
2189 def recv_bytes(self, size):
2190 self.count += 1
2191 if self.count == 1:
2192 return multiprocessing.connection.CHALLENGE
2193 elif self.count == 2:
Neal Norwitzec105ad2008-08-25 03:05:54 +00002194 return b'something bogus'
2195 return b''
Neal Norwitz5d6415e2008-08-25 01:53:32 +00002196 def send_bytes(self, data):
2197 pass
2198 self.assertRaises(multiprocessing.AuthenticationError,
2199 multiprocessing.connection.answer_challenge,
2200 _FakeConnection(), b'abc')
2201
Benjamin Petersonf47ed4a2009-04-11 20:45:40 +00002202#
2203# Test Manager.start()/Pool.__init__() initializer feature - see issue 5585
2204#
2205
2206def initializer(ns):
2207 ns.test += 1
2208
2209class TestInitializers(unittest.TestCase):
2210 def setUp(self):
2211 self.mgr = multiprocessing.Manager()
2212 self.ns = self.mgr.Namespace()
2213 self.ns.test = 0
2214
2215 def tearDown(self):
2216 self.mgr.shutdown()
2217
2218 def test_manager_initializer(self):
2219 m = multiprocessing.managers.SyncManager()
2220 self.assertRaises(TypeError, m.start, 1)
2221 m.start(initializer, (self.ns,))
2222 self.assertEqual(self.ns.test, 1)
2223 m.shutdown()
2224
2225 def test_pool_initializer(self):
2226 self.assertRaises(TypeError, multiprocessing.Pool, initializer=1)
2227 p = multiprocessing.Pool(1, initializer, (self.ns,))
2228 p.close()
2229 p.join()
2230 self.assertEqual(self.ns.test, 1)
2231
Alexandre Vassalottic57a84f2009-07-17 12:07:01 +00002232#
2233# Issue 5155, 5313, 5331: Test process in processes
2234# Verifies os.close(sys.stdin.fileno) vs. sys.stdin.close() behavior
2235#
2236
2237def _ThisSubProcess(q):
2238 try:
2239 item = q.get(block=False)
2240 except pyqueue.Empty:
2241 pass
2242
2243def _TestProcess(q):
2244 queue = multiprocessing.Queue()
2245 subProc = multiprocessing.Process(target=_ThisSubProcess, args=(queue,))
Jesus Cea94f964f2011-09-09 20:26:57 +02002246 subProc.daemon = True
Alexandre Vassalottic57a84f2009-07-17 12:07:01 +00002247 subProc.start()
2248 subProc.join()
2249
2250def _afunc(x):
2251 return x*x
2252
2253def pool_in_process():
2254 pool = multiprocessing.Pool(processes=4)
2255 x = pool.map(_afunc, [1, 2, 3, 4, 5, 6, 7])
2256
2257class _file_like(object):
2258 def __init__(self, delegate):
2259 self._delegate = delegate
2260 self._pid = None
2261
2262 @property
2263 def cache(self):
2264 pid = os.getpid()
2265 # There are no race conditions since fork keeps only the running thread
2266 if pid != self._pid:
2267 self._pid = pid
2268 self._cache = []
2269 return self._cache
2270
2271 def write(self, data):
2272 self.cache.append(data)
2273
2274 def flush(self):
2275 self._delegate.write(''.join(self.cache))
2276 self._cache = []
2277
2278class TestStdinBadfiledescriptor(unittest.TestCase):
2279
2280 def test_queue_in_process(self):
2281 queue = multiprocessing.Queue()
2282 proc = multiprocessing.Process(target=_TestProcess, args=(queue,))
2283 proc.start()
2284 proc.join()
2285
2286 def test_pool_in_process(self):
2287 p = multiprocessing.Process(target=pool_in_process)
2288 p.start()
2289 p.join()
2290
2291 def test_flushing(self):
2292 sio = io.StringIO()
2293 flike = _file_like(sio)
2294 flike.write('foo')
2295 proc = multiprocessing.Process(target=lambda: flike.flush())
2296 flike.flush()
2297 assert sio.getvalue() == 'foo'
2298
2299testcases_other = [OtherTest, TestInvalidHandle, TestInitializers,
2300 TestStdinBadfiledescriptor]
Neal Norwitz5d6415e2008-08-25 01:53:32 +00002301
Benjamin Petersone711caf2008-06-11 16:44:04 +00002302#
2303#
2304#
2305
2306def test_main(run=None):
Jesse Nollerd00df3c2008-06-18 14:22:48 +00002307 if sys.platform.startswith("linux"):
2308 try:
2309 lock = multiprocessing.RLock()
2310 except OSError:
Benjamin Petersone549ead2009-03-28 21:42:05 +00002311 raise unittest.SkipTest("OSError raises on RLock creation, see issue 3111!")
Benjamin Peterson3c0dd062008-06-17 22:43:48 +00002312
Charles-François Natali3be00952011-11-22 18:36:39 +01002313 check_enough_semaphores()
2314
Benjamin Petersone711caf2008-06-11 16:44:04 +00002315 if run is None:
2316 from test.support import run_unittest as run
2317
2318 util.get_temp_dir() # creates temp directory for use by all processes
2319
2320 multiprocessing.get_logger().setLevel(LOG_LEVEL)
2321
Benjamin Peterson41181742008-07-02 20:22:54 +00002322 ProcessesMixin.pool = multiprocessing.Pool(4)
2323 ThreadsMixin.pool = multiprocessing.dummy.Pool(4)
2324 ManagerMixin.manager.__init__()
2325 ManagerMixin.manager.start()
2326 ManagerMixin.pool = ManagerMixin.manager.Pool(4)
Benjamin Petersone711caf2008-06-11 16:44:04 +00002327
2328 testcases = (
Benjamin Peterson41181742008-07-02 20:22:54 +00002329 sorted(testcases_processes.values(), key=lambda tc:tc.__name__) +
2330 sorted(testcases_threads.values(), key=lambda tc:tc.__name__) +
Neal Norwitz5d6415e2008-08-25 01:53:32 +00002331 sorted(testcases_manager.values(), key=lambda tc:tc.__name__) +
2332 testcases_other
Benjamin Petersone711caf2008-06-11 16:44:04 +00002333 )
2334
2335 loadTestsFromTestCase = unittest.defaultTestLoader.loadTestsFromTestCase
2336 suite = unittest.TestSuite(loadTestsFromTestCase(tc) for tc in testcases)
2337 run(suite)
2338
Benjamin Peterson41181742008-07-02 20:22:54 +00002339 ThreadsMixin.pool.terminate()
2340 ProcessesMixin.pool.terminate()
2341 ManagerMixin.pool.terminate()
2342 ManagerMixin.manager.shutdown()
Benjamin Petersone711caf2008-06-11 16:44:04 +00002343
Benjamin Peterson41181742008-07-02 20:22:54 +00002344 del ProcessesMixin.pool, ThreadsMixin.pool, ManagerMixin.pool
Benjamin Petersone711caf2008-06-11 16:44:04 +00002345
2346def main():
2347 test_main(unittest.TextTestRunner(verbosity=2).run)
2348
2349if __name__ == '__main__':
2350 main()