blob: 7ac77e926d10af8f25446ee7e432a2e448e4c5f8 [file] [log] [blame]
Benjamin Peterson90f5ba52010-03-11 22:53:45 +00001#!/usr/bin/env python3
Georg Brandl86b2fb92008-07-16 03:43:04 +00002
Benjamin Petersone711caf2008-06-11 16:44:04 +00003#
4# Unit tests for the multiprocessing package
5#
6
7import unittest
Benjamin Petersone711caf2008-06-11 16:44:04 +00008import queue as pyqueue
9import time
Alexandre Vassalottic57a84f2009-07-17 12:07:01 +000010import io
Benjamin Petersone711caf2008-06-11 16:44:04 +000011import sys
12import os
13import gc
Antoine Pitrouc824e9a2011-04-05 18:11:33 +020014import errno
Benjamin Petersone711caf2008-06-11 16:44:04 +000015import signal
16import array
Benjamin Petersone711caf2008-06-11 16:44:04 +000017import socket
18import random
19import logging
R. David Murraya21e4ca2009-03-31 23:16:50 +000020import test.support
Benjamin Petersone711caf2008-06-11 16:44:04 +000021
Benjamin Petersone5384b02008-10-04 22:00:42 +000022
R. David Murraya21e4ca2009-03-31 23:16:50 +000023# Skip tests if _multiprocessing wasn't built.
24_multiprocessing = test.support.import_module('_multiprocessing')
25# Skip tests if sem_open implementation is broken.
26test.support.import_module('multiprocessing.synchronize')
# Import threading after _multiprocessing so that a missing extension module
# raises the more relevant error message "No module named _multiprocessing";
# _multiprocessing is not compiled without thread support.
30import threading
Benjamin Petersone5384b02008-10-04 22:00:42 +000031
Benjamin Petersone711caf2008-06-11 16:44:04 +000032import multiprocessing.dummy
33import multiprocessing.connection
34import multiprocessing.managers
35import multiprocessing.heap
Benjamin Petersone711caf2008-06-11 16:44:04 +000036import multiprocessing.pool
Benjamin Petersone711caf2008-06-11 16:44:04 +000037
Charles-François Natalibc8f0822011-09-20 20:36:51 +020038from multiprocessing import util
39
40try:
41 from multiprocessing import reduction
42 HAS_REDUCTION = True
43except ImportError:
44 HAS_REDUCTION = False
Benjamin Petersone711caf2008-06-11 16:44:04 +000045
Brian Curtinafa88b52010-10-07 01:12:19 +000046try:
47 from multiprocessing.sharedctypes import Value, copy
48 HAS_SHAREDCTYPES = True
49except ImportError:
50 HAS_SHAREDCTYPES = False
51
Antoine Pitroubcb39d42011-08-23 19:46:22 +020052try:
53 import msvcrt
54except ImportError:
55 msvcrt = None
56
Benjamin Petersone711caf2008-06-11 16:44:04 +000057#
58#
59#
60
def latin(s):
    """Return *s* encoded to bytes with the 'latin' codec."""
    encoded = s.encode('latin')
    return encoded
Benjamin Petersone711caf2008-06-11 16:44:04 +000063
Benjamin Petersone711caf2008-06-11 16:44:04 +000064#
65# Constants
66#
67
LOG_LEVEL = util.SUBWARNING
#LOG_LEVEL = logging.DEBUG

# Short interval in seconds; the tests pass it to time.sleep() and use it
# as a small timeout.
DELTA = 0.1

# Making CHECK_TIMINGS true makes the tests take a lot longer and can
# sometimes cause some non-serious failures because some calls block a bit
# longer than expected.
CHECK_TIMINGS = False

# Longer, distinct timeouts are only worthwhile when timings are verified.
TIMEOUT1, TIMEOUT2, TIMEOUT3 = (
    (0.82, 0.35, 1.4) if CHECK_TIMINGS else (0.1, 0.1, 0.1)
)

# True unless the extension module flags its semaphore get_value() as broken.
_broken_getvalue = getattr(_multiprocessing, 'HAVE_BROKEN_SEM_GETVALUE', False)
HAVE_GETVALUE = not _broken_getvalue
83
WIN32 = (sys.platform == "win32")

if not WIN32:
    from select import select
    # NOTE(review): presumably retries select() on EINTR, judging by the
    # helper's name -- confirm against multiprocessing.util.
    _select = util._eintr_retry(select)

    def wait_for_handle(handle, timeout):
        """Return True if *handle* becomes readable within *timeout* seconds.

        A negative timeout is treated like None (no timeout is passed to
        select()).
        """
        if timeout is not None and timeout < 0.0:
            timeout = None
        readable = _select([handle], [], [], timeout)[0]
        return handle in readable
else:
    from _subprocess import WaitForSingleObject, INFINITE, WAIT_OBJECT_0

    def wait_for_handle(handle, timeout):
        """Return True if *handle* is signalled within *timeout* seconds.

        None or a negative timeout waits with INFINITE; otherwise the
        timeout is converted to whole milliseconds.
        """
        wait_forever = timeout is None or timeout < 0.0
        millis = INFINITE if wait_forever else int(1000 * timeout)
        return WaitForSingleObject(handle, millis) == WAIT_OBJECT_0
Jesse Noller6214edd2009-01-19 16:23:53 +0000102
# Value of the SC_OPEN_MAX system configuration setting, or 256 when it
# cannot be queried.  Only the errors os.sysconf can produce are caught
# (AttributeError: os.sysconf missing on this platform; ValueError: unknown
# name; OSError: the lookup itself failed) so that KeyboardInterrupt and
# SystemExit are no longer swallowed.
try:
    MAXFD = os.sysconf("SC_OPEN_MAX")
except (AttributeError, ValueError, OSError):
    MAXFD = 256
107
Benjamin Petersone711caf2008-06-11 16:44:04 +0000108#
Florent Xiclunafd1b0932010-03-28 00:25:02 +0000109# Some tests require ctypes
110#
111
112try:
Florent Xiclunaaa171062010-08-14 15:56:42 +0000113 from ctypes import Structure, c_int, c_double
Florent Xiclunafd1b0932010-03-28 00:25:02 +0000114except ImportError:
115 Structure = object
116 c_int = c_double = None
117
118#
Benjamin Petersone711caf2008-06-11 16:44:04 +0000119# Creates a wrapper for a function which records the time it takes to finish
120#
121
class TimingWrapper(object):
    """Callable wrapper that records how long each call to *func* takes.

    ``elapsed`` is None until the wrapper has been called once; afterwards
    it holds the duration in seconds of the most recent call, and it is
    updated even when the wrapped function raises.
    """

    def __init__(self, func):
        self.func = func
        self.elapsed = None

    def __call__(self, *args, **kwds):
        # A monotonic clock is used so that system clock adjustments cannot
        # distort (or make negative) the measured duration.
        started = time.monotonic()
        try:
            return self.func(*args, **kwds)
        finally:
            self.elapsed = time.monotonic() - started
134
135#
136# Base class for test cases
137#
138
class BaseTestCase(object):
    """Mixin providing assertion helpers shared by the test cases below.

    It relies on unittest-style ``assert*`` methods being supplied by the
    class it is eventually combined with.
    """

    # NOTE(review): presumably the flavours a test case may be run with;
    # subclasses override it and test methods read self.TYPE -- confirm
    # against the code that builds the concrete test classes.
    ALLOWED_TYPES = ('processes', 'manager', 'threads')

    def assertTimingAlmostEqual(self, a, b):
        """Compare two durations to one decimal place if CHECK_TIMINGS is set."""
        if not CHECK_TIMINGS:
            return
        self.assertAlmostEqual(a, b, 1)

    def assertReturnsIfImplemented(self, value, func, *args):
        """Assert ``func(*args) == value`` unless it raises NotImplementedError."""
        try:
            result = func(*args)
        except NotImplementedError:
            return None
        return self.assertEqual(value, result)

    # For the sanity of Windows users, rather than crashing or freezing in
    # multiple ways.
    def __reduce__(self, *args):
        raise NotImplementedError("shouldn't try to pickle a test case")

    __reduce_ex__ = __reduce__
161
Benjamin Petersone711caf2008-06-11 16:44:04 +0000162#
163# Return the value of a semaphore
164#
165
def get_value(self):
    """Return the current value of the semaphore-like object *self*.

    Tries the public ``get_value()`` method first, then the private
    ``_Semaphore__value`` and ``_value`` attributes in that order; raises
    NotImplementedError when none of them is available.
    """
    try:
        return self.get_value()
    except AttributeError:
        pass
    for attr_name in ('_Semaphore__value', '_value'):
        try:
            return getattr(self, attr_name)
        except AttributeError:
            continue
    raise NotImplementedError
177
178#
179# Testcases
180#
181
class _TestProcess(BaseTestCase):
    """Tests for Process objects: attributes, lifecycle, recursion, sentinel.

    Tests that do not apply to the 'threads' flavour are reported as
    skipped rather than silently passing.
    """

    ALLOWED_TYPES = ('processes', 'threads')

    def test_current(self):
        if self.TYPE == 'threads':
            self.skipTest('test not appropriate for {}'.format(self.TYPE))

        current = self.current_process()
        authkey = current.authkey

        self.assertTrue(current.is_alive())
        self.assertFalse(current.daemon)
        self.assertIsInstance(authkey, bytes)
        self.assertGreater(len(authkey), 0)
        self.assertEqual(current.ident, os.getpid())
        self.assertEqual(current.exitcode, None)

    def test_daemon_argument(self):
        if self.TYPE == "threads":
            self.skipTest('test not appropriate for {}'.format(self.TYPE))

        # By default uses the current process's daemon flag.
        proc0 = self.Process(target=self._test)
        self.assertEqual(proc0.daemon, self.current_process().daemon)
        proc1 = self.Process(target=self._test, daemon=True)
        self.assertTrue(proc1.daemon)
        proc2 = self.Process(target=self._test, daemon=False)
        self.assertFalse(proc2.daemon)

    @classmethod
    def _test(cls, q, *args, **kwds):
        """Child target: report args, kwds and process details through *q*."""
        current = cls.current_process()
        q.put(args)
        q.put(kwds)
        q.put(current.name)
        if cls.TYPE != 'threads':
            q.put(bytes(current.authkey))
            q.put(current.pid)

    def test_process(self):
        q = self.Queue(1)
        args = (q, 1, 2)
        kwargs = {'hello': 23, 'bye': 2.54}
        name = 'SomeProcess'
        p = self.Process(
            target=self._test, args=args, kwargs=kwargs, name=name
            )
        p.daemon = True
        current = self.current_process()

        # State before the process is started.
        if self.TYPE != 'threads':
            self.assertEqual(p.authkey, current.authkey)
        self.assertEqual(p.is_alive(), False)
        self.assertEqual(p.daemon, True)
        self.assertNotIn(p, self.active_children())
        self.assertIs(type(self.active_children()), list)
        self.assertEqual(p.exitcode, None)

        p.start()

        self.assertEqual(p.exitcode, None)
        self.assertEqual(p.is_alive(), True)
        self.assertIn(p, self.active_children())

        # The child echoes back everything it received (minus the queue).
        self.assertEqual(q.get(), args[1:])
        self.assertEqual(q.get(), kwargs)
        self.assertEqual(q.get(), p.name)
        if self.TYPE != 'threads':
            self.assertEqual(q.get(), current.authkey)
            self.assertEqual(q.get(), p.pid)

        p.join()

        self.assertEqual(p.exitcode, 0)
        self.assertEqual(p.is_alive(), False)
        self.assertNotIn(p, self.active_children())

    @classmethod
    def _test_terminate(cls):
        """Child target: sleep long enough that only terminate() ends it."""
        time.sleep(1000)

    def test_terminate(self):
        if self.TYPE == 'threads':
            self.skipTest('test not appropriate for {}'.format(self.TYPE))

        p = self.Process(target=self._test_terminate)
        p.daemon = True
        p.start()

        self.assertEqual(p.is_alive(), True)
        self.assertIn(p, self.active_children())
        self.assertEqual(p.exitcode, None)

        p.terminate()

        join = TimingWrapper(p.join)
        self.assertEqual(join(), None)
        self.assertTimingAlmostEqual(join.elapsed, 0.0)

        self.assertEqual(p.is_alive(), False)
        self.assertNotIn(p, self.active_children())

        p.join()

        # XXX sometimes get p.exitcode == 0 on Windows ...
        #self.assertEqual(p.exitcode, -signal.SIGTERM)

    def test_cpu_count(self):
        try:
            cpus = multiprocessing.cpu_count()
        except NotImplementedError:
            cpus = 1
        self.assertIs(type(cpus), int)
        self.assertGreaterEqual(cpus, 1)

    def test_active_children(self):
        self.assertIs(type(self.active_children()), list)

        p = self.Process(target=time.sleep, args=(DELTA,))
        self.assertNotIn(p, self.active_children())

        p.daemon = True
        p.start()
        self.assertIn(p, self.active_children())

        p.join()
        self.assertNotIn(p, self.active_children())

    @classmethod
    def _test_recursion(cls, wconn, id):
        """Send *id* on *wconn*, then recurse in two children (depth < 2).

        Each child is joined before the next one is started, so the ids
        arrive in depth-first order.
        """
        wconn.send(id)
        if len(id) < 2:
            for i in range(2):
                p = cls.Process(
                    target=cls._test_recursion, args=(wconn, id+[i])
                    )
                p.start()
                p.join()

    def test_recursion(self):
        rconn, wconn = self.Pipe(duplex=False)
        self._test_recursion(wconn, [])

        time.sleep(DELTA)
        result = []
        while rconn.poll():
            result.append(rconn.recv())

        expected = [
            [],
              [0],
                [0, 0],
                [0, 1],
              [1],
                [1, 0],
                [1, 1]
            ]
        self.assertEqual(result, expected)

    @classmethod
    def _test_sentinel(cls, event):
        """Child target: block until *event* is set (at most 10 seconds)."""
        event.wait(10.0)

    def test_sentinel(self):
        if self.TYPE == "threads":
            self.skipTest('test not appropriate for {}'.format(self.TYPE))
        event = self.Event()
        p = self.Process(target=self._test_sentinel, args=(event,))
        # The sentinel is not available before the process is started.
        with self.assertRaises(ValueError):
            p.sentinel
        p.start()
        self.addCleanup(p.join)
        sentinel = p.sentinel
        self.assertIsInstance(sentinel, int)
        self.assertFalse(wait_for_handle(sentinel, timeout=0.0))
        event.set()
        p.join()
        self.assertTrue(wait_for_handle(sentinel, timeout=DELTA))
363
Benjamin Petersone711caf2008-06-11 16:44:04 +0000364#
365#
366#
367
class _UpperCaser(multiprocessing.Process):
    """Process that upper-cases every string sent to it over a pipe.

    The parent talks through ``parent_conn``; the child serves requests on
    ``child_conn`` until it receives None.
    """

    def __init__(self):
        super().__init__()
        child_end, parent_end = multiprocessing.Pipe()
        self.child_conn = child_end
        self.parent_conn = parent_end

    def run(self):
        # The child only needs its own end of the pipe.
        self.parent_conn.close()
        receive = self.child_conn.recv
        for request in iter(receive, None):
            self.child_conn.send(request.upper())
        self.child_conn.close()

    def submit(self, s):
        """Send the string *s* to the child and return its reply."""
        assert type(s) is str
        self.parent_conn.send(s)
        reply = self.parent_conn.recv()
        return reply

    def stop(self):
        """Send the None terminator and close both pipe ends."""
        self.parent_conn.send(None)
        for conn in (self.parent_conn, self.child_conn):
            conn.close()
389
class _TestSubclassingProcess(BaseTestCase):
    """Check that a Process subclass (_UpperCaser) can be started and used."""

    ALLOWED_TYPES = ('processes',)

    def test_subclassing(self):
        uppercaser = _UpperCaser()
        uppercaser.daemon = True
        uppercaser.start()
        for text, shouted in (('hello', 'HELLO'), ('world', 'WORLD')):
            self.assertEqual(uppercaser.submit(text), shouted)
        uppercaser.stop()
        uppercaser.join()
402
403#
404#
405#
406
def queue_empty(q):
    """Return whether *q* is empty, using ``qsize()`` if it lacks ``empty()``."""
    if not hasattr(q, 'empty'):
        return q.qsize() == 0
    return q.empty()
412
def queue_full(q, maxsize):
    """Return whether *q* is full.

    Uses ``q.full()`` when available; otherwise compares ``q.qsize()``
    with *maxsize*.
    """
    if not hasattr(q, 'full'):
        return q.qsize() == maxsize
    return q.full()
418
419
class _TestQueue(BaseTestCase):
    """Tests for Queue and JoinableQueue: put/get semantics, fork, qsize."""

    @classmethod
    def _test_put(cls, queue, child_can_start, parent_can_continue):
        """Child target: once allowed, drain six items and signal the parent."""
        child_can_start.wait()
        for i in range(6):
            queue.get()
        parent_can_continue.set()

    def test_put(self):
        MAXSIZE = 6
        queue = self.Queue(maxsize=MAXSIZE)
        child_can_start = self.Event()
        parent_can_continue = self.Event()

        proc = self.Process(
            target=self._test_put,
            args=(queue, child_can_start, parent_can_continue)
            )
        proc.daemon = True
        proc.start()

        self.assertEqual(queue_empty(queue), True)
        self.assertEqual(queue_full(queue, MAXSIZE), False)

        # Fill the queue using every supported calling convention.
        queue.put(1)
        queue.put(2, True)
        queue.put(3, True, None)
        queue.put(4, False)
        queue.put(5, False, None)
        queue.put_nowait(6)

        # the values may be in buffer but not yet in pipe so sleep a bit
        time.sleep(DELTA)

        self.assertEqual(queue_empty(queue), False)
        self.assertEqual(queue_full(queue, MAXSIZE), True)

        put = TimingWrapper(queue.put)
        put_nowait = TimingWrapper(queue.put_nowait)

        # Non-blocking puts fail immediately; blocking ones after the timeout.
        self.assertRaises(pyqueue.Full, put, 7, False)
        self.assertTimingAlmostEqual(put.elapsed, 0)

        self.assertRaises(pyqueue.Full, put, 7, False, None)
        self.assertTimingAlmostEqual(put.elapsed, 0)

        self.assertRaises(pyqueue.Full, put_nowait, 7)
        self.assertTimingAlmostEqual(put_nowait.elapsed, 0)

        self.assertRaises(pyqueue.Full, put, 7, True, TIMEOUT1)
        self.assertTimingAlmostEqual(put.elapsed, TIMEOUT1)

        self.assertRaises(pyqueue.Full, put, 7, False, TIMEOUT2)
        self.assertTimingAlmostEqual(put.elapsed, 0)

        self.assertRaises(pyqueue.Full, put, 7, True, timeout=TIMEOUT3)
        self.assertTimingAlmostEqual(put.elapsed, TIMEOUT3)

        child_can_start.set()
        parent_can_continue.wait()

        self.assertEqual(queue_empty(queue), True)
        self.assertEqual(queue_full(queue, MAXSIZE), False)

        proc.join()

    @classmethod
    def _test_get(cls, queue, child_can_start, parent_can_continue):
        """Child target: once allowed, put 2..5 and signal the parent."""
        child_can_start.wait()
        #queue.put(1)
        queue.put(2)
        queue.put(3)
        queue.put(4)
        queue.put(5)
        parent_can_continue.set()

    def test_get(self):
        queue = self.Queue()
        child_can_start = self.Event()
        parent_can_continue = self.Event()

        proc = self.Process(
            target=self._test_get,
            args=(queue, child_can_start, parent_can_continue)
            )
        proc.daemon = True
        proc.start()

        self.assertEqual(queue_empty(queue), True)

        child_can_start.set()
        parent_can_continue.wait()

        time.sleep(DELTA)
        self.assertEqual(queue_empty(queue), False)

        # Hangs unexpectedly, remove for now
        #self.assertEqual(queue.get(), 1)
        self.assertEqual(queue.get(True, None), 2)
        self.assertEqual(queue.get(True), 3)
        self.assertEqual(queue.get(timeout=1), 4)
        self.assertEqual(queue.get_nowait(), 5)

        self.assertEqual(queue_empty(queue), True)

        get = TimingWrapper(queue.get)
        get_nowait = TimingWrapper(queue.get_nowait)

        # Non-blocking gets fail immediately; blocking ones after the timeout.
        self.assertRaises(pyqueue.Empty, get, False)
        self.assertTimingAlmostEqual(get.elapsed, 0)

        self.assertRaises(pyqueue.Empty, get, False, None)
        self.assertTimingAlmostEqual(get.elapsed, 0)

        self.assertRaises(pyqueue.Empty, get_nowait)
        self.assertTimingAlmostEqual(get_nowait.elapsed, 0)

        self.assertRaises(pyqueue.Empty, get, True, TIMEOUT1)
        self.assertTimingAlmostEqual(get.elapsed, TIMEOUT1)

        self.assertRaises(pyqueue.Empty, get, False, TIMEOUT2)
        self.assertTimingAlmostEqual(get.elapsed, 0)

        self.assertRaises(pyqueue.Empty, get, timeout=TIMEOUT3)
        self.assertTimingAlmostEqual(get.elapsed, TIMEOUT3)

        proc.join()

    @classmethod
    def _test_fork(cls, queue):
        """Child target: put the integers 10..19 on *queue*."""
        for i in range(10, 20):
            queue.put(i)
        # note that at this point the items may only be buffered, so the
        # process cannot shutdown until the feeder thread has finished
        # pushing items onto the pipe.

    def test_fork(self):
        # Old versions of Queue would fail to create a new feeder
        # thread for a forked process if the original process had its
        # own feeder thread.  This test checks that this no longer
        # happens.

        queue = self.Queue()

        # put items on queue so that main process starts a feeder thread
        for i in range(10):
            queue.put(i)

        # wait to make sure thread starts before we fork a new process
        time.sleep(DELTA)

        # fork process
        p = self.Process(target=self._test_fork, args=(queue,))
        p.daemon = True
        p.start()

        # check that all expected items are in the queue
        for i in range(20):
            self.assertEqual(queue.get(), i)
        self.assertRaises(pyqueue.Empty, queue.get, False)

        p.join()

    def test_qsize(self):
        q = self.Queue()
        try:
            self.assertEqual(q.qsize(), 0)
        except NotImplementedError:
            # Report the missing feature instead of silently passing.
            self.skipTest('qsize method not implemented')
        q.put(1)
        self.assertEqual(q.qsize(), 1)
        q.put(5)
        self.assertEqual(q.qsize(), 2)
        q.get()
        self.assertEqual(q.qsize(), 1)
        q.get()
        self.assertEqual(q.qsize(), 0)

    @classmethod
    def _test_task_done(cls, q):
        """Worker target: acknowledge each item until a None sentinel arrives."""
        for obj in iter(q.get, None):
            time.sleep(DELTA)
            q.task_done()

    def test_task_done(self):
        queue = self.JoinableQueue()

        workers = [self.Process(target=self._test_task_done, args=(queue,))
                   for i in range(4)]

        for p in workers:
            p.daemon = True
            p.start()

        for i in range(10):
            queue.put(i)

        queue.join()

        # One None sentinel per worker so that each of them terminates.
        for _ in workers:
            queue.put(None)

        for p in workers:
            p.join()
629
630#
631#
632#
633
class _TestLock(BaseTestCase):
    """Tests for Lock and RLock acquire/release behaviour."""

    def test_lock(self):
        lock = self.Lock()
        self.assertEqual(lock.acquire(), True)
        # A second, non-blocking acquire must fail while the lock is held.
        self.assertEqual(lock.acquire(False), False)
        self.assertEqual(lock.release(), None)
        # Releasing a lock that is not held is an error.
        self.assertRaises((ValueError, threading.ThreadError), lock.release)

    def test_rlock(self):
        lock = self.RLock()
        # Acquire three times, then release three times.
        for _ in range(3):
            self.assertEqual(lock.acquire(), True)
        for _ in range(3):
            self.assertEqual(lock.release(), None)
        # One release too many is an error.
        self.assertRaises((AssertionError, RuntimeError), lock.release)

    def test_lock_context(self):
        # The lock must be usable as a context manager.
        with self.Lock():
            pass
656
Benjamin Petersone711caf2008-06-11 16:44:04 +0000657
class _TestSemaphore(BaseTestCase):
    """Tests for Semaphore and BoundedSemaphore counting and timeouts."""

    def _test_semaphore(self, sem):
        """Exercise a semaphore created with an initial value of 2.

        The counter is checked through get_value() wherever that is
        implemented; the semaphore ends at its initial value.
        """
        self.assertReturnsIfImplemented(2, get_value, sem)
        self.assertEqual(sem.acquire(), True)
        self.assertReturnsIfImplemented(1, get_value, sem)
        self.assertEqual(sem.acquire(), True)
        self.assertReturnsIfImplemented(0, get_value, sem)
        self.assertEqual(sem.acquire(False), False)
        self.assertReturnsIfImplemented(0, get_value, sem)
        self.assertEqual(sem.release(), None)
        self.assertReturnsIfImplemented(1, get_value, sem)
        self.assertEqual(sem.release(), None)
        self.assertReturnsIfImplemented(2, get_value, sem)

    def test_semaphore(self):
        sem = self.Semaphore(2)
        self._test_semaphore(sem)
        # An unbounded semaphore may be released past its initial value.
        self.assertEqual(sem.release(), None)
        self.assertReturnsIfImplemented(3, get_value, sem)
        self.assertEqual(sem.release(), None)
        self.assertReturnsIfImplemented(4, get_value, sem)

    def test_bounded_semaphore(self):
        sem = self.BoundedSemaphore(2)
        self._test_semaphore(sem)
        # Currently fails on OS/X
        #if HAVE_GETVALUE:
        #    self.assertRaises(ValueError, sem.release)
        #    self.assertReturnsIfImplemented(2, get_value, sem)

    def test_timeout(self):
        if self.TYPE != 'processes':
            # Report as skipped instead of silently passing.
            self.skipTest('test not appropriate for {}'.format(self.TYPE))

        sem = self.Semaphore(0)
        acquire = TimingWrapper(sem.acquire)

        # Non-blocking acquires return at once, whatever the timeout.
        self.assertEqual(acquire(False), False)
        self.assertTimingAlmostEqual(acquire.elapsed, 0.0)

        self.assertEqual(acquire(False, None), False)
        self.assertTimingAlmostEqual(acquire.elapsed, 0.0)

        self.assertEqual(acquire(False, TIMEOUT1), False)
        self.assertTimingAlmostEqual(acquire.elapsed, 0)

        # Blocking acquires give up after the requested timeout.
        self.assertEqual(acquire(True, TIMEOUT2), False)
        self.assertTimingAlmostEqual(acquire.elapsed, TIMEOUT2)

        self.assertEqual(acquire(timeout=TIMEOUT3), False)
        self.assertTimingAlmostEqual(acquire.elapsed, TIMEOUT3)
710
711
class _TestCondition(BaseTestCase):
    """Tests for Condition: notify, notify_all and wait timeouts."""

    @classmethod
    def f(cls, cond, sleeping, woken, timeout=None):
        """Waiter target: announce sleeping, wait on *cond*, announce waking."""
        cond.acquire()
        sleeping.release()
        cond.wait(timeout)
        woken.release()
        cond.release()

    def check_invariant(self, cond):
        # this is only supposed to succeed when there are no sleepers
        if self.TYPE == 'processes':
            try:
                sleepers = (cond._sleeping_count.get_value() -
                            cond._woken_count.get_value())
                self.assertEqual(sleepers, 0)
                self.assertEqual(cond._wait_semaphore.get_value(), 0)
            except NotImplementedError:
                pass

    def test_notify(self):
        cond = self.Condition()
        sleeping = self.Semaphore(0)
        woken = self.Semaphore(0)

        proc = self.Process(target=self.f, args=(cond, sleeping, woken))
        proc.daemon = True
        proc.start()

        thread = threading.Thread(target=self.f, args=(cond, sleeping, woken))
        thread.daemon = True
        thread.start()

        # wait for both children to start sleeping
        sleeping.acquire()
        sleeping.acquire()

        # check no process/thread has woken up
        time.sleep(DELTA)
        self.assertReturnsIfImplemented(0, get_value, woken)

        # wake up one process/thread
        cond.acquire()
        cond.notify()
        cond.release()

        # check one process/thread has woken up
        time.sleep(DELTA)
        self.assertReturnsIfImplemented(1, get_value, woken)

        # wake up another
        cond.acquire()
        cond.notify()
        cond.release()

        # check other has woken up
        time.sleep(DELTA)
        self.assertReturnsIfImplemented(2, get_value, woken)

        # check state is not mucked up
        self.check_invariant(cond)

        # Join both waiters; keeping them under separate names ensures the
        # process is not left unjoined when the thread is created.
        proc.join()
        thread.join()

    def test_notify_all(self):
        cond = self.Condition()
        sleeping = self.Semaphore(0)
        woken = self.Semaphore(0)

        # start some threads/processes which will timeout
        for i in range(3):
            p = self.Process(target=self.f,
                             args=(cond, sleeping, woken, TIMEOUT1))
            p.daemon = True
            p.start()

            t = threading.Thread(target=self.f,
                                 args=(cond, sleeping, woken, TIMEOUT1))
            t.daemon = True
            t.start()

        # wait for them all to sleep
        for i in range(6):
            sleeping.acquire()

        # check they have all timed out
        for i in range(6):
            woken.acquire()
        self.assertReturnsIfImplemented(0, get_value, woken)

        # check state is not mucked up
        self.check_invariant(cond)

        # start some more threads/processes
        for i in range(3):
            p = self.Process(target=self.f, args=(cond, sleeping, woken))
            p.daemon = True
            p.start()

            t = threading.Thread(target=self.f, args=(cond, sleeping, woken))
            t.daemon = True
            t.start()

        # wait for them to all sleep
        for i in range(6):
            sleeping.acquire()

        # check no process/thread has woken up
        time.sleep(DELTA)
        self.assertReturnsIfImplemented(0, get_value, woken)

        # wake them all up
        cond.acquire()
        cond.notify_all()
        cond.release()

        # check they have all woken (poll, since waking is not instantaneous)
        for i in range(10):
            try:
                if get_value(woken) == 6:
                    break
            except NotImplementedError:
                break
            time.sleep(DELTA)
        self.assertReturnsIfImplemented(6, get_value, woken)

        # check state is not mucked up
        self.check_invariant(cond)

    def test_timeout(self):
        cond = self.Condition()
        wait = TimingWrapper(cond.wait)
        cond.acquire()
        res = wait(TIMEOUT1)
        cond.release()
        self.assertEqual(res, False)
        self.assertTimingAlmostEqual(wait.elapsed, TIMEOUT1)
849
850
class _TestEvent(BaseTestCase):
    """Tests for Event: is_set(), wait() return values and timings."""

    @classmethod
    def _test_event(cls, event):
        """Child target: set *event* after sleeping TIMEOUT2 seconds."""
        time.sleep(TIMEOUT2)
        event.set()

    def test_event(self):
        event = self.Event()
        wait = TimingWrapper(event.wait)

        # NOTE: older comments here described the is_set()/wait() checks as
        # removed because of API differences with threading's Event
        # (is_set == isSet, wait() returning the flag rather than None);
        # the assertions below are active.
        self.assertEqual(event.is_set(), False)

        self.assertEqual(wait(0.0), False)
        self.assertTimingAlmostEqual(wait.elapsed, 0.0)
        self.assertEqual(wait(TIMEOUT1), False)
        self.assertTimingAlmostEqual(wait.elapsed, TIMEOUT1)

        event.set()

        # Once set, wait() returns True immediately, with or without timeout.
        self.assertEqual(event.is_set(), True)
        self.assertEqual(wait(), True)
        self.assertTimingAlmostEqual(wait.elapsed, 0.0)
        self.assertEqual(wait(TIMEOUT1), True)
        self.assertTimingAlmostEqual(wait.elapsed, 0.0)
        # self.assertEqual(event.is_set(), True)

        event.clear()

        #self.assertEqual(event.is_set(), False)

        p = self.Process(target=self._test_event, args=(event,))
        p.daemon = True
        p.start()
        self.assertEqual(wait(), True)
        # The child exits right after setting the event; join it so it is
        # not left running when the test returns.
        p.join()
Benjamin Petersone711caf2008-06-11 16:44:04 +0000891
892#
893#
894#
895
class _TestValue(BaseTestCase):
    """Tests for shared Value / RawValue objects."""

    ALLOWED_TYPES = ('processes',)

    # (typecode, initial value, value written by the child process)
    codes_values = [
        ('i', 4343, 24234),
        ('d', 3.625, -4.25),
        ('h', -232, 234),
        ('c', latin('x'), latin('y'))
        ]

    def setUp(self):
        if not HAS_SHAREDCTYPES:
            self.skipTest("requires multiprocessing.sharedctypes")

    @classmethod
    def _test(cls, values):
        """Child target: overwrite each shared value with its new value."""
        for shared, spec in zip(values, cls.codes_values):
            shared.value = spec[2]

    def test_value(self, raw=False):
        factory = self.RawValue if raw else self.Value
        values = [factory(code, value)
                  for code, value, _ in self.codes_values]

        # Every shared value starts out with its initial value ...
        for shared, spec in zip(values, self.codes_values):
            self.assertEqual(shared.value, spec[1])

        proc = self.Process(target=self._test, args=(values,))
        proc.daemon = True
        proc.start()
        proc.join()

        # ... and reflects what the child wrote once the child has exited.
        for shared, spec in zip(values, self.codes_values):
            self.assertEqual(shared.value, spec[2])

    def test_rawvalue(self):
        self.test_value(raw=True)

    def test_getobj_getlock(self):
        # With the default lock, or lock=None, both accessors must work.
        for kwds in ({}, {'lock': None}):
            val = self.Value('i', 5, **kwds)
            val.get_lock()
            val.get_obj()

        # An explicitly supplied lock is the one handed back.
        lock = self.Lock()
        locked_val = self.Value('i', 5, lock=lock)
        returned_lock = locked_val.get_lock()
        locked_val.get_obj()
        self.assertEqual(lock, returned_lock)

        # lock=False yields an object without the accessors.
        unlocked_val = self.Value('i', 5, lock=False)
        self.assertFalse(hasattr(unlocked_val, 'get_lock'))
        self.assertFalse(hasattr(unlocked_val, 'get_obj'))

        self.assertRaises(AttributeError, self.Value, 'i', 5, lock='navalue')

        # RawValue objects have no accessors either.
        raw_val = self.RawValue('i', 5)
        self.assertFalse(hasattr(raw_val, 'get_lock'))
        self.assertFalse(hasattr(raw_val, 'get_obj'))
963
Benjamin Petersone711caf2008-06-11 16:44:04 +0000964
class _TestArray(BaseTestCase):
    """Checks for shared ``Array`` / ``RawArray`` objects."""

    ALLOWED_TYPES = ('processes',)

    @classmethod
    def f(cls, seq):
        # Turn seq into its running sum, in place.
        for idx in range(1, len(seq)):
            seq[idx] = seq[idx] + seq[idx - 1]

    @unittest.skipIf(c_int is None, "requires _ctypes")
    def test_array(self, raw=False):
        seq = [680, 626, 934, 821, 150, 233, 548, 982, 714, 831]
        make_array = self.RawArray if raw else self.Array
        arr = make_array('i', seq)

        self.assertEqual(len(arr), len(seq))
        self.assertEqual(arr[3], seq[3])
        self.assertEqual(list(arr[2:7]), list(seq[2:7]))

        # Apply the same slice assignment to the shared array and the list.
        patch = array.array('i', [1, 2, 3, 4])
        arr[4:8] = patch
        seq[4:8] = patch

        self.assertEqual(list(arr[:]), seq)

        # Compute the expected result locally on the plain list ...
        self.f(seq)

        # ... and let another process do the same work on the shared array.
        p = self.Process(target=self.f, args=(arr,))
        p.daemon = True
        p.start()
        p.join()

        self.assertEqual(list(arr[:]), seq)

    @unittest.skipIf(c_int is None, "requires _ctypes")
    def test_array_from_size(self):
        size = 10
        # Test for zeroing (see issue #11675).
        # The repetition below strengthens the test by increasing the chances
        # of previously allocated non-zero memory being used for the new array
        # on the 2nd and 3rd loops.
        for _ in range(3):
            arr = self.Array('i', size)
            self.assertEqual(len(arr), size)
            self.assertEqual(list(arr), [0] * size)
            arr[:] = range(size)
            self.assertEqual(list(arr), list(range(size)))
            del arr

    @unittest.skipIf(c_int is None, "requires _ctypes")
    def test_rawarray(self):
        self.test_array(raw=True)

    @unittest.skipIf(c_int is None, "requires _ctypes")
    def test_getobj_getlock_obj(self):
        # Default lock argument: both accessors must be callable.
        implicit = self.Array('i', list(range(10)))
        implicit.get_lock()
        implicit.get_obj()

        # lock=None: both accessors must be callable as well.
        with_none = self.Array('i', list(range(10)), lock=None)
        with_none.get_lock()
        with_none.get_obj()

        # A caller-supplied lock must be the one handed back.
        lock = self.Lock()
        with_lock = self.Array('i', list(range(10)), lock=lock)
        returned_lock = with_lock.get_lock()
        with_lock.get_obj()
        self.assertEqual(lock, returned_lock)

        # lock=False yields an object without the accessors.
        unlocked = self.Array('i', range(10), lock=False)
        self.assertFalse(hasattr(unlocked, 'get_lock'))
        self.assertFalse(hasattr(unlocked, 'get_obj'))

        # Anything that is not a lock is rejected.
        with self.assertRaises(AttributeError):
            self.Array('i', range(10), lock='notalock')

        raw_array = self.RawArray('i', range(10))
        self.assertFalse(hasattr(raw_array, 'get_lock'))
        self.assertFalse(hasattr(raw_array, 'get_obj'))
Benjamin Petersone711caf2008-06-11 16:44:04 +00001043
1044#
1045#
1046#
1047
class _TestContainers(BaseTestCase):
    """Checks for the manager-provided list, dict and Namespace objects."""

    ALLOWED_TYPES = ('manager',)

    def test_list(self):
        first_ten = list(range(10))
        a = self.list(first_ten)
        self.assertEqual(a[:], first_ten)

        b = self.list()
        self.assertEqual(b[:], [])

        b.extend(list(range(5)))
        self.assertEqual(b[:], list(range(5)))

        self.assertEqual(b[2], 2)
        self.assertEqual(b[2:10], [2,3,4])

        b *= 2
        doubled = [0, 1, 2, 3, 4, 0, 1, 2, 3, 4]
        self.assertEqual(b[:], doubled)

        self.assertEqual(b + [5, 6], doubled + [5, 6])

        # Working with b must not have touched a.
        self.assertEqual(a[:], list(range(10)))

        # A shared list built from other shared lists.
        nested = self.list([a, b])
        self.assertEqual(
            nested[:],
            [[0, 1, 2, 3, 4, 5, 6, 7, 8, 9], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4]]
            )

        # A later change to a shows up when read through the outer list.
        wrapper = self.list([a])
        a.append('hello')
        self.assertEqual(wrapper[:],
                         [[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 'hello']])

    def test_dict(self):
        shared = self.dict()
        indices = list(range(65, 70))
        for code in indices:
            shared[code] = chr(code)
        letters = [chr(code) for code in indices]
        self.assertEqual(shared.copy(),
                         {code: chr(code) for code in indices})
        self.assertEqual(sorted(shared.keys()), indices)
        self.assertEqual(sorted(shared.values()), letters)
        self.assertEqual(sorted(shared.items()), list(zip(indices, letters)))

    def test_namespace(self):
        n = self.Namespace()
        n.name = 'Bob'
        n.job = 'Builder'
        n._hidden = 'hidden'
        self.assertEqual((n.name, n.job), ('Bob', 'Builder'))
        del n.job
        # Neither the deleted attribute nor the underscore-prefixed one
        # appears in the string form.
        self.assertEqual(str(n), "Namespace(name='Bob')")
        self.assertTrue(hasattr(n, 'name'))
        self.assertFalse(hasattr(n, 'job'))
1103
1104#
1105#
1106#
1107
def sqr(x, wait=0.0):
    """Sleep for *wait* seconds, then return *x* multiplied by itself."""
    time.sleep(wait)
    squared = x * x
    return squared
Ask Solem2afcbf22010-11-09 20:55:52 +00001111
class _TestPool(BaseTestCase):
    """Checks for the pool API: apply, map, imap and their async variants."""

    def test_apply(self):
        papply = self.pool.apply
        self.assertEqual(papply(sqr, (5,)), sqr(5))
        self.assertEqual(papply(sqr, (), {'x':3}), sqr(x=3))

    def test_map(self):
        pmap = self.pool.map
        self.assertEqual(pmap(sqr, list(range(10))),
                         list(map(sqr, list(range(10)))))
        self.assertEqual(pmap(sqr, list(range(100)), chunksize=20),
                         list(map(sqr, list(range(100)))))

    def test_map_chunksize(self):
        # An empty iterable with an explicit chunksize must still complete.
        try:
            self.pool.map_async(sqr, [], chunksize=1).get(timeout=TIMEOUT1)
        except multiprocessing.TimeoutError:
            self.fail("pool.map_async with chunksize stalled on null list")

    def test_async(self):
        res = self.pool.apply_async(sqr, (7, TIMEOUT1,))
        get = TimingWrapper(res.get)
        self.assertEqual(get(), 49)
        self.assertTimingAlmostEqual(get.elapsed, TIMEOUT1)

    def test_async_timeout(self):
        # The task sleeps longer than the timeout handed to get().
        res = self.pool.apply_async(sqr, (6, TIMEOUT2 + 0.2))
        get = TimingWrapper(res.get)
        self.assertRaises(multiprocessing.TimeoutError, get, timeout=TIMEOUT2)
        self.assertTimingAlmostEqual(get.elapsed, TIMEOUT2)

    def test_imap(self):
        it = self.pool.imap(sqr, list(range(10)))
        self.assertEqual(list(it), list(map(sqr, list(range(10)))))

        it = self.pool.imap(sqr, list(range(10)))
        for i in range(10):
            self.assertEqual(next(it), i*i)
        self.assertRaises(StopIteration, it.__next__)

        it = self.pool.imap(sqr, list(range(1000)), chunksize=100)
        for i in range(1000):
            self.assertEqual(next(it), i*i)
        self.assertRaises(StopIteration, it.__next__)

    def test_imap_unordered(self):
        it = self.pool.imap_unordered(sqr, list(range(1000)))
        self.assertEqual(sorted(it), list(map(sqr, list(range(1000)))))

        it = self.pool.imap_unordered(sqr, list(range(1000)), chunksize=53)
        self.assertEqual(sorted(it), list(map(sqr, list(range(1000)))))

    def test_make_pool(self):
        self.assertRaises(ValueError, multiprocessing.Pool, -1)
        self.assertRaises(ValueError, multiprocessing.Pool, 0)

        p = multiprocessing.Pool(3)
        try:
            self.assertEqual(3, len(p._pool))
        finally:
            # Shut the pool down even when the assertion fails so that no
            # worker processes are left behind.
            p.close()
            p.join()

    def test_terminate(self):
        if self.TYPE == 'manager':
            # On Unix a forked process increfs each shared object to
            # which its parent process held a reference.  If the
            # forked process gets terminated then there is likely to
            # be a reference leak.  So to prevent
            # _TestZZZNumberOfObjects from failing we skip this test
            # when using a manager.
            self.skipTest("terminating a manager's pool may leak references")

        # Queue far more work than can finish, then terminate right away.
        self.pool.map_async(
            time.sleep, [0.1 for i in range(10000)], chunksize=1
            )
        self.pool.terminate()
        join = TimingWrapper(self.pool.join)
        join()
        self.assertLess(join.elapsed, 0.5)
Jesse Noller1f0b6582010-01-27 03:36:01 +00001190
def raising():
    """Always raise ``KeyError("key")``."""
    error = KeyError("key")
    raise error
Jesse Noller1f0b6582010-01-27 03:36:01 +00001193
def unpickleable_result():
    """Return a lambda that evaluates to 42 when called."""
    return (lambda: 42)
1196
class _TestPoolWorkerErrors(BaseTestCase):
    """Checks how ``apply_async`` reports failures through error_callback."""

    ALLOWED_TYPES = ('processes', )

    def test_async_error_callback(self):
        pool = multiprocessing.Pool(2)

        # One-slot list so the nested callback can hand the exception back.
        captured = [None]
        def errback(exc):
            captured[0] = exc

        res = pool.apply_async(raising, error_callback=errback)
        with self.assertRaises(KeyError):
            res.get()
        self.assertTrue(captured[0])
        self.assertIsInstance(captured[0], KeyError)

        pool.close()
        pool.join()

    def test_unpickleable_result(self):
        from multiprocessing.pool import MaybeEncodingError
        pool = multiprocessing.Pool(2)

        # Make sure we don't lose pool processes because of encoding errors.
        for _ in range(20):

            captured = [None]
            def errback(exc):
                captured[0] = exc

            res = pool.apply_async(unpickleable_result,
                                   error_callback=errback)
            with self.assertRaises(MaybeEncodingError):
                res.get()
            wrapped = captured[0]
            self.assertTrue(wrapped)
            self.assertIsInstance(captured[0], MaybeEncodingError)
            self.assertIsNotNone(wrapped.exc)
            self.assertIsNotNone(wrapped.value)

        pool.close()
        pool.join()
1236
class _TestPoolWorkerLifetime(BaseTestCase):
    """Checks that ``maxtasksperchild`` leads to workers being replaced."""

    ALLOWED_TYPES = ('processes', )

    def test_pool_worker_lifetime(self):
        pool = multiprocessing.Pool(3, maxtasksperchild=10)
        self.assertEqual(3, len(pool._pool))
        origworkerpids = [worker.pid for worker in pool._pool]
        # Run many tasks so each worker gets replaced (hopefully)
        pending = [pool.apply_async(sqr, (i, )) for i in range(100)]
        # Fetch the results and verify we got the right answers,
        # also ensuring all the tasks have completed.
        for arg, res in enumerate(pending):
            self.assertEqual(res.get(), sqr(arg))
        # Refill the pool
        pool._repopulate_pool()
        # Wait until all workers are alive
        # (50 attempts * DELTA = 5 seconds max startup process time)
        for _attempt in range(50):
            if all(worker.is_alive() for worker in pool._pool):
                break
            time.sleep(DELTA)
        finalworkerpids = [worker.pid for worker in pool._pool]
        # All pids should be assigned.  See issue #7805.
        self.assertNotIn(None, origworkerpids)
        self.assertNotIn(None, finalworkerpids)
        # Finally, check that the worker pids have changed
        self.assertNotEqual(sorted(origworkerpids), sorted(finalworkerpids))
        pool.close()
        pool.join()
1268
Benjamin Petersone711caf2008-06-11 16:44:04 +00001269#
1270# Test that manager has expected number of shared objects left
1271#
1272
class _TestZZZNumberOfObjects(BaseTestCase):
    # Because test cases are sorted alphabetically, this one will get
    # run after all the other tests for the manager.  It tests that
    # there have been no "reference leaks" for the manager's shared
    # objects.  Note the comment in _TestPool.test_terminate().
    ALLOWED_TYPES = ('manager',)

    def test_number_of_objects(self):
        EXPECTED_NUMBER = 1                # the pool object is still alive
        multiprocessing.active_children()  # discard dead process objs
        gc.collect()                       # do garbage collection
        refs = self.manager._number_of_objects()
        # Take the debug snapshot right next to the count it explains.
        debug_info = self.manager._debug_info()
        if refs != EXPECTED_NUMBER:
            # Print it once; a second print duplicated the diagnostic output.
            print(debug_info)

        self.assertEqual(refs, EXPECTED_NUMBER)
1291
1292#
1293# Test of creating a customized manager class
1294#
1295
1296from multiprocessing.managers import BaseManager, BaseProxy, RemoteError
1297
class FooBar(object):
    """Plain class with two public methods and one underscore-prefixed one."""

    def f(self):
        """Return the string ``'f()'``."""
        return 'f()'

    def g(self):
        """Always raise ``ValueError``."""
        raise ValueError

    def _h(self):
        """Return the string ``'_h()'``."""
        return '_h()'
1305
def baz():
    """Yield the squares of 0 through 9, in order."""
    for root in range(10):
        yield root ** 2
1309
class IteratorProxy(BaseProxy):
    """Proxy that is its own iterator and forwards ``__next__`` calls."""

    _exposed_ = ('__next__',)

    def __iter__(self):
        return self

    def __next__(self):
        # Delegate to the referent through the proxy's call mechanism.
        method = '__next__'
        return self._callmethod(method)
1316
class MyManager(BaseManager):
    """Empty ``BaseManager`` subclass used to hold custom registrations."""
1319
# 'Foo' uses the default exposure, 'Bar' names its exposed methods
# explicitly, and 'baz' is served through IteratorProxy.
MyManager.register('Foo', callable=FooBar)
MyManager.register('Bar', callable=FooBar, exposed=('f', '_h'))
MyManager.register('baz', callable=baz, proxytype=IteratorProxy)
1323
1324
class _TestMyManager(BaseTestCase):
    """Checks the types registered on the customized MyManager class."""

    ALLOWED_TYPES = ('manager',)

    def test_mymanager(self):
        manager = MyManager()
        manager.start()
        # Always stop the manager, even when an assertion below fails;
        # otherwise the shutdown call is skipped.
        try:
            foo = manager.Foo()
            bar = manager.Bar()
            # Named 'squares' so the module-level baz() is not shadowed.
            squares = manager.baz()

            candidates = ('f', 'g', '_h')
            foo_methods = [name for name in candidates if hasattr(foo, name)]
            bar_methods = [name for name in candidates if hasattr(bar, name)]

            self.assertEqual(foo_methods, ['f', 'g'])
            self.assertEqual(bar_methods, ['f', '_h'])

            self.assertEqual(foo.f(), 'f()')
            self.assertRaises(ValueError, foo.g)
            self.assertEqual(foo._callmethod('f'), 'f()')
            # '_h' is not among Foo's proxy methods, so calling it fails.
            self.assertRaises(RemoteError, foo._callmethod, '_h')

            self.assertEqual(bar.f(), 'f()')
            self.assertEqual(bar._h(), '_h()')
            self.assertEqual(bar._callmethod('f'), 'f()')
            self.assertEqual(bar._callmethod('_h'), '_h()')

            self.assertEqual(list(squares), [i*i for i in range(10)])
        finally:
            manager.shutdown()
1356
1357#
1358# Test of connecting to a remote server and using xmlrpclib for serialization
1359#
1360
# Single module-level queue; get_queue() always hands back this object.
_queue = pyqueue.Queue()


def get_queue():
    """Return the module-level queue."""
    return _queue
1364
class QueueManager(BaseManager):
    '''manager class used by server process'''

# Registered with a callable: get_queue() supplies the object.
QueueManager.register('get_queue', callable=get_queue)
1368
class QueueManager2(BaseManager):
    '''manager class which specifies the same interface as QueueManager'''

# Registered by name only, with no callable.
QueueManager2.register('get_queue')
1372
1373
# Serializer name passed to every QueueManager / QueueManager2 below.
SERIALIZER = 'xmlrpclib'
1375
class _TestRemoteManager(BaseTestCase):
    """Checks connecting to a running manager with the xmlrpclib serializer."""

    ALLOWED_TYPES = ('manager',)

    @classmethod
    def _putter(cls, address, authkey):
        # Process target: connect as a client and put one tuple on the queue.
        client = QueueManager2(
            address=address, authkey=authkey, serializer=SERIALIZER
            )
        client.connect()
        remote_queue = client.get_queue()
        remote_queue.put(('hello world', None, True, 2.25))

    def test_remote(self):
        authkey = os.urandom(32)

        manager = QueueManager(
            address=('localhost', 0), authkey=authkey, serializer=SERIALIZER
            )
        manager.start()

        p = self.Process(target=self._putter,
                         args=(manager.address, authkey))
        p.daemon = True
        p.start()

        manager2 = QueueManager2(
            address=manager.address, authkey=authkey, serializer=SERIALIZER
            )
        manager2.connect()
        remote_queue = manager2.get_queue()

        # Note that xmlrpclib will deserialize object as a list not a tuple
        received = remote_queue.get()
        self.assertEqual(received, ['hello world', None, True, 2.25])

        # Because we are using xmlrpclib for serialization instead of
        # pickle this will cause a serialization error.
        with self.assertRaises(Exception):
            remote_queue.put(time.sleep)

        # Make queue finalizer run before the server is stopped
        del remote_queue
        manager.shutdown()
1417
class _TestManagerRestart(BaseTestCase):
    """Checks that a manager can be restarted on the address just vacated."""

    @classmethod
    def _putter(cls, address, authkey):
        # Process target: connect as a client and put one string on the queue.
        manager = QueueManager(
            address=address, authkey=authkey, serializer=SERIALIZER)
        manager.connect()
        queue = manager.get_queue()
        queue.put('hello world')

    def test_rapid_restart(self):
        authkey = os.urandom(32)
        manager = QueueManager(
            address=('localhost', 0), authkey=authkey, serializer=SERIALIZER)
        srvr = manager.get_server()
        addr = srvr.address
        # Close the connection.Listener socket which gets opened as a part
        # of manager.get_server(). It's not needed for the test.
        srvr.listener.close()
        manager.start()

        p = self.Process(target=self._putter, args=(manager.address, authkey))
        p.daemon = True
        p.start()
        queue = manager.get_queue()
        self.assertEqual(queue.get(), 'hello world')
        del queue
        manager.shutdown()

        # Second manager, created on the address recorded above.
        manager = QueueManager(
            address=addr, authkey=authkey, serializer=SERIALIZER)
        try:
            manager.start()
        except IOError as e:
            if e.errno != errno.EADDRINUSE:
                raise
            # Retry after some time, in case the old socket was lingering
            # (sporadic failure on buildbots)
            time.sleep(1.0)
            manager = QueueManager(
                address=addr, authkey=authkey, serializer=SERIALIZER)
            # Actually perform the retry: without this start() the new
            # manager was never started before shutdown() was called on it.
            manager.start()
        manager.shutdown()
Jesse Nollerc5d28a02009-03-30 16:37:36 +00001459
Benjamin Petersone711caf2008-06-11 16:44:04 +00001460#
1461#
1462#
1463
# Empty message; iter(conn.recv_bytes, SENTINEL) stops when it is received.
SENTINEL = latin('')
1465
class _TestConnection(BaseTestCase):
    """Checks for Pipe() connection objects and for handle passing."""

    ALLOWED_TYPES = ('processes', 'threads')

    @classmethod
    def _echo(cls, conn):
        # Process target: send each received message straight back until
        # SENTINEL arrives, then close the connection.
        for msg in iter(conn.recv_bytes, SENTINEL):
            conn.send_bytes(msg)
        conn.close()

    def test_connection(self):
        conn, child_conn = self.Pipe()

        p = self.Process(target=self._echo, args=(child_conn,))
        p.daemon = True
        p.start()

        seq = [1, 2.25, None]
        msg = latin('hello world')
        longmsg = msg * 10
        arr = array.array('i', list(range(4)))

        if self.TYPE == 'processes':
            self.assertEqual(type(conn.fileno()), int)

        self.assertEqual(conn.send(seq), None)
        self.assertEqual(conn.recv(), seq)

        self.assertEqual(conn.send_bytes(msg), None)
        self.assertEqual(conn.recv_bytes(), msg)

        if self.TYPE == 'processes':
            # Receive into the start of a zeroed buffer.
            buffer = array.array('i', [0]*10)
            expected = list(arr) + [0] * (10 - len(arr))
            self.assertEqual(conn.send_bytes(arr), None)
            self.assertEqual(conn.recv_bytes_into(buffer),
                             len(arr) * buffer.itemsize)
            self.assertEqual(list(buffer), expected)

            # Receive at an offset of three items.
            buffer = array.array('i', [0]*10)
            expected = [0] * 3 + list(arr) + [0] * (10 - 3 - len(arr))
            self.assertEqual(conn.send_bytes(arr), None)
            self.assertEqual(conn.recv_bytes_into(buffer, 3 * buffer.itemsize),
                             len(arr) * buffer.itemsize)
            self.assertEqual(list(buffer), expected)

            # A message longer than the buffer must raise BufferTooShort
            # carrying the complete message.
            buffer = bytearray(latin(' ' * 40))
            self.assertEqual(conn.send_bytes(longmsg), None)
            try:
                res = conn.recv_bytes_into(buffer)
            except multiprocessing.BufferTooShort as e:
                self.assertEqual(e.args, (longmsg,))
            else:
                self.fail('expected BufferTooShort, got %s' % res)

        poll = TimingWrapper(conn.poll)

        self.assertEqual(poll(), False)
        self.assertTimingAlmostEqual(poll.elapsed, 0)

        self.assertEqual(poll(TIMEOUT1), False)
        self.assertTimingAlmostEqual(poll.elapsed, TIMEOUT1)

        conn.send(None)

        self.assertEqual(poll(TIMEOUT1), True)
        self.assertTimingAlmostEqual(poll.elapsed, 0)

        self.assertEqual(conn.recv(), None)

        really_big_msg = latin('X') * (1024 * 1024 * 16)   # 16Mb
        conn.send_bytes(really_big_msg)
        self.assertEqual(conn.recv_bytes(), really_big_msg)

        conn.send_bytes(SENTINEL)                          # tell child to quit
        child_conn.close()

        if self.TYPE == 'processes':
            self.assertEqual(conn.readable, True)
            self.assertEqual(conn.writable, True)
            self.assertRaises(EOFError, conn.recv)
            self.assertRaises(EOFError, conn.recv_bytes)

        p.join()

    def test_duplex_false(self):
        reader, writer = self.Pipe(duplex=False)
        self.assertEqual(writer.send(1), None)
        self.assertEqual(reader.recv(), 1)
        if self.TYPE == 'processes':
            self.assertEqual(reader.readable, True)
            self.assertEqual(reader.writable, False)
            self.assertEqual(writer.readable, False)
            self.assertEqual(writer.writable, True)
            self.assertRaises(IOError, reader.send, 2)
            self.assertRaises(IOError, writer.recv)
            self.assertRaises(IOError, writer.poll)

    def test_spawn_close(self):
        # We test that a pipe connection can be closed by parent
        # process immediately after child is spawned.  On Windows this
        # would have sometimes failed on old versions because
        # child_conn would be closed before the child got a chance to
        # duplicate it.
        conn, child_conn = self.Pipe()

        p = self.Process(target=self._echo, args=(child_conn,))
        p.daemon = True
        p.start()
        child_conn.close()    # this might complete before child initializes

        msg = latin('hello')
        conn.send_bytes(msg)
        self.assertEqual(conn.recv_bytes(), msg)

        conn.send_bytes(SENTINEL)
        conn.close()
        p.join()

    def test_sendbytes(self):
        if self.TYPE != 'processes':
            # Report a skip, as the fd-transfer tests below do, rather than
            # returning silently and counting as a pass.
            self.skipTest("only makes sense with processes")

        msg = latin('abcdefghijklmnopqrstuvwxyz')
        a, b = self.Pipe()

        a.send_bytes(msg)
        self.assertEqual(b.recv_bytes(), msg)

        # Offset only.
        a.send_bytes(msg, 5)
        self.assertEqual(b.recv_bytes(), msg[5:])

        # Offset and size.
        a.send_bytes(msg, 7, 8)
        self.assertEqual(b.recv_bytes(), msg[7:7+8])

        # An offset equal to the length sends an empty message.
        a.send_bytes(msg, 26)
        self.assertEqual(b.recv_bytes(), latin(''))

        a.send_bytes(msg, 26, 0)
        self.assertEqual(b.recv_bytes(), latin(''))

        # Out-of-range or negative offset/size combinations are rejected.
        self.assertRaises(ValueError, a.send_bytes, msg, 27)

        self.assertRaises(ValueError, a.send_bytes, msg, 22, 5)

        self.assertRaises(ValueError, a.send_bytes, msg, 26, 1)

        self.assertRaises(ValueError, a.send_bytes, msg, -1)

        self.assertRaises(ValueError, a.send_bytes, msg, 4, -1)

    @classmethod
    def _is_fd_assigned(cls, fd):
        # Return True when os.fstat() accepts fd and False when it fails
        # with EBADF; any other OSError propagates.
        try:
            os.fstat(fd)
        except OSError as e:
            if e.errno == errno.EBADF:
                return False
            raise
        else:
            return True

    @classmethod
    def _writefd(cls, conn, data, create_dummy_fds=False):
        # Process target: receive a handle over conn, write data to it and
        # close it.
        if create_dummy_fds:
            # Occupy every unassigned descriptor below 256 first.
            for i in range(0, 256):
                if not cls._is_fd_assigned(i):
                    os.dup2(conn.fileno(), i)
        fd = reduction.recv_handle(conn)
        if msvcrt:
            fd = msvcrt.open_osfhandle(fd, os.O_WRONLY)
        os.write(fd, data)
        os.close(fd)

    @unittest.skipUnless(HAS_REDUCTION, "test needs multiprocessing.reduction")
    def test_fd_transfer(self):
        if self.TYPE != 'processes':
            self.skipTest("only makes sense with processes")
        # Remove the scratch file afterwards, whatever the outcome.
        self.addCleanup(test.support.unlink, test.support.TESTFN)
        conn, child_conn = self.Pipe(duplex=True)

        p = self.Process(target=self._writefd, args=(child_conn, b"foo"))
        p.daemon = True
        p.start()
        with open(test.support.TESTFN, "wb") as f:
            fd = f.fileno()
            if msvcrt:
                fd = msvcrt.get_osfhandle(fd)
            reduction.send_handle(conn, fd, p.pid)
        p.join()
        with open(test.support.TESTFN, "rb") as f:
            self.assertEqual(f.read(), b"foo")

    @unittest.skipUnless(HAS_REDUCTION, "test needs multiprocessing.reduction")
    @unittest.skipIf(sys.platform == "win32",
                     "test semantics don't make sense on Windows")
    @unittest.skipIf(MAXFD <= 256,
                     "largest assignable fd number is too small")
    @unittest.skipUnless(hasattr(os, "dup2"),
                         "test needs os.dup2()")
    def test_large_fd_transfer(self):
        # With fd > 256 (issue #11657)
        if self.TYPE != 'processes':
            self.skipTest("only makes sense with processes")
        # Remove the scratch file afterwards, whatever the outcome.
        self.addCleanup(test.support.unlink, test.support.TESTFN)
        conn, child_conn = self.Pipe(duplex=True)

        p = self.Process(target=self._writefd, args=(child_conn, b"bar", True))
        p.daemon = True
        p.start()
        with open(test.support.TESTFN, "wb") as f:
            fd = f.fileno()
            # Find the first unassigned descriptor number from 256 upwards.
            for newfd in range(256, MAXFD):
                if not self._is_fd_assigned(newfd):
                    break
            else:
                self.fail("could not find an unassigned large file descriptor")
            os.dup2(fd, newfd)
            try:
                reduction.send_handle(conn, newfd, p.pid)
            finally:
                os.close(newfd)
        p.join()
        with open(test.support.TESTFN, "rb") as f:
            self.assertEqual(f.read(), b"bar")
1689
1690
class _TestListenerClient(BaseTestCase):
    """Checks a Listener/Client round trip for every connection family."""

    ALLOWED_TYPES = ('processes', 'threads')

    @classmethod
    def _test(cls, address):
        # Process target: connect to the listener, send one greeting, close.
        client = cls.connection.Client(address)
        client.send('hello')
        client.close()

    def test_listener_client(self):
        for family in self.connection.families:
            listener = self.connection.Listener(family=family)
            worker = self.Process(target=self._test,
                                  args=(listener.address,))
            worker.daemon = True
            worker.start()
            accepted = listener.accept()
            self.assertEqual(accepted.recv(), 'hello')
            worker.join()
            listener.close()
Benjamin Petersone711caf2008-06-11 16:44:04 +00001711#
1712# Test of sending connection and socket objects between processes
1713#
Benjamin Petersonb29cbbc2008-06-16 20:57:14 +00001714"""
Benjamin Petersone711caf2008-06-11 16:44:04 +00001715class _TestPicklingConnections(BaseTestCase):
1716
1717 ALLOWED_TYPES = ('processes',)
1718
1719 def _listener(self, conn, families):
1720 for fam in families:
1721 l = self.connection.Listener(family=fam)
1722 conn.send(l.address)
1723 new_conn = l.accept()
1724 conn.send(new_conn)
1725
1726 if self.TYPE == 'processes':
1727 l = socket.socket()
1728 l.bind(('localhost', 0))
1729 conn.send(l.getsockname())
1730 l.listen(1)
1731 new_conn, addr = l.accept()
1732 conn.send(new_conn)
1733
1734 conn.recv()
1735
1736 def _remote(self, conn):
1737 for (address, msg) in iter(conn.recv, None):
1738 client = self.connection.Client(address)
1739 client.send(msg.upper())
1740 client.close()
1741
1742 if self.TYPE == 'processes':
1743 address, msg = conn.recv()
1744 client = socket.socket()
1745 client.connect(address)
1746 client.sendall(msg.upper())
1747 client.close()
1748
1749 conn.close()
1750
1751 def test_pickling(self):
1752 try:
1753 multiprocessing.allow_connection_pickling()
1754 except ImportError:
1755 return
1756
1757 families = self.connection.families
1758
1759 lconn, lconn0 = self.Pipe()
1760 lp = self.Process(target=self._listener, args=(lconn0, families))
Jesus Cea94f964f2011-09-09 20:26:57 +02001761 lp.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +00001762 lp.start()
1763 lconn0.close()
1764
1765 rconn, rconn0 = self.Pipe()
1766 rp = self.Process(target=self._remote, args=(rconn0,))
Jesus Cea94f964f2011-09-09 20:26:57 +02001767 rp.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +00001768 rp.start()
1769 rconn0.close()
1770
1771 for fam in families:
1772 msg = ('This connection uses family %s' % fam).encode('ascii')
1773 address = lconn.recv()
1774 rconn.send((address, msg))
1775 new_conn = lconn.recv()
1776 self.assertEqual(new_conn.recv(), msg.upper())
1777
1778 rconn.send(None)
1779
1780 if self.TYPE == 'processes':
1781 msg = latin('This connection uses a normal socket')
1782 address = lconn.recv()
1783 rconn.send((address, msg))
1784 if hasattr(socket, 'fromfd'):
1785 new_conn = lconn.recv()
1786 self.assertEqual(new_conn.recv(100), msg.upper())
1787 else:
1788 # XXX On Windows with Py2.6 need to backport fromfd()
1789 discard = lconn.recv_bytes()
1790
1791 lconn.send(None)
1792
1793 rconn.close()
1794 lconn.close()
1795
1796 lp.join()
1797 rp.join()
Benjamin Petersonb29cbbc2008-06-16 20:57:14 +00001798"""
Benjamin Petersone711caf2008-06-11 16:44:04 +00001799#
1800#
1801#
1802
class _TestHeap(BaseTestCase):
    """Checks for the allocator in ``multiprocessing.heap``."""

    ALLOWED_TYPES = ('processes',)

    def test_heap(self):
        iterations = 5000
        maxblocks = 50
        blocks = []

        # create and destroy lots of blocks of different sizes
        for _ in range(iterations):
            size = int(random.lognormvariate(0, 1) * 1000)
            b = multiprocessing.heap.BufferWrapper(size)
            blocks.append(b)
            if len(blocks) > maxblocks:
                # Separate name: the loop variable is no longer overwritten.
                victim = random.randrange(maxblocks)
                del blocks[victim]

        # get the heap object
        heap = multiprocessing.heap.BufferWrapper._heap

        # verify the state of the heap
        # ('segments' rather than 'all', so the builtin is not shadowed)
        segments = []
        heap._lock.acquire()
        self.addCleanup(heap._lock.release)
        for L in list(heap._len_to_seq.values()):
            for arena, start, stop in L:
                segments.append((heap._arenas.index(arena), start, stop,
                                 stop-start, 'free'))
        for arena, start, stop in heap._allocated_blocks:
            segments.append((heap._arenas.index(arena), start, stop,
                             stop-start, 'occupied'))

        segments.sort()

        # Each segment must either end where the next one starts, or the
        # next one must start at offset 0 of a different arena.
        for current, following in zip(segments, segments[1:]):
            (arena, start, stop) = current[:3]
            (narena, nstart, nstop) = following[:3]
            self.assertTrue((arena != narena and nstart == 0) or
                            (stop == nstart))

    def test_free_from_gc(self):
        # Check that freeing of blocks by the garbage collector doesn't deadlock
        # (issue #12352).
        # Make sure the GC is enabled, and set lower collection thresholds to
        # make collections more frequent (and increase the probability of
        # deadlock).
        if not gc.isenabled():
            gc.enable()
            self.addCleanup(gc.disable)
        thresholds = gc.get_threshold()
        self.addCleanup(gc.set_threshold, *thresholds)
        gc.set_threshold(10)

        # perform numerous block allocations, with cyclic references to make
        # sure objects are collected asynchronously by the gc
        for _ in range(5000):
            a = multiprocessing.heap.BufferWrapper(1)
            b = multiprocessing.heap.BufferWrapper(1)
            # circular references
            a.buddy = b
            b.buddy = a
1867
Benjamin Petersone711caf2008-06-11 16:44:04 +00001868#
1869#
1870#
1871
class _Foo(Structure):
    # Two-field record (an int `x` and a double `y`) used by the shared
    # ctypes tests.
    _fields_ = [('x', c_int), ('y', c_double)]
1877
class _TestSharedCTypes(BaseTestCase):
    # Shared ctypes objects must be visible to, and writable from, a child.

    ALLOWED_TYPES = ('processes',)

    def setUp(self):
        if HAS_SHAREDCTYPES:
            return
        self.skipTest("requires multiprocessing.sharedctypes")

    @classmethod
    def _double(cls, x, y, foo, arr, string):
        # Runs in the child process: double every shared object in place.
        x.value = x.value * 2
        y.value = y.value * 2
        foo.x = foo.x * 2
        foo.y = foo.y * 2
        string.value = string.value * 2
        for index in range(len(arr)):
            arr[index] = arr[index] * 2

    def test_sharedctypes(self, lock=False):
        x = Value('i', 7, lock=lock)
        y = Value(c_double, 1.0/3.0, lock=lock)
        foo = Value(_Foo, 3, 2, lock=lock)
        arr = self.Array('d', list(range(10)), lock=lock)
        string = self.Array('c', 20, lock=lock)
        string.value = latin('hello')

        shared = (x, y, foo, arr, string)
        child = self.Process(target=self._double, args=shared)
        child.daemon = True
        child.start()
        child.join()

        # the parent must observe the values written by the child
        self.assertEqual(x.value, 14)
        self.assertAlmostEqual(y.value, 2.0/3.0)
        self.assertEqual(foo.x, 6)
        self.assertAlmostEqual(foo.y, 4.0)
        for index in range(10):
            self.assertAlmostEqual(arr[index], index*2)
        self.assertEqual(string.value, latin('hellohello'))

    def test_synchronize(self):
        # same scenario, with lock-protected shared objects
        self.test_sharedctypes(lock=True)

    def test_copy(self):
        original = _Foo(2, 5.0)
        duplicate = copy(original)
        # changing the original must not affect the copy
        original.x = 0
        original.y = 0
        self.assertEqual(duplicate.x, 2)
        self.assertAlmostEqual(duplicate.y, 5.0)
1927
1928#
1929#
1930#
1931
class _TestFinalize(BaseTestCase):
    # A child process registers several util.Finalize callbacks that report
    # over a pipe; the parent checks which ones fired and in what order.

    ALLOWED_TYPES = ('processes',)

    @classmethod
    def _test_finalize(cls, conn):
        class Foo(object):
            pass

        # NOTE: statement order and the lifetime of every local below are
        # part of the scenario; keep the statements explicit.
        obj_a = Foo()
        util.Finalize(obj_a, conn.send, args=('a',))
        del obj_a           # triggers callback for a

        obj_b = Foo()
        finalize_b = util.Finalize(obj_b, conn.send, args=('b',))
        finalize_b()        # triggers callback for b
        finalize_b()        # does nothing because callback has already been called
        del obj_b           # does nothing because callback has already been called

        obj_c = Foo()
        util.Finalize(obj_c, conn.send, args=('c',))

        obj_d10 = Foo()
        util.Finalize(obj_d10, conn.send, args=('d10',), exitpriority=1)

        obj_d01 = Foo()
        util.Finalize(obj_d01, conn.send, args=('d01',), exitpriority=0)
        obj_d02 = Foo()
        util.Finalize(obj_d02, conn.send, args=('d02',), exitpriority=0)
        obj_d03 = Foo()
        util.Finalize(obj_d03, conn.send, args=('d03',), exitpriority=0)

        util.Finalize(None, conn.send, args=('e',), exitpriority=-10)

        util.Finalize(None, conn.send, args=('STOP',), exitpriority=-100)

        # call multiprocessing's cleanup function then exit process without
        # garbage collecting locals
        util._exit_function()
        conn.close()
        os._exit(0)

    def test_finalize(self):
        conn, child_conn = self.Pipe()

        child = self.Process(target=self._test_finalize, args=(child_conn,))
        child.daemon = True
        child.start()
        child.join()

        # read messages until the 'STOP' sentinel sent by the last finalizer
        result = list(iter(conn.recv, 'STOP'))
        expected = ['a', 'b', 'd10', 'd03', 'd02', 'd01', 'e']
        self.assertEqual(result, expected)
1984
1985#
1986# Test that from ... import * works for each module
1987#
1988
class _TestImportStar(BaseTestCase):
    # Every name a module advertises in __all__ must exist on the module.

    ALLOWED_TYPES = ('processes',)

    def test_import(self):
        modules = [
            'multiprocessing',
            'multiprocessing.connection',
            'multiprocessing.heap',
            'multiprocessing.managers',
            'multiprocessing.pool',
            'multiprocessing.process',
            'multiprocessing.synchronize',
            'multiprocessing.util',
        ]

        if HAS_REDUCTION:
            modules += ['multiprocessing.reduction']

        if c_int is not None:
            # This module requires _ctypes
            modules += ['multiprocessing.sharedctypes']

        for name in modules:
            __import__(name)
            mod = sys.modules[name]
            exported = getattr(mod, '__all__', ())

            for attr in exported:
                message = '%r does not have attribute %r' % (mod, attr)
                self.assertTrue(hasattr(mod, attr), message)
2017
2018#
2019# Quick test that logging works -- does not test logging output
2020#
2021
class _TestLogging(BaseTestCase):
    # Quick checks of the multiprocessing logger; log output is not examined.

    ALLOWED_TYPES = ('processes',)

    def test_enable_logging(self):
        logger = multiprocessing.get_logger()
        self.assertTrue(logger is not None)
        # Registered before the level is changed so the level is put back
        # even if one of the calls below raises.
        self.addCleanup(logger.setLevel, LOG_LEVEL)
        logger.setLevel(util.SUBWARNING)
        logger.debug('this will not be printed')
        logger.info('nor will this')

    @classmethod
    def _test_level(cls, conn):
        # Runs in the child: report the logger's effective level to the parent.
        logger = multiprocessing.get_logger()
        conn.send(logger.getEffectiveLevel())

    def _level_seen_by_child(self, reader, writer):
        # Start a child running _test_level, return the level it reports and
        # wait for it to exit so no process outlives the test.
        child = self.Process(target=self._test_level, args=(writer,))
        child.daemon = True
        child.start()
        try:
            return reader.recv()
        finally:
            child.join()

    def test_level(self):
        LEVEL1 = 32
        LEVEL2 = 37

        logger = multiprocessing.get_logger()
        root_logger = logging.getLogger()
        root_level = root_logger.level

        reader, writer = multiprocessing.Pipe(duplex=False)
        self.addCleanup(reader.close)
        self.addCleanup(writer.close)

        # Restore both loggers even when an assertion below fails, so a
        # failure here cannot change the levels seen by later tests.
        self.addCleanup(logger.setLevel, level=LOG_LEVEL)
        self.addCleanup(root_logger.setLevel, root_level)

        # an explicit level on the multiprocessing logger is seen by a child
        logger.setLevel(LEVEL1)
        self.assertEqual(LEVEL1, self._level_seen_by_child(reader, writer))

        # with NOTSET the effective level comes from the root logger
        logger.setLevel(logging.NOTSET)
        root_logger.setLevel(LEVEL2)
        self.assertEqual(LEVEL2, self._level_seen_by_child(reader, writer))
2064
Jesse Nollerb9a49b72009-11-21 18:09:38 +00002065
Jesse Nollerf4ae35f2009-11-24 14:22:24 +00002066# class _TestLoggingProcessName(BaseTestCase):
2067#
2068# def handle(self, record):
2069# assert record.processName == multiprocessing.current_process().name
2070# self.__handled = True
2071#
2072# def test_logging(self):
2073# handler = logging.Handler()
2074# handler.handle = self.handle
2075# self.__handled = False
2076# # Bypass getLogger() and side-effects
2077# logger = logging.getLoggerClass()(
2078# 'multiprocessing.test.TestLoggingProcessName')
2079# logger.addHandler(handler)
2080# logger.propagate = False
2081#
2082# logger.warn('foo')
2083# assert self.__handled
Jesse Nollerb9a49b72009-11-21 18:09:38 +00002084
Benjamin Petersone711caf2008-06-11 16:44:04 +00002085#
Jesse Noller6214edd2009-01-19 16:23:53 +00002086# Test to verify handle verification, see issue 3321
2087#
2088
class TestInvalidHandle(unittest.TestCase):
    # Connection objects built on bogus handles must fail cleanly.

    @unittest.skipIf(WIN32, "skipped on Windows")
    def test_invalid_handles(self):
        expected_errors = (ValueError, IOError)
        conn = multiprocessing.connection.Connection(44977608)
        try:
            with self.assertRaises(expected_errors):
                conn.poll()
        finally:
            # Hack private attribute _handle to avoid printing an error
            # in conn.__del__
            conn._handle = None
        # a negative handle is rejected by the constructor itself
        with self.assertRaises(expected_errors):
            multiprocessing.connection.Connection(-1)
Florent Xiclunafd1b0932010-03-28 00:25:02 +00002102
Jesse Noller6214edd2009-01-19 16:23:53 +00002103#
Benjamin Petersone711caf2008-06-11 16:44:04 +00002104# Functions used to create test cases from the base ones in this module
2105#
2106
def get_attributes(Source, names):
    # Return a dict mapping each name in `names` to getattr(Source, name).
    # Plain Python functions are wrapped in staticmethod() so that they do
    # not turn into bound methods when the dict is merged into a class body.
    function_type = type(get_attributes)
    attributes = {}
    for attr_name in names:
        value = getattr(Source, attr_name)
        if type(value) == function_type:
            value = staticmethod(value)
        attributes[attr_name] = value
    return attributes
2115
def create_test_cases(Mixin, type):
    # For every module-level `_Test*` base whose ALLOWED_TYPES contains
    # `type`, build a concrete unittest.TestCase subclass named
    # 'With<Type><base name without the leading underscore>' that also
    # inherits from Mixin.  Returns a dict mapping those names to the classes.
    cases = {}
    namespace = globals()
    suffix = type.capitalize()

    # iterate over a snapshot of the module namespace
    for base_name in list(namespace):
        if not base_name.startswith('_Test'):
            continue
        base = namespace[base_name]
        if type not in base.ALLOWED_TYPES:
            continue
        generated_name = 'With' + suffix + base_name[1:]
        class Temp(base, unittest.TestCase, Mixin):
            pass
        Temp.__name__ = generated_name
        Temp.__module__ = Mixin.__module__
        cases[generated_name] = Temp
    return cases
2132
2133#
2134# Create test cases
2135#
2136
class ProcessesMixin(object):
    # Test fixtures taken straight from the multiprocessing package.
    TYPE = 'processes'
    Process = multiprocessing.Process
    # copy the listed multiprocessing attributes into this class namespace
    locals().update(get_attributes(multiprocessing, [
        'Queue',
        'Lock',
        'RLock',
        'Semaphore',
        'BoundedSemaphore',
        'Condition',
        'Event',
        'Value',
        'Array',
        'RawValue',
        'RawArray',
        'current_process',
        'active_children',
        'Pipe',
        'connection',
        'JoinableQueue',
    ]))

# generate the WithProcesses* test cases and publish them at module level
testcases_processes = create_test_cases(ProcessesMixin, type='processes')
globals().update(testcases_processes)
2149
2150
class ManagerMixin(object):
    # Test fixtures taken from a SyncManager instance.
    TYPE = 'manager'
    Process = multiprocessing.Process
    # the manager is allocated without running __init__ here
    manager = object.__new__(multiprocessing.managers.SyncManager)
    # copy the listed manager attributes into this class namespace
    locals().update(get_attributes(manager, [
        'Queue',
        'Lock',
        'RLock',
        'Semaphore',
        'BoundedSemaphore',
        'Condition',
        'Event',
        'Value',
        'Array',
        'list',
        'dict',
        'Namespace',
        'JoinableQueue',
    ]))

# generate the WithManager* test cases and publish them at module level
testcases_manager = create_test_cases(ManagerMixin, type='manager')
globals().update(testcases_manager)
2163
2164
class ThreadsMixin(object):
    # Test fixtures taken from multiprocessing.dummy.
    TYPE = 'threads'
    Process = multiprocessing.dummy.Process
    # copy the listed multiprocessing.dummy attributes into this class namespace
    locals().update(get_attributes(multiprocessing.dummy, [
        'Queue',
        'Lock',
        'RLock',
        'Semaphore',
        'BoundedSemaphore',
        'Condition',
        'Event',
        'Value',
        'Array',
        'current_process',
        'active_children',
        'Pipe',
        'connection',
        'dict',
        'list',
        'Namespace',
        'JoinableQueue',
    ]))

# generate the WithThreads* test cases and publish them at module level
testcases_threads = create_test_cases(ThreadsMixin, type='threads')
globals().update(testcases_threads)
2177
class OtherTest(unittest.TestCase):
    # TODO: add more tests for deliver/answer challenge.
    def test_deliver_challenge_auth_failure(self):
        # the fake peer answers the challenge with a bogus response
        class _FakeConnection(object):
            def recv_bytes(self, size):
                return b'something bogus'
            def send_bytes(self, data):
                pass
        fake = _FakeConnection()
        with self.assertRaises(multiprocessing.AuthenticationError):
            multiprocessing.connection.deliver_challenge(fake, b'abc')

    def test_answer_challenge_auth_failure(self):
        # the fake peer sends CHALLENGE first, then a bogus reply, then b''
        class _FakeConnection(object):
            def __init__(self):
                self.count = 0
            def recv_bytes(self, size):
                self.count += 1
                if self.count == 1:
                    return multiprocessing.connection.CHALLENGE
                if self.count == 2:
                    return b'something bogus'
                return b''
            def send_bytes(self, data):
                pass
        fake = _FakeConnection()
        with self.assertRaises(multiprocessing.AuthenticationError):
            multiprocessing.connection.answer_challenge(fake, b'abc')
2206
Benjamin Petersonf47ed4a2009-04-11 20:45:40 +00002207#
2208# Test Manager.start()/Pool.__init__() initializer feature - see issue 5585
2209#
2210
def initializer(ns):
    # Bump the `test` counter on `ns` by one.
    ns.test = ns.test + 1
2213
class TestInitializers(unittest.TestCase):
    # The initializer passed to Manager.start() / Pool() must be called with
    # the supplied arguments; a shared Namespace counter records the call.

    def setUp(self):
        self.mgr = multiprocessing.Manager()
        self.ns = self.mgr.Namespace()
        self.ns.test = 0

    def tearDown(self):
        self.mgr.shutdown()

    def test_manager_initializer(self):
        m = multiprocessing.managers.SyncManager()
        # a non-callable initializer is rejected
        self.assertRaises(TypeError, m.start, 1)
        m.start(initializer, (self.ns,))
        try:
            self.assertEqual(self.ns.test, 1)
        finally:
            # shut the manager down even when the assertion fails, so its
            # server process is not left running
            m.shutdown()

    def test_pool_initializer(self):
        # a non-callable initializer is rejected
        self.assertRaises(TypeError, multiprocessing.Pool, initializer=1)
        p = multiprocessing.Pool(1, initializer, (self.ns,))
        p.close()
        p.join()
        self.assertEqual(self.ns.test, 1)
2236
Alexandre Vassalottic57a84f2009-07-17 12:07:01 +00002237#
2238# Issue 5155, 5313, 5331: Test process in processes
2239# Verifies os.close(sys.stdin.fileno) vs. sys.stdin.close() behavior
2240#
2241
def _ThisSubProcess(q):
    # Try a single non-blocking get on `q`; an empty queue is not an error
    # and the fetched item, if any, is discarded.
    try:
        q.get(block=False)
    except pyqueue.Empty:
        return
2247
def _TestProcess(q):
    # Create a fresh queue and run _ThisSubProcess on it in a daemonic child
    # process, waiting for the child to finish.  The `q` argument itself is
    # not used.
    inner_queue = multiprocessing.Queue()
    child = multiprocessing.Process(target=_ThisSubProcess,
                                    args=(inner_queue,))
    child.daemon = True
    child.start()
    child.join()
2254
def _afunc(x):
    # Return x multiplied by itself.
    squared = x * x
    return squared
2257
def pool_in_process():
    # Create a 4-worker pool, map _afunc over a few integers, then close and
    # join the pool so its worker processes are not left running.
    pool = multiprocessing.Pool(processes=4)
    try:
        # the mapped results are not needed; only the pool usage matters
        pool.map(_afunc, [1, 2, 3, 4, 5, 6, 7])
    finally:
        pool.close()
        pool.join()
2261
class _file_like(object):
    # Buffers written data in a per-process list and forwards the joined
    # buffer to `delegate.write()` on flush().

    def __init__(self, delegate):
        self._delegate = delegate
        self._pid = None

    @property
    def cache(self):
        # The buffer is recreated whenever the current pid differs from the
        # pid that created it.
        current_pid = os.getpid()
        # There are no race conditions since fork keeps only the running thread
        if self._pid != current_pid:
            self._pid = current_pid
            self._cache = []
        return self._cache

    def write(self, data):
        self.cache.append(data)

    def flush(self):
        buffered = ''.join(self.cache)
        self._delegate.write(buffered)
        self._cache = []
2282
class TestStdinBadfiledescriptor(unittest.TestCase):
    # Regression tests for processes started from within processes
    # (issues 5155, 5313, 5331).

    def test_queue_in_process(self):
        queue = multiprocessing.Queue()
        proc = multiprocessing.Process(target=_TestProcess, args=(queue,))
        proc.start()
        proc.join()

    def test_pool_in_process(self):
        p = multiprocessing.Process(target=pool_in_process)
        p.start()
        p.join()

    def test_flushing(self):
        sio = io.StringIO()
        flike = _file_like(sio)
        flike.write('foo')
        flike.flush()
        # assertEqual rather than a bare assert, so the check still runs
        # when Python is started with -O
        self.assertEqual(sio.getvalue(), 'foo')
2303
# Stand-alone test cases that are not generated from the _Test* mixin bases.
testcases_other = [
    OtherTest,
    TestInvalidHandle,
    TestInitializers,
    TestStdinBadfiledescriptor,
]
Neal Norwitz5d6415e2008-08-25 01:53:32 +00002306
Benjamin Petersone711caf2008-06-11 16:44:04 +00002307#
2308#
2309#
2310
def test_main(run=None):
    # Build the complete suite and run it.
    #
    # `run` is a callable that accepts a unittest.TestSuite; when it is None,
    # test.support.run_unittest is used.  Raises unittest.SkipTest on Linux
    # when an RLock cannot be created (issue 3111).
    if sys.platform.startswith("linux"):
        try:
            multiprocessing.RLock()
        except OSError:
            raise unittest.SkipTest("OSError raises on RLock creation, see issue 3111!")

    if run is None:
        from test.support import run_unittest as run

    util.get_temp_dir()     # creates temp directory for use by all processes

    multiprocessing.get_logger().setLevel(LOG_LEVEL)

    # shared fixtures used by the generated test cases
    ProcessesMixin.pool = multiprocessing.Pool(4)
    ThreadsMixin.pool = multiprocessing.dummy.Pool(4)
    ManagerMixin.manager.__init__()
    ManagerMixin.manager.start()
    ManagerMixin.pool = ManagerMixin.manager.Pool(4)

    def by_name(tc):
        return tc.__name__

    testcases = (
        sorted(testcases_processes.values(), key=by_name) +
        sorted(testcases_threads.values(), key=by_name) +
        sorted(testcases_manager.values(), key=by_name) +
        testcases_other
        )

    loadTestsFromTestCase = unittest.defaultTestLoader.loadTestsFromTestCase
    suite = unittest.TestSuite(loadTestsFromTestCase(tc) for tc in testcases)
    try:
        run(suite)
    finally:
        # tear the shared fixtures down even if the runner raises, so no
        # worker or manager process is left behind
        ThreadsMixin.pool.terminate()
        ProcessesMixin.pool.terminate()
        ManagerMixin.pool.terminate()
        ManagerMixin.manager.shutdown()

        del ProcessesMixin.pool, ThreadsMixin.pool, ManagerMixin.pool
Benjamin Petersone711caf2008-06-11 16:44:04 +00002348
def main():
    # Run the whole suite through a verbose text runner.
    runner = unittest.TextTestRunner(verbosity=2)
    test_main(runner.run)

if __name__ == '__main__':
    main()