blob: dca6b4241fba16b393a3657b91eaf6e941a7564a [file] [log] [blame]
Benjamin Peterson90f5ba52010-03-11 22:53:45 +00001#!/usr/bin/env python3
Georg Brandl86b2fb92008-07-16 03:43:04 +00002
Benjamin Petersone711caf2008-06-11 16:44:04 +00003#
4# Unit tests for the multiprocessing package
5#
6
7import unittest
Benjamin Petersone711caf2008-06-11 16:44:04 +00008import queue as pyqueue
9import time
Alexandre Vassalottic57a84f2009-07-17 12:07:01 +000010import io
Antoine Pitroude911b22011-12-21 11:03:24 +010011import itertools
Benjamin Petersone711caf2008-06-11 16:44:04 +000012import sys
13import os
14import gc
Antoine Pitrouc824e9a2011-04-05 18:11:33 +020015import errno
Benjamin Petersone711caf2008-06-11 16:44:04 +000016import signal
17import array
Benjamin Petersone711caf2008-06-11 16:44:04 +000018import socket
19import random
20import logging
R. David Murraya21e4ca2009-03-31 23:16:50 +000021import test.support
Benjamin Petersone711caf2008-06-11 16:44:04 +000022
Benjamin Petersone5384b02008-10-04 22:00:42 +000023
R. David Murraya21e4ca2009-03-31 23:16:50 +000024# Skip tests if _multiprocessing wasn't built.
25_multiprocessing = test.support.import_module('_multiprocessing')
26# Skip tests if sem_open implementation is broken.
27test.support.import_module('multiprocessing.synchronize')
Victor Stinner45df8202010-04-28 22:31:17 +000028# import threading after _multiprocessing to raise a more revelant error
29# message: "No module named _multiprocessing". _multiprocessing is not compiled
30# without thread support.
31import threading
Benjamin Petersone5384b02008-10-04 22:00:42 +000032
Benjamin Petersone711caf2008-06-11 16:44:04 +000033import multiprocessing.dummy
34import multiprocessing.connection
35import multiprocessing.managers
36import multiprocessing.heap
Benjamin Petersone711caf2008-06-11 16:44:04 +000037import multiprocessing.pool
Benjamin Petersone711caf2008-06-11 16:44:04 +000038
Charles-François Natalibc8f0822011-09-20 20:36:51 +020039from multiprocessing import util
40
41try:
42 from multiprocessing import reduction
43 HAS_REDUCTION = True
44except ImportError:
45 HAS_REDUCTION = False
Benjamin Petersone711caf2008-06-11 16:44:04 +000046
Brian Curtinafa88b52010-10-07 01:12:19 +000047try:
48 from multiprocessing.sharedctypes import Value, copy
49 HAS_SHAREDCTYPES = True
50except ImportError:
51 HAS_SHAREDCTYPES = False
52
Antoine Pitroubcb39d42011-08-23 19:46:22 +020053try:
54 import msvcrt
55except ImportError:
56 msvcrt = None
57
Benjamin Petersone711caf2008-06-11 16:44:04 +000058#
59#
60#
61
Benjamin Peterson2bc91df2008-07-13 18:45:30 +000062def latin(s):
63 return s.encode('latin')
Benjamin Petersone711caf2008-06-11 16:44:04 +000064
Benjamin Petersone711caf2008-06-11 16:44:04 +000065#
66# Constants
67#
68
69LOG_LEVEL = util.SUBWARNING
Jesse Noller1f0b6582010-01-27 03:36:01 +000070#LOG_LEVEL = logging.DEBUG
Benjamin Petersone711caf2008-06-11 16:44:04 +000071
72DELTA = 0.1
73CHECK_TIMINGS = False # making true makes tests take a lot longer
74 # and can sometimes cause some non-serious
75 # failures because some calls block a bit
76 # longer than expected
77if CHECK_TIMINGS:
78 TIMEOUT1, TIMEOUT2, TIMEOUT3 = 0.82, 0.35, 1.4
79else:
80 TIMEOUT1, TIMEOUT2, TIMEOUT3 = 0.1, 0.1, 0.1
81
82HAVE_GETVALUE = not getattr(_multiprocessing,
83 'HAVE_BROKEN_SEM_GETVALUE', False)
84
Jesse Noller6214edd2009-01-19 16:23:53 +000085WIN32 = (sys.platform == "win32")
Antoine Pitrou176f07d2011-06-06 19:35:31 +020086if WIN32:
Antoine Pitrou23bba4c2012-04-18 20:51:15 +020087 from _winapi import WaitForSingleObject, INFINITE, WAIT_OBJECT_0
Antoine Pitrou176f07d2011-06-06 19:35:31 +020088
89 def wait_for_handle(handle, timeout):
90 if timeout is None or timeout < 0.0:
91 timeout = INFINITE
92 else:
93 timeout = int(1000 * timeout)
94 return WaitForSingleObject(handle, timeout) == WAIT_OBJECT_0
95else:
96 from select import select
97 _select = util._eintr_retry(select)
98
99 def wait_for_handle(handle, timeout):
100 if timeout is not None and timeout < 0.0:
101 timeout = None
102 return handle in _select([handle], [], [], timeout)[0]
Jesse Noller6214edd2009-01-19 16:23:53 +0000103
Antoine Pitroubcb39d42011-08-23 19:46:22 +0200104try:
105 MAXFD = os.sysconf("SC_OPEN_MAX")
106except:
107 MAXFD = 256
108
Benjamin Petersone711caf2008-06-11 16:44:04 +0000109#
Florent Xiclunafd1b0932010-03-28 00:25:02 +0000110# Some tests require ctypes
111#
112
113try:
Florent Xiclunaaa171062010-08-14 15:56:42 +0000114 from ctypes import Structure, c_int, c_double
Florent Xiclunafd1b0932010-03-28 00:25:02 +0000115except ImportError:
116 Structure = object
117 c_int = c_double = None
118
Charles-François Natali221ef672011-11-22 18:55:22 +0100119
120def check_enough_semaphores():
121 """Check that the system supports enough semaphores to run the test."""
122 # minimum number of semaphores available according to POSIX
123 nsems_min = 256
124 try:
125 nsems = os.sysconf("SC_SEM_NSEMS_MAX")
126 except (AttributeError, ValueError):
127 # sysconf not available or setting not available
128 return
129 if nsems == -1 or nsems >= nsems_min:
130 return
131 raise unittest.SkipTest("The OS doesn't support enough semaphores "
132 "to run the test (required: %d)." % nsems_min)
133
134
Florent Xiclunafd1b0932010-03-28 00:25:02 +0000135#
Benjamin Petersone711caf2008-06-11 16:44:04 +0000136# Creates a wrapper for a function which records the time it takes to finish
137#
138
139class TimingWrapper(object):
140
141 def __init__(self, func):
142 self.func = func
143 self.elapsed = None
144
145 def __call__(self, *args, **kwds):
146 t = time.time()
147 try:
148 return self.func(*args, **kwds)
149 finally:
150 self.elapsed = time.time() - t
151
152#
153# Base class for test cases
154#
155
156class BaseTestCase(object):
157
158 ALLOWED_TYPES = ('processes', 'manager', 'threads')
159
160 def assertTimingAlmostEqual(self, a, b):
161 if CHECK_TIMINGS:
162 self.assertAlmostEqual(a, b, 1)
163
164 def assertReturnsIfImplemented(self, value, func, *args):
165 try:
166 res = func(*args)
167 except NotImplementedError:
168 pass
169 else:
170 return self.assertEqual(value, res)
171
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +0000172 # For the sanity of Windows users, rather than crashing or freezing in
173 # multiple ways.
174 def __reduce__(self, *args):
175 raise NotImplementedError("shouldn't try to pickle a test case")
176
177 __reduce_ex__ = __reduce__
178
Benjamin Petersone711caf2008-06-11 16:44:04 +0000179#
180# Return the value of a semaphore
181#
182
183def get_value(self):
184 try:
185 return self.get_value()
186 except AttributeError:
187 try:
188 return self._Semaphore__value
189 except AttributeError:
190 try:
191 return self._value
192 except AttributeError:
193 raise NotImplementedError
194
195#
196# Testcases
197#
198
199class _TestProcess(BaseTestCase):
200
201 ALLOWED_TYPES = ('processes', 'threads')
202
203 def test_current(self):
204 if self.TYPE == 'threads':
205 return
206
207 current = self.current_process()
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000208 authkey = current.authkey
Benjamin Petersone711caf2008-06-11 16:44:04 +0000209
210 self.assertTrue(current.is_alive())
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000211 self.assertTrue(not current.daemon)
Ezio Melottie9615932010-01-24 19:26:24 +0000212 self.assertIsInstance(authkey, bytes)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000213 self.assertTrue(len(authkey) > 0)
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000214 self.assertEqual(current.ident, os.getpid())
215 self.assertEqual(current.exitcode, None)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000216
Antoine Pitrou0bd4deb2011-02-25 22:07:43 +0000217 def test_daemon_argument(self):
218 if self.TYPE == "threads":
219 return
220
221 # By default uses the current process's daemon flag.
222 proc0 = self.Process(target=self._test)
Antoine Pitrouec785222011-03-02 00:15:44 +0000223 self.assertEqual(proc0.daemon, self.current_process().daemon)
Antoine Pitrou0bd4deb2011-02-25 22:07:43 +0000224 proc1 = self.Process(target=self._test, daemon=True)
225 self.assertTrue(proc1.daemon)
226 proc2 = self.Process(target=self._test, daemon=False)
227 self.assertFalse(proc2.daemon)
228
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +0000229 @classmethod
230 def _test(cls, q, *args, **kwds):
231 current = cls.current_process()
Benjamin Petersone711caf2008-06-11 16:44:04 +0000232 q.put(args)
233 q.put(kwds)
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000234 q.put(current.name)
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +0000235 if cls.TYPE != 'threads':
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000236 q.put(bytes(current.authkey))
Benjamin Petersone711caf2008-06-11 16:44:04 +0000237 q.put(current.pid)
238
239 def test_process(self):
240 q = self.Queue(1)
241 e = self.Event()
242 args = (q, 1, 2)
243 kwargs = {'hello':23, 'bye':2.54}
244 name = 'SomeProcess'
245 p = self.Process(
246 target=self._test, args=args, kwargs=kwargs, name=name
247 )
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000248 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000249 current = self.current_process()
250
251 if self.TYPE != 'threads':
Ezio Melottib3aedd42010-11-20 19:04:17 +0000252 self.assertEqual(p.authkey, current.authkey)
253 self.assertEqual(p.is_alive(), False)
254 self.assertEqual(p.daemon, True)
Benjamin Peterson577473f2010-01-19 00:09:57 +0000255 self.assertNotIn(p, self.active_children())
Benjamin Petersone711caf2008-06-11 16:44:04 +0000256 self.assertTrue(type(self.active_children()) is list)
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000257 self.assertEqual(p.exitcode, None)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000258
259 p.start()
260
Ezio Melottib3aedd42010-11-20 19:04:17 +0000261 self.assertEqual(p.exitcode, None)
262 self.assertEqual(p.is_alive(), True)
Benjamin Peterson577473f2010-01-19 00:09:57 +0000263 self.assertIn(p, self.active_children())
Benjamin Petersone711caf2008-06-11 16:44:04 +0000264
Ezio Melottib3aedd42010-11-20 19:04:17 +0000265 self.assertEqual(q.get(), args[1:])
266 self.assertEqual(q.get(), kwargs)
267 self.assertEqual(q.get(), p.name)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000268 if self.TYPE != 'threads':
Ezio Melottib3aedd42010-11-20 19:04:17 +0000269 self.assertEqual(q.get(), current.authkey)
270 self.assertEqual(q.get(), p.pid)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000271
272 p.join()
273
Ezio Melottib3aedd42010-11-20 19:04:17 +0000274 self.assertEqual(p.exitcode, 0)
275 self.assertEqual(p.is_alive(), False)
Benjamin Peterson577473f2010-01-19 00:09:57 +0000276 self.assertNotIn(p, self.active_children())
Benjamin Petersone711caf2008-06-11 16:44:04 +0000277
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +0000278 @classmethod
279 def _test_terminate(cls):
Benjamin Petersone711caf2008-06-11 16:44:04 +0000280 time.sleep(1000)
281
282 def test_terminate(self):
283 if self.TYPE == 'threads':
284 return
285
286 p = self.Process(target=self._test_terminate)
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000287 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000288 p.start()
289
290 self.assertEqual(p.is_alive(), True)
Benjamin Peterson577473f2010-01-19 00:09:57 +0000291 self.assertIn(p, self.active_children())
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000292 self.assertEqual(p.exitcode, None)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000293
294 p.terminate()
295
296 join = TimingWrapper(p.join)
297 self.assertEqual(join(), None)
298 self.assertTimingAlmostEqual(join.elapsed, 0.0)
299
300 self.assertEqual(p.is_alive(), False)
Benjamin Peterson577473f2010-01-19 00:09:57 +0000301 self.assertNotIn(p, self.active_children())
Benjamin Petersone711caf2008-06-11 16:44:04 +0000302
303 p.join()
304
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000305 # XXX sometimes get p.exitcode == 0 on Windows ...
306 #self.assertEqual(p.exitcode, -signal.SIGTERM)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000307
308 def test_cpu_count(self):
309 try:
310 cpus = multiprocessing.cpu_count()
311 except NotImplementedError:
312 cpus = 1
313 self.assertTrue(type(cpus) is int)
314 self.assertTrue(cpus >= 1)
315
316 def test_active_children(self):
317 self.assertEqual(type(self.active_children()), list)
318
319 p = self.Process(target=time.sleep, args=(DELTA,))
Benjamin Peterson577473f2010-01-19 00:09:57 +0000320 self.assertNotIn(p, self.active_children())
Benjamin Petersone711caf2008-06-11 16:44:04 +0000321
Jesus Cea94f964f2011-09-09 20:26:57 +0200322 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000323 p.start()
Benjamin Peterson577473f2010-01-19 00:09:57 +0000324 self.assertIn(p, self.active_children())
Benjamin Petersone711caf2008-06-11 16:44:04 +0000325
326 p.join()
Benjamin Peterson577473f2010-01-19 00:09:57 +0000327 self.assertNotIn(p, self.active_children())
Benjamin Petersone711caf2008-06-11 16:44:04 +0000328
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +0000329 @classmethod
330 def _test_recursion(cls, wconn, id):
Benjamin Petersone711caf2008-06-11 16:44:04 +0000331 from multiprocessing import forking
332 wconn.send(id)
333 if len(id) < 2:
334 for i in range(2):
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +0000335 p = cls.Process(
336 target=cls._test_recursion, args=(wconn, id+[i])
Benjamin Petersone711caf2008-06-11 16:44:04 +0000337 )
338 p.start()
339 p.join()
340
341 def test_recursion(self):
342 rconn, wconn = self.Pipe(duplex=False)
343 self._test_recursion(wconn, [])
344
345 time.sleep(DELTA)
346 result = []
347 while rconn.poll():
348 result.append(rconn.recv())
349
350 expected = [
351 [],
352 [0],
353 [0, 0],
354 [0, 1],
355 [1],
356 [1, 0],
357 [1, 1]
358 ]
359 self.assertEqual(result, expected)
360
Antoine Pitrou176f07d2011-06-06 19:35:31 +0200361 @classmethod
362 def _test_sentinel(cls, event):
363 event.wait(10.0)
364
365 def test_sentinel(self):
366 if self.TYPE == "threads":
367 return
368 event = self.Event()
369 p = self.Process(target=self._test_sentinel, args=(event,))
370 with self.assertRaises(ValueError):
371 p.sentinel
372 p.start()
373 self.addCleanup(p.join)
374 sentinel = p.sentinel
375 self.assertIsInstance(sentinel, int)
376 self.assertFalse(wait_for_handle(sentinel, timeout=0.0))
377 event.set()
378 p.join()
379 self.assertTrue(wait_for_handle(sentinel, timeout=DELTA))
380
Benjamin Petersone711caf2008-06-11 16:44:04 +0000381#
382#
383#
384
385class _UpperCaser(multiprocessing.Process):
386
387 def __init__(self):
388 multiprocessing.Process.__init__(self)
389 self.child_conn, self.parent_conn = multiprocessing.Pipe()
390
391 def run(self):
392 self.parent_conn.close()
393 for s in iter(self.child_conn.recv, None):
394 self.child_conn.send(s.upper())
395 self.child_conn.close()
396
397 def submit(self, s):
398 assert type(s) is str
399 self.parent_conn.send(s)
400 return self.parent_conn.recv()
401
402 def stop(self):
403 self.parent_conn.send(None)
404 self.parent_conn.close()
405 self.child_conn.close()
406
407class _TestSubclassingProcess(BaseTestCase):
408
409 ALLOWED_TYPES = ('processes',)
410
411 def test_subclassing(self):
412 uppercaser = _UpperCaser()
Jesus Cea94f964f2011-09-09 20:26:57 +0200413 uppercaser.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000414 uppercaser.start()
415 self.assertEqual(uppercaser.submit('hello'), 'HELLO')
416 self.assertEqual(uppercaser.submit('world'), 'WORLD')
417 uppercaser.stop()
418 uppercaser.join()
419
Antoine Pitrou84a0fbf2012-01-27 10:52:37 +0100420 def test_stderr_flush(self):
421 # sys.stderr is flushed at process shutdown (issue #13812)
422 if self.TYPE == "threads":
423 return
424
425 testfn = test.support.TESTFN
426 self.addCleanup(test.support.unlink, testfn)
427 proc = self.Process(target=self._test_stderr_flush, args=(testfn,))
428 proc.start()
429 proc.join()
430 with open(testfn, 'r') as f:
431 err = f.read()
432 # The whole traceback was printed
433 self.assertIn("ZeroDivisionError", err)
434 self.assertIn("test_multiprocessing.py", err)
435 self.assertIn("1/0 # MARKER", err)
436
437 @classmethod
438 def _test_stderr_flush(cls, testfn):
439 sys.stderr = open(testfn, 'w')
440 1/0 # MARKER
441
442
Benjamin Petersone711caf2008-06-11 16:44:04 +0000443#
444#
445#
446
447def queue_empty(q):
448 if hasattr(q, 'empty'):
449 return q.empty()
450 else:
451 return q.qsize() == 0
452
453def queue_full(q, maxsize):
454 if hasattr(q, 'full'):
455 return q.full()
456 else:
457 return q.qsize() == maxsize
458
459
460class _TestQueue(BaseTestCase):
461
462
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +0000463 @classmethod
464 def _test_put(cls, queue, child_can_start, parent_can_continue):
Benjamin Petersone711caf2008-06-11 16:44:04 +0000465 child_can_start.wait()
466 for i in range(6):
467 queue.get()
468 parent_can_continue.set()
469
470 def test_put(self):
471 MAXSIZE = 6
472 queue = self.Queue(maxsize=MAXSIZE)
473 child_can_start = self.Event()
474 parent_can_continue = self.Event()
475
476 proc = self.Process(
477 target=self._test_put,
478 args=(queue, child_can_start, parent_can_continue)
479 )
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000480 proc.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000481 proc.start()
482
483 self.assertEqual(queue_empty(queue), True)
484 self.assertEqual(queue_full(queue, MAXSIZE), False)
485
486 queue.put(1)
487 queue.put(2, True)
488 queue.put(3, True, None)
489 queue.put(4, False)
490 queue.put(5, False, None)
491 queue.put_nowait(6)
492
493 # the values may be in buffer but not yet in pipe so sleep a bit
494 time.sleep(DELTA)
495
496 self.assertEqual(queue_empty(queue), False)
497 self.assertEqual(queue_full(queue, MAXSIZE), True)
498
499 put = TimingWrapper(queue.put)
500 put_nowait = TimingWrapper(queue.put_nowait)
501
502 self.assertRaises(pyqueue.Full, put, 7, False)
503 self.assertTimingAlmostEqual(put.elapsed, 0)
504
505 self.assertRaises(pyqueue.Full, put, 7, False, None)
506 self.assertTimingAlmostEqual(put.elapsed, 0)
507
508 self.assertRaises(pyqueue.Full, put_nowait, 7)
509 self.assertTimingAlmostEqual(put_nowait.elapsed, 0)
510
511 self.assertRaises(pyqueue.Full, put, 7, True, TIMEOUT1)
512 self.assertTimingAlmostEqual(put.elapsed, TIMEOUT1)
513
514 self.assertRaises(pyqueue.Full, put, 7, False, TIMEOUT2)
515 self.assertTimingAlmostEqual(put.elapsed, 0)
516
517 self.assertRaises(pyqueue.Full, put, 7, True, timeout=TIMEOUT3)
518 self.assertTimingAlmostEqual(put.elapsed, TIMEOUT3)
519
520 child_can_start.set()
521 parent_can_continue.wait()
522
523 self.assertEqual(queue_empty(queue), True)
524 self.assertEqual(queue_full(queue, MAXSIZE), False)
525
526 proc.join()
527
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +0000528 @classmethod
529 def _test_get(cls, queue, child_can_start, parent_can_continue):
Benjamin Petersone711caf2008-06-11 16:44:04 +0000530 child_can_start.wait()
Benjamin Petersonb29cbbc2008-06-16 20:57:14 +0000531 #queue.put(1)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000532 queue.put(2)
533 queue.put(3)
534 queue.put(4)
535 queue.put(5)
536 parent_can_continue.set()
537
538 def test_get(self):
539 queue = self.Queue()
540 child_can_start = self.Event()
541 parent_can_continue = self.Event()
542
543 proc = self.Process(
544 target=self._test_get,
545 args=(queue, child_can_start, parent_can_continue)
546 )
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000547 proc.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000548 proc.start()
549
550 self.assertEqual(queue_empty(queue), True)
551
552 child_can_start.set()
553 parent_can_continue.wait()
554
555 time.sleep(DELTA)
556 self.assertEqual(queue_empty(queue), False)
557
Benjamin Petersonb29cbbc2008-06-16 20:57:14 +0000558 # Hangs unexpectedly, remove for now
559 #self.assertEqual(queue.get(), 1)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000560 self.assertEqual(queue.get(True, None), 2)
561 self.assertEqual(queue.get(True), 3)
562 self.assertEqual(queue.get(timeout=1), 4)
563 self.assertEqual(queue.get_nowait(), 5)
564
565 self.assertEqual(queue_empty(queue), True)
566
567 get = TimingWrapper(queue.get)
568 get_nowait = TimingWrapper(queue.get_nowait)
569
570 self.assertRaises(pyqueue.Empty, get, False)
571 self.assertTimingAlmostEqual(get.elapsed, 0)
572
573 self.assertRaises(pyqueue.Empty, get, False, None)
574 self.assertTimingAlmostEqual(get.elapsed, 0)
575
576 self.assertRaises(pyqueue.Empty, get_nowait)
577 self.assertTimingAlmostEqual(get_nowait.elapsed, 0)
578
579 self.assertRaises(pyqueue.Empty, get, True, TIMEOUT1)
580 self.assertTimingAlmostEqual(get.elapsed, TIMEOUT1)
581
582 self.assertRaises(pyqueue.Empty, get, False, TIMEOUT2)
583 self.assertTimingAlmostEqual(get.elapsed, 0)
584
585 self.assertRaises(pyqueue.Empty, get, timeout=TIMEOUT3)
586 self.assertTimingAlmostEqual(get.elapsed, TIMEOUT3)
587
588 proc.join()
589
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +0000590 @classmethod
591 def _test_fork(cls, queue):
Benjamin Petersone711caf2008-06-11 16:44:04 +0000592 for i in range(10, 20):
593 queue.put(i)
594 # note that at this point the items may only be buffered, so the
595 # process cannot shutdown until the feeder thread has finished
596 # pushing items onto the pipe.
597
598 def test_fork(self):
599 # Old versions of Queue would fail to create a new feeder
600 # thread for a forked process if the original process had its
601 # own feeder thread. This test checks that this no longer
602 # happens.
603
604 queue = self.Queue()
605
606 # put items on queue so that main process starts a feeder thread
607 for i in range(10):
608 queue.put(i)
609
610 # wait to make sure thread starts before we fork a new process
611 time.sleep(DELTA)
612
613 # fork process
614 p = self.Process(target=self._test_fork, args=(queue,))
Jesus Cea94f964f2011-09-09 20:26:57 +0200615 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000616 p.start()
617
618 # check that all expected items are in the queue
619 for i in range(20):
620 self.assertEqual(queue.get(), i)
621 self.assertRaises(pyqueue.Empty, queue.get, False)
622
623 p.join()
624
625 def test_qsize(self):
626 q = self.Queue()
627 try:
628 self.assertEqual(q.qsize(), 0)
629 except NotImplementedError:
630 return
631 q.put(1)
632 self.assertEqual(q.qsize(), 1)
633 q.put(5)
634 self.assertEqual(q.qsize(), 2)
635 q.get()
636 self.assertEqual(q.qsize(), 1)
637 q.get()
638 self.assertEqual(q.qsize(), 0)
639
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +0000640 @classmethod
641 def _test_task_done(cls, q):
Benjamin Petersone711caf2008-06-11 16:44:04 +0000642 for obj in iter(q.get, None):
643 time.sleep(DELTA)
644 q.task_done()
645
646 def test_task_done(self):
647 queue = self.JoinableQueue()
648
649 if sys.version_info < (2, 5) and not hasattr(queue, 'task_done'):
Florent Xiclunafd1b0932010-03-28 00:25:02 +0000650 self.skipTest("requires 'queue.task_done()' method")
Benjamin Petersone711caf2008-06-11 16:44:04 +0000651
652 workers = [self.Process(target=self._test_task_done, args=(queue,))
653 for i in range(4)]
654
655 for p in workers:
Jesus Cea94f964f2011-09-09 20:26:57 +0200656 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000657 p.start()
658
659 for i in range(10):
660 queue.put(i)
661
662 queue.join()
663
664 for p in workers:
665 queue.put(None)
666
667 for p in workers:
668 p.join()
669
670#
671#
672#
673
674class _TestLock(BaseTestCase):
675
676 def test_lock(self):
677 lock = self.Lock()
678 self.assertEqual(lock.acquire(), True)
679 self.assertEqual(lock.acquire(False), False)
680 self.assertEqual(lock.release(), None)
681 self.assertRaises((ValueError, threading.ThreadError), lock.release)
682
683 def test_rlock(self):
684 lock = self.RLock()
685 self.assertEqual(lock.acquire(), True)
686 self.assertEqual(lock.acquire(), True)
687 self.assertEqual(lock.acquire(), True)
688 self.assertEqual(lock.release(), None)
689 self.assertEqual(lock.release(), None)
690 self.assertEqual(lock.release(), None)
691 self.assertRaises((AssertionError, RuntimeError), lock.release)
692
Jesse Nollerf8d00852009-03-31 03:25:07 +0000693 def test_lock_context(self):
694 with self.Lock():
695 pass
696
Benjamin Petersone711caf2008-06-11 16:44:04 +0000697
698class _TestSemaphore(BaseTestCase):
699
700 def _test_semaphore(self, sem):
701 self.assertReturnsIfImplemented(2, get_value, sem)
702 self.assertEqual(sem.acquire(), True)
703 self.assertReturnsIfImplemented(1, get_value, sem)
704 self.assertEqual(sem.acquire(), True)
705 self.assertReturnsIfImplemented(0, get_value, sem)
706 self.assertEqual(sem.acquire(False), False)
707 self.assertReturnsIfImplemented(0, get_value, sem)
708 self.assertEqual(sem.release(), None)
709 self.assertReturnsIfImplemented(1, get_value, sem)
710 self.assertEqual(sem.release(), None)
711 self.assertReturnsIfImplemented(2, get_value, sem)
712
713 def test_semaphore(self):
714 sem = self.Semaphore(2)
715 self._test_semaphore(sem)
716 self.assertEqual(sem.release(), None)
717 self.assertReturnsIfImplemented(3, get_value, sem)
718 self.assertEqual(sem.release(), None)
719 self.assertReturnsIfImplemented(4, get_value, sem)
720
721 def test_bounded_semaphore(self):
722 sem = self.BoundedSemaphore(2)
723 self._test_semaphore(sem)
724 # Currently fails on OS/X
725 #if HAVE_GETVALUE:
726 # self.assertRaises(ValueError, sem.release)
727 # self.assertReturnsIfImplemented(2, get_value, sem)
728
729 def test_timeout(self):
730 if self.TYPE != 'processes':
731 return
732
733 sem = self.Semaphore(0)
734 acquire = TimingWrapper(sem.acquire)
735
736 self.assertEqual(acquire(False), False)
737 self.assertTimingAlmostEqual(acquire.elapsed, 0.0)
738
739 self.assertEqual(acquire(False, None), False)
740 self.assertTimingAlmostEqual(acquire.elapsed, 0.0)
741
742 self.assertEqual(acquire(False, TIMEOUT1), False)
743 self.assertTimingAlmostEqual(acquire.elapsed, 0)
744
745 self.assertEqual(acquire(True, TIMEOUT2), False)
746 self.assertTimingAlmostEqual(acquire.elapsed, TIMEOUT2)
747
748 self.assertEqual(acquire(timeout=TIMEOUT3), False)
749 self.assertTimingAlmostEqual(acquire.elapsed, TIMEOUT3)
750
751
752class _TestCondition(BaseTestCase):
753
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +0000754 @classmethod
755 def f(cls, cond, sleeping, woken, timeout=None):
Benjamin Petersone711caf2008-06-11 16:44:04 +0000756 cond.acquire()
757 sleeping.release()
758 cond.wait(timeout)
759 woken.release()
760 cond.release()
761
762 def check_invariant(self, cond):
763 # this is only supposed to succeed when there are no sleepers
764 if self.TYPE == 'processes':
765 try:
766 sleepers = (cond._sleeping_count.get_value() -
767 cond._woken_count.get_value())
768 self.assertEqual(sleepers, 0)
769 self.assertEqual(cond._wait_semaphore.get_value(), 0)
770 except NotImplementedError:
771 pass
772
773 def test_notify(self):
774 cond = self.Condition()
775 sleeping = self.Semaphore(0)
776 woken = self.Semaphore(0)
777
778 p = self.Process(target=self.f, args=(cond, sleeping, woken))
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000779 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000780 p.start()
781
782 p = threading.Thread(target=self.f, args=(cond, sleeping, woken))
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000783 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000784 p.start()
785
786 # wait for both children to start sleeping
787 sleeping.acquire()
788 sleeping.acquire()
789
790 # check no process/thread has woken up
791 time.sleep(DELTA)
792 self.assertReturnsIfImplemented(0, get_value, woken)
793
794 # wake up one process/thread
795 cond.acquire()
796 cond.notify()
797 cond.release()
798
799 # check one process/thread has woken up
800 time.sleep(DELTA)
801 self.assertReturnsIfImplemented(1, get_value, woken)
802
803 # wake up another
804 cond.acquire()
805 cond.notify()
806 cond.release()
807
808 # check other has woken up
809 time.sleep(DELTA)
810 self.assertReturnsIfImplemented(2, get_value, woken)
811
812 # check state is not mucked up
813 self.check_invariant(cond)
814 p.join()
815
816 def test_notify_all(self):
817 cond = self.Condition()
818 sleeping = self.Semaphore(0)
819 woken = self.Semaphore(0)
820
821 # start some threads/processes which will timeout
822 for i in range(3):
823 p = self.Process(target=self.f,
824 args=(cond, sleeping, woken, TIMEOUT1))
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000825 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000826 p.start()
827
828 t = threading.Thread(target=self.f,
829 args=(cond, sleeping, woken, TIMEOUT1))
Benjamin Peterson72753702008-08-18 18:09:21 +0000830 t.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000831 t.start()
832
833 # wait for them all to sleep
834 for i in range(6):
835 sleeping.acquire()
836
837 # check they have all timed out
838 for i in range(6):
839 woken.acquire()
840 self.assertReturnsIfImplemented(0, get_value, woken)
841
842 # check state is not mucked up
843 self.check_invariant(cond)
844
845 # start some more threads/processes
846 for i in range(3):
847 p = self.Process(target=self.f, args=(cond, sleeping, woken))
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000848 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000849 p.start()
850
851 t = threading.Thread(target=self.f, args=(cond, sleeping, woken))
Benjamin Peterson72753702008-08-18 18:09:21 +0000852 t.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000853 t.start()
854
855 # wait for them to all sleep
856 for i in range(6):
857 sleeping.acquire()
858
859 # check no process/thread has woken up
860 time.sleep(DELTA)
861 self.assertReturnsIfImplemented(0, get_value, woken)
862
863 # wake them all up
864 cond.acquire()
865 cond.notify_all()
866 cond.release()
867
868 # check they have all woken
Antoine Pitrouf25a8de2011-04-16 21:02:01 +0200869 for i in range(10):
870 try:
871 if get_value(woken) == 6:
872 break
873 except NotImplementedError:
874 break
875 time.sleep(DELTA)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000876 self.assertReturnsIfImplemented(6, get_value, woken)
877
878 # check state is not mucked up
879 self.check_invariant(cond)
880
881 def test_timeout(self):
882 cond = self.Condition()
883 wait = TimingWrapper(cond.wait)
884 cond.acquire()
885 res = wait(TIMEOUT1)
886 cond.release()
Georg Brandl65ffae02010-10-28 09:24:56 +0000887 self.assertEqual(res, False)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000888 self.assertTimingAlmostEqual(wait.elapsed, TIMEOUT1)
889
Charles-François Natalic8ce7152012-04-17 18:45:57 +0200890 @classmethod
891 def _test_waitfor_f(cls, cond, state):
892 with cond:
893 state.value = 0
894 cond.notify()
895 result = cond.wait_for(lambda : state.value==4)
896 if not result or state.value != 4:
897 sys.exit(1)
898
899 @unittest.skipUnless(HAS_SHAREDCTYPES, 'needs sharedctypes')
900 def test_waitfor(self):
901 # based on test in test/lock_tests.py
902 cond = self.Condition()
903 state = self.Value('i', -1)
904
905 p = self.Process(target=self._test_waitfor_f, args=(cond, state))
906 p.daemon = True
907 p.start()
908
909 with cond:
910 result = cond.wait_for(lambda : state.value==0)
911 self.assertTrue(result)
912 self.assertEqual(state.value, 0)
913
914 for i in range(4):
915 time.sleep(0.01)
916 with cond:
917 state.value += 1
918 cond.notify()
919
920 p.join(5)
921 self.assertFalse(p.is_alive())
922 self.assertEqual(p.exitcode, 0)
923
924 @classmethod
Richard Oudkerk6dbca362012-05-06 16:46:36 +0100925 def _test_waitfor_timeout_f(cls, cond, state, success, sem):
926 sem.release()
Charles-François Natalic8ce7152012-04-17 18:45:57 +0200927 with cond:
928 expected = 0.1
929 dt = time.time()
930 result = cond.wait_for(lambda : state.value==4, timeout=expected)
931 dt = time.time() - dt
932 # borrow logic in assertTimeout() from test/lock_tests.py
933 if not result and expected * 0.6 < dt < expected * 10.0:
934 success.value = True
935
936 @unittest.skipUnless(HAS_SHAREDCTYPES, 'needs sharedctypes')
937 def test_waitfor_timeout(self):
938 # based on test in test/lock_tests.py
939 cond = self.Condition()
940 state = self.Value('i', 0)
941 success = self.Value('i', False)
Richard Oudkerk6dbca362012-05-06 16:46:36 +0100942 sem = self.Semaphore(0)
Charles-François Natalic8ce7152012-04-17 18:45:57 +0200943
944 p = self.Process(target=self._test_waitfor_timeout_f,
Richard Oudkerk6dbca362012-05-06 16:46:36 +0100945 args=(cond, state, success, sem))
Charles-François Natalic8ce7152012-04-17 18:45:57 +0200946 p.daemon = True
947 p.start()
Richard Oudkerk6dbca362012-05-06 16:46:36 +0100948 self.assertTrue(sem.acquire(timeout=10))
Charles-François Natalic8ce7152012-04-17 18:45:57 +0200949
950 # Only increment 3 times, so state == 4 is never reached.
951 for i in range(3):
952 time.sleep(0.01)
953 with cond:
954 state.value += 1
955 cond.notify()
956
957 p.join(5)
958 self.assertTrue(success.value)
959
Benjamin Petersone711caf2008-06-11 16:44:04 +0000960
961class _TestEvent(BaseTestCase):
962
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +0000963 @classmethod
964 def _test_event(cls, event):
Benjamin Petersone711caf2008-06-11 16:44:04 +0000965 time.sleep(TIMEOUT2)
966 event.set()
967
968 def test_event(self):
969 event = self.Event()
970 wait = TimingWrapper(event.wait)
971
Ezio Melotti13925002011-03-16 11:05:33 +0200972 # Removed temporarily, due to API shear, this does not
Benjamin Petersone711caf2008-06-11 16:44:04 +0000973 # work with threading._Event objects. is_set == isSet
Benjamin Peterson965ce872009-04-05 21:24:58 +0000974 self.assertEqual(event.is_set(), False)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000975
Benjamin Peterson965ce872009-04-05 21:24:58 +0000976 # Removed, threading.Event.wait() will return the value of the __flag
977 # instead of None. API Shear with the semaphore backed mp.Event
978 self.assertEqual(wait(0.0), False)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000979 self.assertTimingAlmostEqual(wait.elapsed, 0.0)
Benjamin Peterson965ce872009-04-05 21:24:58 +0000980 self.assertEqual(wait(TIMEOUT1), False)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000981 self.assertTimingAlmostEqual(wait.elapsed, TIMEOUT1)
982
983 event.set()
984
985 # See note above on the API differences
Benjamin Peterson965ce872009-04-05 21:24:58 +0000986 self.assertEqual(event.is_set(), True)
987 self.assertEqual(wait(), True)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000988 self.assertTimingAlmostEqual(wait.elapsed, 0.0)
Benjamin Peterson965ce872009-04-05 21:24:58 +0000989 self.assertEqual(wait(TIMEOUT1), True)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000990 self.assertTimingAlmostEqual(wait.elapsed, 0.0)
991 # self.assertEqual(event.is_set(), True)
992
993 event.clear()
994
995 #self.assertEqual(event.is_set(), False)
996
Jesus Cea94f964f2011-09-09 20:26:57 +0200997 p = self.Process(target=self._test_event, args=(event,))
998 p.daemon = True
999 p.start()
Benjamin Peterson965ce872009-04-05 21:24:58 +00001000 self.assertEqual(wait(), True)
Benjamin Petersone711caf2008-06-11 16:44:04 +00001001
1002#
1003#
1004#
1005
1006class _TestValue(BaseTestCase):
1007
Florent Xiclunafd1b0932010-03-28 00:25:02 +00001008 ALLOWED_TYPES = ('processes',)
1009
Benjamin Petersone711caf2008-06-11 16:44:04 +00001010 codes_values = [
1011 ('i', 4343, 24234),
1012 ('d', 3.625, -4.25),
1013 ('h', -232, 234),
1014 ('c', latin('x'), latin('y'))
1015 ]
1016
Antoine Pitrou7744e2a2010-11-22 16:26:21 +00001017 def setUp(self):
1018 if not HAS_SHAREDCTYPES:
1019 self.skipTest("requires multiprocessing.sharedctypes")
1020
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +00001021 @classmethod
1022 def _test(cls, values):
1023 for sv, cv in zip(values, cls.codes_values):
Benjamin Petersone711caf2008-06-11 16:44:04 +00001024 sv.value = cv[2]
1025
1026
1027 def test_value(self, raw=False):
Benjamin Petersone711caf2008-06-11 16:44:04 +00001028 if raw:
1029 values = [self.RawValue(code, value)
1030 for code, value, _ in self.codes_values]
1031 else:
1032 values = [self.Value(code, value)
1033 for code, value, _ in self.codes_values]
1034
1035 for sv, cv in zip(values, self.codes_values):
1036 self.assertEqual(sv.value, cv[1])
1037
1038 proc = self.Process(target=self._test, args=(values,))
Jesus Cea94f964f2011-09-09 20:26:57 +02001039 proc.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +00001040 proc.start()
1041 proc.join()
1042
1043 for sv, cv in zip(values, self.codes_values):
1044 self.assertEqual(sv.value, cv[2])
1045
1046 def test_rawvalue(self):
1047 self.test_value(raw=True)
1048
1049 def test_getobj_getlock(self):
Benjamin Petersone711caf2008-06-11 16:44:04 +00001050 val1 = self.Value('i', 5)
1051 lock1 = val1.get_lock()
1052 obj1 = val1.get_obj()
1053
1054 val2 = self.Value('i', 5, lock=None)
1055 lock2 = val2.get_lock()
1056 obj2 = val2.get_obj()
1057
1058 lock = self.Lock()
1059 val3 = self.Value('i', 5, lock=lock)
1060 lock3 = val3.get_lock()
1061 obj3 = val3.get_obj()
1062 self.assertEqual(lock, lock3)
1063
Jesse Nollerb0516a62009-01-18 03:11:38 +00001064 arr4 = self.Value('i', 5, lock=False)
Benjamin Petersone711caf2008-06-11 16:44:04 +00001065 self.assertFalse(hasattr(arr4, 'get_lock'))
1066 self.assertFalse(hasattr(arr4, 'get_obj'))
1067
Jesse Nollerb0516a62009-01-18 03:11:38 +00001068 self.assertRaises(AttributeError, self.Value, 'i', 5, lock='navalue')
1069
1070 arr5 = self.RawValue('i', 5)
1071 self.assertFalse(hasattr(arr5, 'get_lock'))
1072 self.assertFalse(hasattr(arr5, 'get_obj'))
1073
Benjamin Petersone711caf2008-06-11 16:44:04 +00001074
1075class _TestArray(BaseTestCase):
1076
Florent Xiclunafd1b0932010-03-28 00:25:02 +00001077 ALLOWED_TYPES = ('processes',)
1078
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +00001079 @classmethod
1080 def f(cls, seq):
Benjamin Petersone711caf2008-06-11 16:44:04 +00001081 for i in range(1, len(seq)):
1082 seq[i] += seq[i-1]
1083
Florent Xiclunafd1b0932010-03-28 00:25:02 +00001084 @unittest.skipIf(c_int is None, "requires _ctypes")
Benjamin Petersone711caf2008-06-11 16:44:04 +00001085 def test_array(self, raw=False):
Benjamin Petersone711caf2008-06-11 16:44:04 +00001086 seq = [680, 626, 934, 821, 150, 233, 548, 982, 714, 831]
1087 if raw:
1088 arr = self.RawArray('i', seq)
1089 else:
1090 arr = self.Array('i', seq)
1091
1092 self.assertEqual(len(arr), len(seq))
1093 self.assertEqual(arr[3], seq[3])
1094 self.assertEqual(list(arr[2:7]), list(seq[2:7]))
1095
1096 arr[4:8] = seq[4:8] = array.array('i', [1, 2, 3, 4])
1097
1098 self.assertEqual(list(arr[:]), seq)
1099
1100 self.f(seq)
1101
1102 p = self.Process(target=self.f, args=(arr,))
Jesus Cea94f964f2011-09-09 20:26:57 +02001103 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +00001104 p.start()
1105 p.join()
1106
1107 self.assertEqual(list(arr[:]), seq)
1108
Florent Xiclunafd1b0932010-03-28 00:25:02 +00001109 @unittest.skipIf(c_int is None, "requires _ctypes")
Mark Dickinson89461ef2011-03-26 10:19:03 +00001110 def test_array_from_size(self):
1111 size = 10
1112 # Test for zeroing (see issue #11675).
1113 # The repetition below strengthens the test by increasing the chances
1114 # of previously allocated non-zero memory being used for the new array
1115 # on the 2nd and 3rd loops.
1116 for _ in range(3):
1117 arr = self.Array('i', size)
1118 self.assertEqual(len(arr), size)
1119 self.assertEqual(list(arr), [0] * size)
1120 arr[:] = range(10)
1121 self.assertEqual(list(arr), list(range(10)))
1122 del arr
1123
1124 @unittest.skipIf(c_int is None, "requires _ctypes")
Benjamin Petersone711caf2008-06-11 16:44:04 +00001125 def test_rawarray(self):
1126 self.test_array(raw=True)
1127
Florent Xiclunafd1b0932010-03-28 00:25:02 +00001128 @unittest.skipIf(c_int is None, "requires _ctypes")
Benjamin Petersone711caf2008-06-11 16:44:04 +00001129 def test_getobj_getlock_obj(self):
Benjamin Petersone711caf2008-06-11 16:44:04 +00001130 arr1 = self.Array('i', list(range(10)))
1131 lock1 = arr1.get_lock()
1132 obj1 = arr1.get_obj()
1133
1134 arr2 = self.Array('i', list(range(10)), lock=None)
1135 lock2 = arr2.get_lock()
1136 obj2 = arr2.get_obj()
1137
1138 lock = self.Lock()
1139 arr3 = self.Array('i', list(range(10)), lock=lock)
1140 lock3 = arr3.get_lock()
1141 obj3 = arr3.get_obj()
1142 self.assertEqual(lock, lock3)
1143
Jesse Nollerb0516a62009-01-18 03:11:38 +00001144 arr4 = self.Array('i', range(10), lock=False)
Benjamin Petersone711caf2008-06-11 16:44:04 +00001145 self.assertFalse(hasattr(arr4, 'get_lock'))
1146 self.assertFalse(hasattr(arr4, 'get_obj'))
Jesse Nollerb0516a62009-01-18 03:11:38 +00001147 self.assertRaises(AttributeError,
1148 self.Array, 'i', range(10), lock='notalock')
1149
1150 arr5 = self.RawArray('i', range(10))
1151 self.assertFalse(hasattr(arr5, 'get_lock'))
1152 self.assertFalse(hasattr(arr5, 'get_obj'))
Benjamin Petersone711caf2008-06-11 16:44:04 +00001153
1154#
1155#
1156#
1157
1158class _TestContainers(BaseTestCase):
1159
1160 ALLOWED_TYPES = ('manager',)
1161
1162 def test_list(self):
1163 a = self.list(list(range(10)))
1164 self.assertEqual(a[:], list(range(10)))
1165
1166 b = self.list()
1167 self.assertEqual(b[:], [])
1168
1169 b.extend(list(range(5)))
1170 self.assertEqual(b[:], list(range(5)))
1171
1172 self.assertEqual(b[2], 2)
1173 self.assertEqual(b[2:10], [2,3,4])
1174
1175 b *= 2
1176 self.assertEqual(b[:], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4])
1177
1178 self.assertEqual(b + [5, 6], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4, 5, 6])
1179
1180 self.assertEqual(a[:], list(range(10)))
1181
1182 d = [a, b]
1183 e = self.list(d)
1184 self.assertEqual(
1185 e[:],
1186 [[0, 1, 2, 3, 4, 5, 6, 7, 8, 9], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4]]
1187 )
1188
1189 f = self.list([a])
1190 a.append('hello')
1191 self.assertEqual(f[:], [[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 'hello']])
1192
1193 def test_dict(self):
1194 d = self.dict()
1195 indices = list(range(65, 70))
1196 for i in indices:
1197 d[i] = chr(i)
1198 self.assertEqual(d.copy(), dict((i, chr(i)) for i in indices))
1199 self.assertEqual(sorted(d.keys()), indices)
1200 self.assertEqual(sorted(d.values()), [chr(i) for i in indices])
1201 self.assertEqual(sorted(d.items()), [(i, chr(i)) for i in indices])
1202
1203 def test_namespace(self):
1204 n = self.Namespace()
1205 n.name = 'Bob'
1206 n.job = 'Builder'
1207 n._hidden = 'hidden'
1208 self.assertEqual((n.name, n.job), ('Bob', 'Builder'))
1209 del n.job
1210 self.assertEqual(str(n), "Namespace(name='Bob')")
1211 self.assertTrue(hasattr(n, 'name'))
1212 self.assertTrue(not hasattr(n, 'job'))
1213
1214#
1215#
1216#
1217
1218def sqr(x, wait=0.0):
1219 time.sleep(wait)
1220 return x*x
Ask Solem2afcbf22010-11-09 20:55:52 +00001221
Antoine Pitroude911b22011-12-21 11:03:24 +01001222def mul(x, y):
1223 return x*y
1224
Benjamin Petersone711caf2008-06-11 16:44:04 +00001225class _TestPool(BaseTestCase):
1226
1227 def test_apply(self):
1228 papply = self.pool.apply
1229 self.assertEqual(papply(sqr, (5,)), sqr(5))
1230 self.assertEqual(papply(sqr, (), {'x':3}), sqr(x=3))
1231
1232 def test_map(self):
1233 pmap = self.pool.map
1234 self.assertEqual(pmap(sqr, list(range(10))), list(map(sqr, list(range(10)))))
1235 self.assertEqual(pmap(sqr, list(range(100)), chunksize=20),
1236 list(map(sqr, list(range(100)))))
1237
Antoine Pitroude911b22011-12-21 11:03:24 +01001238 def test_starmap(self):
1239 psmap = self.pool.starmap
1240 tuples = list(zip(range(10), range(9,-1, -1)))
1241 self.assertEqual(psmap(mul, tuples),
1242 list(itertools.starmap(mul, tuples)))
1243 tuples = list(zip(range(100), range(99,-1, -1)))
1244 self.assertEqual(psmap(mul, tuples, chunksize=20),
1245 list(itertools.starmap(mul, tuples)))
1246
1247 def test_starmap_async(self):
1248 tuples = list(zip(range(100), range(99,-1, -1)))
1249 self.assertEqual(self.pool.starmap_async(mul, tuples).get(),
1250 list(itertools.starmap(mul, tuples)))
1251
Alexandre Vassalottie52e3782009-07-17 09:18:18 +00001252 def test_map_chunksize(self):
1253 try:
1254 self.pool.map_async(sqr, [], chunksize=1).get(timeout=TIMEOUT1)
1255 except multiprocessing.TimeoutError:
1256 self.fail("pool.map_async with chunksize stalled on null list")
1257
Benjamin Petersone711caf2008-06-11 16:44:04 +00001258 def test_async(self):
1259 res = self.pool.apply_async(sqr, (7, TIMEOUT1,))
1260 get = TimingWrapper(res.get)
1261 self.assertEqual(get(), 49)
1262 self.assertTimingAlmostEqual(get.elapsed, TIMEOUT1)
1263
1264 def test_async_timeout(self):
1265 res = self.pool.apply_async(sqr, (6, TIMEOUT2 + 0.2))
1266 get = TimingWrapper(res.get)
1267 self.assertRaises(multiprocessing.TimeoutError, get, timeout=TIMEOUT2)
1268 self.assertTimingAlmostEqual(get.elapsed, TIMEOUT2)
1269
1270 def test_imap(self):
1271 it = self.pool.imap(sqr, list(range(10)))
1272 self.assertEqual(list(it), list(map(sqr, list(range(10)))))
1273
1274 it = self.pool.imap(sqr, list(range(10)))
1275 for i in range(10):
1276 self.assertEqual(next(it), i*i)
1277 self.assertRaises(StopIteration, it.__next__)
1278
1279 it = self.pool.imap(sqr, list(range(1000)), chunksize=100)
1280 for i in range(1000):
1281 self.assertEqual(next(it), i*i)
1282 self.assertRaises(StopIteration, it.__next__)
1283
1284 def test_imap_unordered(self):
1285 it = self.pool.imap_unordered(sqr, list(range(1000)))
1286 self.assertEqual(sorted(it), list(map(sqr, list(range(1000)))))
1287
1288 it = self.pool.imap_unordered(sqr, list(range(1000)), chunksize=53)
1289 self.assertEqual(sorted(it), list(map(sqr, list(range(1000)))))
1290
1291 def test_make_pool(self):
Victor Stinner2fae27b2011-06-20 17:53:35 +02001292 self.assertRaises(ValueError, multiprocessing.Pool, -1)
1293 self.assertRaises(ValueError, multiprocessing.Pool, 0)
1294
Benjamin Petersone711caf2008-06-11 16:44:04 +00001295 p = multiprocessing.Pool(3)
1296 self.assertEqual(3, len(p._pool))
1297 p.close()
1298 p.join()
1299
1300 def test_terminate(self):
1301 if self.TYPE == 'manager':
1302 # On Unix a forked process increfs each shared object to
1303 # which its parent process held a reference. If the
1304 # forked process gets terminated then there is likely to
1305 # be a reference leak. So to prevent
1306 # _TestZZZNumberOfObjects from failing we skip this test
1307 # when using a manager.
1308 return
1309
1310 result = self.pool.map_async(
1311 time.sleep, [0.1 for i in range(10000)], chunksize=1
1312 )
1313 self.pool.terminate()
1314 join = TimingWrapper(self.pool.join)
1315 join()
Victor Stinner900189b2011-03-24 16:39:07 +01001316 self.assertLess(join.elapsed, 0.5)
Jesse Noller1f0b6582010-01-27 03:36:01 +00001317
Ask Solem2afcbf22010-11-09 20:55:52 +00001318def raising():
1319 raise KeyError("key")
Jesse Noller1f0b6582010-01-27 03:36:01 +00001320
Ask Solem2afcbf22010-11-09 20:55:52 +00001321def unpickleable_result():
1322 return lambda: 42
1323
1324class _TestPoolWorkerErrors(BaseTestCase):
Jesse Noller1f0b6582010-01-27 03:36:01 +00001325 ALLOWED_TYPES = ('processes', )
Ask Solem2afcbf22010-11-09 20:55:52 +00001326
1327 def test_async_error_callback(self):
1328 p = multiprocessing.Pool(2)
1329
1330 scratchpad = [None]
1331 def errback(exc):
1332 scratchpad[0] = exc
1333
1334 res = p.apply_async(raising, error_callback=errback)
1335 self.assertRaises(KeyError, res.get)
1336 self.assertTrue(scratchpad[0])
1337 self.assertIsInstance(scratchpad[0], KeyError)
1338
1339 p.close()
1340 p.join()
1341
1342 def test_unpickleable_result(self):
1343 from multiprocessing.pool import MaybeEncodingError
1344 p = multiprocessing.Pool(2)
1345
1346 # Make sure we don't lose pool processes because of encoding errors.
1347 for iteration in range(20):
1348
1349 scratchpad = [None]
1350 def errback(exc):
1351 scratchpad[0] = exc
1352
1353 res = p.apply_async(unpickleable_result, error_callback=errback)
1354 self.assertRaises(MaybeEncodingError, res.get)
1355 wrapped = scratchpad[0]
1356 self.assertTrue(wrapped)
1357 self.assertIsInstance(scratchpad[0], MaybeEncodingError)
1358 self.assertIsNotNone(wrapped.exc)
1359 self.assertIsNotNone(wrapped.value)
1360
1361 p.close()
1362 p.join()
1363
1364class _TestPoolWorkerLifetime(BaseTestCase):
1365 ALLOWED_TYPES = ('processes', )
1366
Jesse Noller1f0b6582010-01-27 03:36:01 +00001367 def test_pool_worker_lifetime(self):
1368 p = multiprocessing.Pool(3, maxtasksperchild=10)
1369 self.assertEqual(3, len(p._pool))
1370 origworkerpids = [w.pid for w in p._pool]
1371 # Run many tasks so each worker gets replaced (hopefully)
1372 results = []
1373 for i in range(100):
1374 results.append(p.apply_async(sqr, (i, )))
1375 # Fetch the results and verify we got the right answers,
1376 # also ensuring all the tasks have completed.
1377 for (j, res) in enumerate(results):
1378 self.assertEqual(res.get(), sqr(j))
1379 # Refill the pool
1380 p._repopulate_pool()
Florent Xiclunafb190f62010-03-04 16:10:10 +00001381 # Wait until all workers are alive
Antoine Pitrou540ab062011-04-06 22:51:17 +02001382 # (countdown * DELTA = 5 seconds max startup process time)
1383 countdown = 50
Florent Xiclunafb190f62010-03-04 16:10:10 +00001384 while countdown and not all(w.is_alive() for w in p._pool):
1385 countdown -= 1
1386 time.sleep(DELTA)
Jesse Noller1f0b6582010-01-27 03:36:01 +00001387 finalworkerpids = [w.pid for w in p._pool]
Florent Xiclunafb190f62010-03-04 16:10:10 +00001388 # All pids should be assigned. See issue #7805.
1389 self.assertNotIn(None, origworkerpids)
1390 self.assertNotIn(None, finalworkerpids)
1391 # Finally, check that the worker pids have changed
Jesse Noller1f0b6582010-01-27 03:36:01 +00001392 self.assertNotEqual(sorted(origworkerpids), sorted(finalworkerpids))
1393 p.close()
1394 p.join()
1395
Charles-François Natalif8859e12011-10-24 18:45:29 +02001396 def test_pool_worker_lifetime_early_close(self):
1397 # Issue #10332: closing a pool whose workers have limited lifetimes
1398 # before all the tasks completed would make join() hang.
1399 p = multiprocessing.Pool(3, maxtasksperchild=1)
1400 results = []
1401 for i in range(6):
1402 results.append(p.apply_async(sqr, (i, 0.3)))
1403 p.close()
1404 p.join()
1405 # check the results
1406 for (j, res) in enumerate(results):
1407 self.assertEqual(res.get(), sqr(j))
1408
1409
Benjamin Petersone711caf2008-06-11 16:44:04 +00001410#
1411# Test that manager has expected number of shared objects left
1412#
1413
1414class _TestZZZNumberOfObjects(BaseTestCase):
1415 # Because test cases are sorted alphabetically, this one will get
1416 # run after all the other tests for the manager. It tests that
1417 # there have been no "reference leaks" for the manager's shared
1418 # objects. Note the comment in _TestPool.test_terminate().
1419 ALLOWED_TYPES = ('manager',)
1420
1421 def test_number_of_objects(self):
1422 EXPECTED_NUMBER = 1 # the pool object is still alive
1423 multiprocessing.active_children() # discard dead process objs
1424 gc.collect() # do garbage collection
1425 refs = self.manager._number_of_objects()
Jesse Noller63b3a972009-01-21 02:15:48 +00001426 debug_info = self.manager._debug_info()
Benjamin Petersone711caf2008-06-11 16:44:04 +00001427 if refs != EXPECTED_NUMBER:
Georg Brandl3dbca812008-07-23 16:10:53 +00001428 print(self.manager._debug_info())
Jesse Noller63b3a972009-01-21 02:15:48 +00001429 print(debug_info)
Benjamin Petersone711caf2008-06-11 16:44:04 +00001430
1431 self.assertEqual(refs, EXPECTED_NUMBER)
1432
1433#
1434# Test of creating a customized manager class
1435#
1436
1437from multiprocessing.managers import BaseManager, BaseProxy, RemoteError
1438
1439class FooBar(object):
1440 def f(self):
1441 return 'f()'
1442 def g(self):
1443 raise ValueError
1444 def _h(self):
1445 return '_h()'
1446
1447def baz():
1448 for i in range(10):
1449 yield i*i
1450
1451class IteratorProxy(BaseProxy):
Florent Xiclunaaa171062010-08-14 15:56:42 +00001452 _exposed_ = ('__next__',)
Benjamin Petersone711caf2008-06-11 16:44:04 +00001453 def __iter__(self):
1454 return self
1455 def __next__(self):
Benjamin Petersone711caf2008-06-11 16:44:04 +00001456 return self._callmethod('__next__')
1457
1458class MyManager(BaseManager):
1459 pass
1460
1461MyManager.register('Foo', callable=FooBar)
1462MyManager.register('Bar', callable=FooBar, exposed=('f', '_h'))
1463MyManager.register('baz', callable=baz, proxytype=IteratorProxy)
1464
1465
1466class _TestMyManager(BaseTestCase):
1467
1468 ALLOWED_TYPES = ('manager',)
1469
1470 def test_mymanager(self):
1471 manager = MyManager()
1472 manager.start()
1473
1474 foo = manager.Foo()
1475 bar = manager.Bar()
1476 baz = manager.baz()
1477
1478 foo_methods = [name for name in ('f', 'g', '_h') if hasattr(foo, name)]
1479 bar_methods = [name for name in ('f', 'g', '_h') if hasattr(bar, name)]
1480
1481 self.assertEqual(foo_methods, ['f', 'g'])
1482 self.assertEqual(bar_methods, ['f', '_h'])
1483
1484 self.assertEqual(foo.f(), 'f()')
1485 self.assertRaises(ValueError, foo.g)
1486 self.assertEqual(foo._callmethod('f'), 'f()')
1487 self.assertRaises(RemoteError, foo._callmethod, '_h')
1488
1489 self.assertEqual(bar.f(), 'f()')
1490 self.assertEqual(bar._h(), '_h()')
1491 self.assertEqual(bar._callmethod('f'), 'f()')
1492 self.assertEqual(bar._callmethod('_h'), '_h()')
1493
1494 self.assertEqual(list(baz), [i*i for i in range(10)])
1495
1496 manager.shutdown()
1497
1498#
1499# Test of connecting to a remote server and using xmlrpclib for serialization
1500#
1501
1502_queue = pyqueue.Queue()
1503def get_queue():
1504 return _queue
1505
1506class QueueManager(BaseManager):
1507 '''manager class used by server process'''
1508QueueManager.register('get_queue', callable=get_queue)
1509
1510class QueueManager2(BaseManager):
1511 '''manager class which specifies the same interface as QueueManager'''
1512QueueManager2.register('get_queue')
1513
1514
1515SERIALIZER = 'xmlrpclib'
1516
1517class _TestRemoteManager(BaseTestCase):
1518
1519 ALLOWED_TYPES = ('manager',)
1520
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +00001521 @classmethod
1522 def _putter(cls, address, authkey):
Benjamin Petersone711caf2008-06-11 16:44:04 +00001523 manager = QueueManager2(
1524 address=address, authkey=authkey, serializer=SERIALIZER
1525 )
1526 manager.connect()
1527 queue = manager.get_queue()
1528 queue.put(('hello world', None, True, 2.25))
1529
1530 def test_remote(self):
1531 authkey = os.urandom(32)
1532
1533 manager = QueueManager(
1534 address=('localhost', 0), authkey=authkey, serializer=SERIALIZER
1535 )
1536 manager.start()
1537
1538 p = self.Process(target=self._putter, args=(manager.address, authkey))
Jesus Cea94f964f2011-09-09 20:26:57 +02001539 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +00001540 p.start()
1541
1542 manager2 = QueueManager2(
1543 address=manager.address, authkey=authkey, serializer=SERIALIZER
1544 )
1545 manager2.connect()
1546 queue = manager2.get_queue()
1547
1548 # Note that xmlrpclib will deserialize object as a list not a tuple
1549 self.assertEqual(queue.get(), ['hello world', None, True, 2.25])
1550
1551 # Because we are using xmlrpclib for serialization instead of
1552 # pickle this will cause a serialization error.
1553 self.assertRaises(Exception, queue.put, time.sleep)
1554
1555 # Make queue finalizer run before the server is stopped
1556 del queue
1557 manager.shutdown()
1558
Jesse Nollerc5d28a02009-03-30 16:37:36 +00001559class _TestManagerRestart(BaseTestCase):
1560
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +00001561 @classmethod
1562 def _putter(cls, address, authkey):
Jesse Nollerc5d28a02009-03-30 16:37:36 +00001563 manager = QueueManager(
1564 address=address, authkey=authkey, serializer=SERIALIZER)
1565 manager.connect()
1566 queue = manager.get_queue()
1567 queue.put('hello world')
1568
1569 def test_rapid_restart(self):
1570 authkey = os.urandom(32)
1571 manager = QueueManager(
Antoine Pitrou043bad02010-04-30 23:20:15 +00001572 address=('localhost', 0), authkey=authkey, serializer=SERIALIZER)
Brian Curtin50be1ca2010-11-01 05:10:44 +00001573 srvr = manager.get_server()
1574 addr = srvr.address
1575 # Close the connection.Listener socket which gets opened as a part
1576 # of manager.get_server(). It's not needed for the test.
1577 srvr.listener.close()
Jesse Nollerc5d28a02009-03-30 16:37:36 +00001578 manager.start()
1579
1580 p = self.Process(target=self._putter, args=(manager.address, authkey))
Jesus Cea94f964f2011-09-09 20:26:57 +02001581 p.daemon = True
Jesse Nollerc5d28a02009-03-30 16:37:36 +00001582 p.start()
1583 queue = manager.get_queue()
1584 self.assertEqual(queue.get(), 'hello world')
Jesse Noller35d1f002009-03-30 22:59:27 +00001585 del queue
Jesse Nollerc5d28a02009-03-30 16:37:36 +00001586 manager.shutdown()
1587 manager = QueueManager(
Antoine Pitrou043bad02010-04-30 23:20:15 +00001588 address=addr, authkey=authkey, serializer=SERIALIZER)
Antoine Pitrouc824e9a2011-04-05 18:11:33 +02001589 try:
1590 manager.start()
1591 except IOError as e:
1592 if e.errno != errno.EADDRINUSE:
1593 raise
1594 # Retry after some time, in case the old socket was lingering
1595 # (sporadic failure on buildbots)
1596 time.sleep(1.0)
1597 manager = QueueManager(
1598 address=addr, authkey=authkey, serializer=SERIALIZER)
Jesse Noller35d1f002009-03-30 22:59:27 +00001599 manager.shutdown()
Jesse Nollerc5d28a02009-03-30 16:37:36 +00001600
Benjamin Petersone711caf2008-06-11 16:44:04 +00001601#
1602#
1603#
1604
1605SENTINEL = latin('')
1606
1607class _TestConnection(BaseTestCase):
1608
1609 ALLOWED_TYPES = ('processes', 'threads')
1610
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +00001611 @classmethod
1612 def _echo(cls, conn):
Benjamin Petersone711caf2008-06-11 16:44:04 +00001613 for msg in iter(conn.recv_bytes, SENTINEL):
1614 conn.send_bytes(msg)
1615 conn.close()
1616
1617 def test_connection(self):
1618 conn, child_conn = self.Pipe()
1619
1620 p = self.Process(target=self._echo, args=(child_conn,))
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +00001621 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +00001622 p.start()
1623
1624 seq = [1, 2.25, None]
1625 msg = latin('hello world')
1626 longmsg = msg * 10
1627 arr = array.array('i', list(range(4)))
1628
1629 if self.TYPE == 'processes':
1630 self.assertEqual(type(conn.fileno()), int)
1631
1632 self.assertEqual(conn.send(seq), None)
1633 self.assertEqual(conn.recv(), seq)
1634
1635 self.assertEqual(conn.send_bytes(msg), None)
1636 self.assertEqual(conn.recv_bytes(), msg)
1637
1638 if self.TYPE == 'processes':
1639 buffer = array.array('i', [0]*10)
1640 expected = list(arr) + [0] * (10 - len(arr))
1641 self.assertEqual(conn.send_bytes(arr), None)
1642 self.assertEqual(conn.recv_bytes_into(buffer),
1643 len(arr) * buffer.itemsize)
1644 self.assertEqual(list(buffer), expected)
1645
1646 buffer = array.array('i', [0]*10)
1647 expected = [0] * 3 + list(arr) + [0] * (10 - 3 - len(arr))
1648 self.assertEqual(conn.send_bytes(arr), None)
1649 self.assertEqual(conn.recv_bytes_into(buffer, 3 * buffer.itemsize),
1650 len(arr) * buffer.itemsize)
1651 self.assertEqual(list(buffer), expected)
1652
1653 buffer = bytearray(latin(' ' * 40))
1654 self.assertEqual(conn.send_bytes(longmsg), None)
1655 try:
1656 res = conn.recv_bytes_into(buffer)
1657 except multiprocessing.BufferTooShort as e:
1658 self.assertEqual(e.args, (longmsg,))
1659 else:
1660 self.fail('expected BufferTooShort, got %s' % res)
1661
1662 poll = TimingWrapper(conn.poll)
1663
1664 self.assertEqual(poll(), False)
1665 self.assertTimingAlmostEqual(poll.elapsed, 0)
1666
1667 self.assertEqual(poll(TIMEOUT1), False)
1668 self.assertTimingAlmostEqual(poll.elapsed, TIMEOUT1)
1669
1670 conn.send(None)
1671
1672 self.assertEqual(poll(TIMEOUT1), True)
1673 self.assertTimingAlmostEqual(poll.elapsed, 0)
1674
1675 self.assertEqual(conn.recv(), None)
1676
1677 really_big_msg = latin('X') * (1024 * 1024 * 16) # 16Mb
1678 conn.send_bytes(really_big_msg)
1679 self.assertEqual(conn.recv_bytes(), really_big_msg)
1680
1681 conn.send_bytes(SENTINEL) # tell child to quit
1682 child_conn.close()
1683
1684 if self.TYPE == 'processes':
1685 self.assertEqual(conn.readable, True)
1686 self.assertEqual(conn.writable, True)
1687 self.assertRaises(EOFError, conn.recv)
1688 self.assertRaises(EOFError, conn.recv_bytes)
1689
1690 p.join()
1691
1692 def test_duplex_false(self):
1693 reader, writer = self.Pipe(duplex=False)
1694 self.assertEqual(writer.send(1), None)
1695 self.assertEqual(reader.recv(), 1)
1696 if self.TYPE == 'processes':
1697 self.assertEqual(reader.readable, True)
1698 self.assertEqual(reader.writable, False)
1699 self.assertEqual(writer.readable, False)
1700 self.assertEqual(writer.writable, True)
1701 self.assertRaises(IOError, reader.send, 2)
1702 self.assertRaises(IOError, writer.recv)
1703 self.assertRaises(IOError, writer.poll)
1704
1705 def test_spawn_close(self):
1706 # We test that a pipe connection can be closed by parent
1707 # process immediately after child is spawned. On Windows this
1708 # would have sometimes failed on old versions because
1709 # child_conn would be closed before the child got a chance to
1710 # duplicate it.
1711 conn, child_conn = self.Pipe()
1712
1713 p = self.Process(target=self._echo, args=(child_conn,))
Jesus Cea94f964f2011-09-09 20:26:57 +02001714 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +00001715 p.start()
1716 child_conn.close() # this might complete before child initializes
1717
1718 msg = latin('hello')
1719 conn.send_bytes(msg)
1720 self.assertEqual(conn.recv_bytes(), msg)
1721
1722 conn.send_bytes(SENTINEL)
1723 conn.close()
1724 p.join()
1725
1726 def test_sendbytes(self):
1727 if self.TYPE != 'processes':
1728 return
1729
1730 msg = latin('abcdefghijklmnopqrstuvwxyz')
1731 a, b = self.Pipe()
1732
1733 a.send_bytes(msg)
1734 self.assertEqual(b.recv_bytes(), msg)
1735
1736 a.send_bytes(msg, 5)
1737 self.assertEqual(b.recv_bytes(), msg[5:])
1738
1739 a.send_bytes(msg, 7, 8)
1740 self.assertEqual(b.recv_bytes(), msg[7:7+8])
1741
1742 a.send_bytes(msg, 26)
1743 self.assertEqual(b.recv_bytes(), latin(''))
1744
1745 a.send_bytes(msg, 26, 0)
1746 self.assertEqual(b.recv_bytes(), latin(''))
1747
1748 self.assertRaises(ValueError, a.send_bytes, msg, 27)
1749
1750 self.assertRaises(ValueError, a.send_bytes, msg, 22, 5)
1751
1752 self.assertRaises(ValueError, a.send_bytes, msg, 26, 1)
1753
1754 self.assertRaises(ValueError, a.send_bytes, msg, -1)
1755
1756 self.assertRaises(ValueError, a.send_bytes, msg, 4, -1)
1757
Antoine Pitroubcb39d42011-08-23 19:46:22 +02001758 @classmethod
1759 def _is_fd_assigned(cls, fd):
1760 try:
1761 os.fstat(fd)
1762 except OSError as e:
1763 if e.errno == errno.EBADF:
1764 return False
1765 raise
1766 else:
1767 return True
1768
1769 @classmethod
1770 def _writefd(cls, conn, data, create_dummy_fds=False):
1771 if create_dummy_fds:
1772 for i in range(0, 256):
1773 if not cls._is_fd_assigned(i):
1774 os.dup2(conn.fileno(), i)
1775 fd = reduction.recv_handle(conn)
1776 if msvcrt:
1777 fd = msvcrt.open_osfhandle(fd, os.O_WRONLY)
1778 os.write(fd, data)
1779 os.close(fd)
1780
Charles-François Natalibc8f0822011-09-20 20:36:51 +02001781 @unittest.skipUnless(HAS_REDUCTION, "test needs multiprocessing.reduction")
Antoine Pitroubcb39d42011-08-23 19:46:22 +02001782 def test_fd_transfer(self):
1783 if self.TYPE != 'processes':
1784 self.skipTest("only makes sense with processes")
1785 conn, child_conn = self.Pipe(duplex=True)
1786
1787 p = self.Process(target=self._writefd, args=(child_conn, b"foo"))
Jesus Cea94f964f2011-09-09 20:26:57 +02001788 p.daemon = True
Antoine Pitroubcb39d42011-08-23 19:46:22 +02001789 p.start()
Victor Stinnerd0b10a62011-09-21 01:10:29 +02001790 self.addCleanup(test.support.unlink, test.support.TESTFN)
Antoine Pitroubcb39d42011-08-23 19:46:22 +02001791 with open(test.support.TESTFN, "wb") as f:
1792 fd = f.fileno()
1793 if msvcrt:
1794 fd = msvcrt.get_osfhandle(fd)
1795 reduction.send_handle(conn, fd, p.pid)
1796 p.join()
1797 with open(test.support.TESTFN, "rb") as f:
1798 self.assertEqual(f.read(), b"foo")
1799
Charles-François Natalibc8f0822011-09-20 20:36:51 +02001800 @unittest.skipUnless(HAS_REDUCTION, "test needs multiprocessing.reduction")
Antoine Pitroubcb39d42011-08-23 19:46:22 +02001801 @unittest.skipIf(sys.platform == "win32",
1802 "test semantics don't make sense on Windows")
1803 @unittest.skipIf(MAXFD <= 256,
1804 "largest assignable fd number is too small")
1805 @unittest.skipUnless(hasattr(os, "dup2"),
1806 "test needs os.dup2()")
1807 def test_large_fd_transfer(self):
1808 # With fd > 256 (issue #11657)
1809 if self.TYPE != 'processes':
1810 self.skipTest("only makes sense with processes")
1811 conn, child_conn = self.Pipe(duplex=True)
1812
1813 p = self.Process(target=self._writefd, args=(child_conn, b"bar", True))
Jesus Cea94f964f2011-09-09 20:26:57 +02001814 p.daemon = True
Antoine Pitroubcb39d42011-08-23 19:46:22 +02001815 p.start()
Victor Stinnerd0b10a62011-09-21 01:10:29 +02001816 self.addCleanup(test.support.unlink, test.support.TESTFN)
Antoine Pitroubcb39d42011-08-23 19:46:22 +02001817 with open(test.support.TESTFN, "wb") as f:
1818 fd = f.fileno()
1819 for newfd in range(256, MAXFD):
1820 if not self._is_fd_assigned(newfd):
1821 break
1822 else:
1823 self.fail("could not find an unassigned large file descriptor")
1824 os.dup2(fd, newfd)
1825 try:
1826 reduction.send_handle(conn, newfd, p.pid)
1827 finally:
1828 os.close(newfd)
1829 p.join()
1830 with open(test.support.TESTFN, "rb") as f:
1831 self.assertEqual(f.read(), b"bar")
1832
Jesus Cea4507e642011-09-21 03:53:25 +02001833 @classmethod
1834 def _send_data_without_fd(self, conn):
1835 os.write(conn.fileno(), b"\0")
1836
Charles-François Natalie51c8da2011-09-21 18:48:21 +02001837 @unittest.skipUnless(HAS_REDUCTION, "test needs multiprocessing.reduction")
Jesus Cea4507e642011-09-21 03:53:25 +02001838 @unittest.skipIf(sys.platform == "win32", "doesn't make sense on Windows")
1839 def test_missing_fd_transfer(self):
1840 # Check that exception is raised when received data is not
1841 # accompanied by a file descriptor in ancillary data.
1842 if self.TYPE != 'processes':
1843 self.skipTest("only makes sense with processes")
1844 conn, child_conn = self.Pipe(duplex=True)
1845
1846 p = self.Process(target=self._send_data_without_fd, args=(child_conn,))
1847 p.daemon = True
1848 p.start()
1849 self.assertRaises(RuntimeError, reduction.recv_handle, conn)
1850 p.join()
Antoine Pitroubcb39d42011-08-23 19:46:22 +02001851
Charles-François Natalied4a8fc2012-02-08 21:15:58 +01001852class _TestListener(BaseTestCase):
1853
1854 ALLOWED_TYPES = ('processes')
1855
1856 def test_multiple_bind(self):
1857 for family in self.connection.families:
1858 l = self.connection.Listener(family=family)
1859 self.addCleanup(l.close)
1860 self.assertRaises(OSError, self.connection.Listener,
1861 l.address, family)
1862
Benjamin Petersone711caf2008-06-11 16:44:04 +00001863class _TestListenerClient(BaseTestCase):
1864
1865 ALLOWED_TYPES = ('processes', 'threads')
1866
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +00001867 @classmethod
1868 def _test(cls, address):
1869 conn = cls.connection.Client(address)
Benjamin Petersone711caf2008-06-11 16:44:04 +00001870 conn.send('hello')
1871 conn.close()
1872
1873 def test_listener_client(self):
1874 for family in self.connection.families:
1875 l = self.connection.Listener(family=family)
1876 p = self.Process(target=self._test, args=(l.address,))
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +00001877 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +00001878 p.start()
1879 conn = l.accept()
1880 self.assertEqual(conn.recv(), 'hello')
1881 p.join()
1882 l.close()
Charles-François Natalied4a8fc2012-02-08 21:15:58 +01001883
Richard Oudkerkfdb8dcf2012-05-05 19:45:37 +01001884 def test_issue14725(self):
1885 l = self.connection.Listener()
1886 p = self.Process(target=self._test, args=(l.address,))
1887 p.daemon = True
1888 p.start()
1889 time.sleep(1)
1890 # On Windows the client process should by now have connected,
1891 # written data and closed the pipe handle by now. This causes
1892 # ConnectNamdedPipe() to fail with ERROR_NO_DATA. See Issue
1893 # 14725.
1894 conn = l.accept()
1895 self.assertEqual(conn.recv(), 'hello')
1896 conn.close()
1897 p.join()
1898 l.close()
1899
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01001900class _TestPoll(unittest.TestCase):
1901
1902 ALLOWED_TYPES = ('processes', 'threads')
1903
1904 def test_empty_string(self):
1905 a, b = self.Pipe()
1906 self.assertEqual(a.poll(), False)
1907 b.send_bytes(b'')
1908 self.assertEqual(a.poll(), True)
1909 self.assertEqual(a.poll(), True)
1910
1911 @classmethod
1912 def _child_strings(cls, conn, strings):
1913 for s in strings:
1914 time.sleep(0.1)
1915 conn.send_bytes(s)
1916 conn.close()
1917
1918 def test_strings(self):
1919 strings = (b'hello', b'', b'a', b'b', b'', b'bye', b'', b'lop')
1920 a, b = self.Pipe()
1921 p = self.Process(target=self._child_strings, args=(b, strings))
1922 p.start()
1923
1924 for s in strings:
1925 for i in range(200):
1926 if a.poll(0.01):
1927 break
1928 x = a.recv_bytes()
1929 self.assertEqual(s, x)
1930
1931 p.join()
1932
1933 @classmethod
1934 def _child_boundaries(cls, r):
1935 # Polling may "pull" a message in to the child process, but we
1936 # don't want it to pull only part of a message, as that would
1937 # corrupt the pipe for any other processes which might later
1938 # read from it.
1939 r.poll(5)
1940
1941 def test_boundaries(self):
1942 r, w = self.Pipe(False)
1943 p = self.Process(target=self._child_boundaries, args=(r,))
1944 p.start()
1945 time.sleep(2)
1946 L = [b"first", b"second"]
1947 for obj in L:
1948 w.send_bytes(obj)
1949 w.close()
1950 p.join()
1951 self.assertIn(r.recv_bytes(), L)
1952
1953 @classmethod
1954 def _child_dont_merge(cls, b):
1955 b.send_bytes(b'a')
1956 b.send_bytes(b'b')
1957 b.send_bytes(b'cd')
1958
1959 def test_dont_merge(self):
1960 a, b = self.Pipe()
1961 self.assertEqual(a.poll(0.0), False)
1962 self.assertEqual(a.poll(0.1), False)
1963
1964 p = self.Process(target=self._child_dont_merge, args=(b,))
1965 p.start()
1966
1967 self.assertEqual(a.recv_bytes(), b'a')
1968 self.assertEqual(a.poll(1.0), True)
1969 self.assertEqual(a.poll(1.0), True)
1970 self.assertEqual(a.recv_bytes(), b'b')
1971 self.assertEqual(a.poll(1.0), True)
1972 self.assertEqual(a.poll(1.0), True)
1973 self.assertEqual(a.poll(0.0), True)
1974 self.assertEqual(a.recv_bytes(), b'cd')
1975
1976 p.join()
1977
Benjamin Petersone711caf2008-06-11 16:44:04 +00001978#
1979# Test of sending connection and socket objects between processes
1980#
Antoine Pitrou5438ed12012-04-24 22:56:57 +02001981
Richard Oudkerk24524192012-04-30 14:48:51 +01001982# Intermittent fails on Mac OS X -- see Issue14669 and Issue12958
1983@unittest.skipIf(sys.platform == "darwin", "fd passing unreliable on Mac OS X")
Antoine Pitrou5438ed12012-04-24 22:56:57 +02001984@unittest.skipUnless(HAS_REDUCTION, "test needs multiprocessing.reduction")
Benjamin Petersone711caf2008-06-11 16:44:04 +00001985class _TestPicklingConnections(BaseTestCase):
1986
1987 ALLOWED_TYPES = ('processes',)
1988
Antoine Pitrou5438ed12012-04-24 22:56:57 +02001989 @classmethod
Antoine Pitrou92ff4e12012-04-27 23:51:03 +02001990 def tearDownClass(cls):
1991 from multiprocessing.reduction import resource_sharer
1992 resource_sharer.stop(timeout=5)
1993
1994 @classmethod
Antoine Pitrou5438ed12012-04-24 22:56:57 +02001995 def _listener(cls, conn, families):
Benjamin Petersone711caf2008-06-11 16:44:04 +00001996 for fam in families:
Antoine Pitrou5438ed12012-04-24 22:56:57 +02001997 l = cls.connection.Listener(family=fam)
Benjamin Petersone711caf2008-06-11 16:44:04 +00001998 conn.send(l.address)
1999 new_conn = l.accept()
2000 conn.send(new_conn)
Antoine Pitrou5438ed12012-04-24 22:56:57 +02002001 new_conn.close()
2002 l.close()
Benjamin Petersone711caf2008-06-11 16:44:04 +00002003
Antoine Pitrou5438ed12012-04-24 22:56:57 +02002004 l = socket.socket()
2005 l.bind(('localhost', 0))
2006 conn.send(l.getsockname())
2007 l.listen(1)
2008 new_conn, addr = l.accept()
2009 conn.send(new_conn)
2010 new_conn.close()
2011 l.close()
Benjamin Petersone711caf2008-06-11 16:44:04 +00002012
2013 conn.recv()
2014
Antoine Pitrou5438ed12012-04-24 22:56:57 +02002015 @classmethod
2016 def _remote(cls, conn):
Benjamin Petersone711caf2008-06-11 16:44:04 +00002017 for (address, msg) in iter(conn.recv, None):
Antoine Pitrou5438ed12012-04-24 22:56:57 +02002018 client = cls.connection.Client(address)
Benjamin Petersone711caf2008-06-11 16:44:04 +00002019 client.send(msg.upper())
2020 client.close()
2021
Antoine Pitrou5438ed12012-04-24 22:56:57 +02002022 address, msg = conn.recv()
2023 client = socket.socket()
2024 client.connect(address)
2025 client.sendall(msg.upper())
2026 client.close()
Benjamin Petersone711caf2008-06-11 16:44:04 +00002027
2028 conn.close()
2029
2030 def test_pickling(self):
Benjamin Petersone711caf2008-06-11 16:44:04 +00002031 families = self.connection.families
2032
2033 lconn, lconn0 = self.Pipe()
2034 lp = self.Process(target=self._listener, args=(lconn0, families))
Jesus Cea94f964f2011-09-09 20:26:57 +02002035 lp.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +00002036 lp.start()
2037 lconn0.close()
2038
2039 rconn, rconn0 = self.Pipe()
2040 rp = self.Process(target=self._remote, args=(rconn0,))
Jesus Cea94f964f2011-09-09 20:26:57 +02002041 rp.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +00002042 rp.start()
2043 rconn0.close()
2044
2045 for fam in families:
2046 msg = ('This connection uses family %s' % fam).encode('ascii')
2047 address = lconn.recv()
2048 rconn.send((address, msg))
2049 new_conn = lconn.recv()
2050 self.assertEqual(new_conn.recv(), msg.upper())
2051
2052 rconn.send(None)
2053
Antoine Pitrou5438ed12012-04-24 22:56:57 +02002054 msg = latin('This connection uses a normal socket')
2055 address = lconn.recv()
2056 rconn.send((address, msg))
2057 new_conn = lconn.recv()
Richard Oudkerk4460c342012-04-30 14:48:50 +01002058 buf = []
2059 while True:
2060 s = new_conn.recv(100)
2061 if not s:
2062 break
2063 buf.append(s)
2064 buf = b''.join(buf)
2065 self.assertEqual(buf, msg.upper())
Antoine Pitrou5438ed12012-04-24 22:56:57 +02002066 new_conn.close()
Benjamin Petersone711caf2008-06-11 16:44:04 +00002067
2068 lconn.send(None)
2069
2070 rconn.close()
2071 lconn.close()
2072
2073 lp.join()
2074 rp.join()
Antoine Pitrou5438ed12012-04-24 22:56:57 +02002075
2076 @classmethod
2077 def child_access(cls, conn):
2078 w = conn.recv()
2079 w.send('all is well')
2080 w.close()
2081
2082 r = conn.recv()
2083 msg = r.recv()
2084 conn.send(msg*2)
2085
2086 conn.close()
2087
2088 def test_access(self):
2089 # On Windows, if we do not specify a destination pid when
2090 # using DupHandle then we need to be careful to use the
2091 # correct access flags for DuplicateHandle(), or else
2092 # DupHandle.detach() will raise PermissionError. For example,
2093 # for a read only pipe handle we should use
2094 # access=FILE_GENERIC_READ. (Unfortunately
2095 # DUPLICATE_SAME_ACCESS does not work.)
2096 conn, child_conn = self.Pipe()
2097 p = self.Process(target=self.child_access, args=(child_conn,))
2098 p.daemon = True
2099 p.start()
2100 child_conn.close()
2101
2102 r, w = self.Pipe(duplex=False)
2103 conn.send(w)
2104 w.close()
2105 self.assertEqual(r.recv(), 'all is well')
2106 r.close()
2107
2108 r, w = self.Pipe(duplex=False)
2109 conn.send(r)
2110 r.close()
2111 w.send('foobar')
2112 w.close()
2113 self.assertEqual(conn.recv(), 'foobar'*2)
2114
Benjamin Petersone711caf2008-06-11 16:44:04 +00002115#
2116#
2117#
2118
2119class _TestHeap(BaseTestCase):
2120
2121 ALLOWED_TYPES = ('processes',)
2122
2123 def test_heap(self):
2124 iterations = 5000
2125 maxblocks = 50
2126 blocks = []
2127
2128 # create and destroy lots of blocks of different sizes
2129 for i in range(iterations):
2130 size = int(random.lognormvariate(0, 1) * 1000)
2131 b = multiprocessing.heap.BufferWrapper(size)
2132 blocks.append(b)
2133 if len(blocks) > maxblocks:
2134 i = random.randrange(maxblocks)
2135 del blocks[i]
2136
2137 # get the heap object
2138 heap = multiprocessing.heap.BufferWrapper._heap
2139
2140 # verify the state of the heap
2141 all = []
2142 occupied = 0
Charles-François Natali778db492011-07-02 14:35:49 +02002143 heap._lock.acquire()
2144 self.addCleanup(heap._lock.release)
Benjamin Petersone711caf2008-06-11 16:44:04 +00002145 for L in list(heap._len_to_seq.values()):
2146 for arena, start, stop in L:
2147 all.append((heap._arenas.index(arena), start, stop,
2148 stop-start, 'free'))
2149 for arena, start, stop in heap._allocated_blocks:
2150 all.append((heap._arenas.index(arena), start, stop,
2151 stop-start, 'occupied'))
2152 occupied += (stop-start)
2153
2154 all.sort()
2155
2156 for i in range(len(all)-1):
2157 (arena, start, stop) = all[i][:3]
2158 (narena, nstart, nstop) = all[i+1][:3]
2159 self.assertTrue((arena != narena and nstart == 0) or
2160 (stop == nstart))
2161
Charles-François Natali778db492011-07-02 14:35:49 +02002162 def test_free_from_gc(self):
2163 # Check that freeing of blocks by the garbage collector doesn't deadlock
2164 # (issue #12352).
2165 # Make sure the GC is enabled, and set lower collection thresholds to
2166 # make collections more frequent (and increase the probability of
2167 # deadlock).
2168 if not gc.isenabled():
2169 gc.enable()
2170 self.addCleanup(gc.disable)
2171 thresholds = gc.get_threshold()
2172 self.addCleanup(gc.set_threshold, *thresholds)
2173 gc.set_threshold(10)
2174
2175 # perform numerous block allocations, with cyclic references to make
2176 # sure objects are collected asynchronously by the gc
2177 for i in range(5000):
2178 a = multiprocessing.heap.BufferWrapper(1)
2179 b = multiprocessing.heap.BufferWrapper(1)
2180 # circular references
2181 a.buddy = b
2182 b.buddy = a
2183
Benjamin Petersone711caf2008-06-11 16:44:04 +00002184#
2185#
2186#
2187
Benjamin Petersone711caf2008-06-11 16:44:04 +00002188class _Foo(Structure):
2189 _fields_ = [
2190 ('x', c_int),
2191 ('y', c_double)
2192 ]
2193
2194class _TestSharedCTypes(BaseTestCase):
2195
2196 ALLOWED_TYPES = ('processes',)
2197
Antoine Pitrou7744e2a2010-11-22 16:26:21 +00002198 def setUp(self):
2199 if not HAS_SHAREDCTYPES:
2200 self.skipTest("requires multiprocessing.sharedctypes")
2201
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +00002202 @classmethod
2203 def _double(cls, x, y, foo, arr, string):
Benjamin Petersone711caf2008-06-11 16:44:04 +00002204 x.value *= 2
2205 y.value *= 2
2206 foo.x *= 2
2207 foo.y *= 2
2208 string.value *= 2
2209 for i in range(len(arr)):
2210 arr[i] *= 2
2211
2212 def test_sharedctypes(self, lock=False):
Benjamin Petersone711caf2008-06-11 16:44:04 +00002213 x = Value('i', 7, lock=lock)
Georg Brandl89fad142010-03-14 10:23:39 +00002214 y = Value(c_double, 1.0/3.0, lock=lock)
Benjamin Petersone711caf2008-06-11 16:44:04 +00002215 foo = Value(_Foo, 3, 2, lock=lock)
Georg Brandl89fad142010-03-14 10:23:39 +00002216 arr = self.Array('d', list(range(10)), lock=lock)
2217 string = self.Array('c', 20, lock=lock)
Brian Curtinafa88b52010-10-07 01:12:19 +00002218 string.value = latin('hello')
Benjamin Petersone711caf2008-06-11 16:44:04 +00002219
2220 p = self.Process(target=self._double, args=(x, y, foo, arr, string))
Jesus Cea94f964f2011-09-09 20:26:57 +02002221 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +00002222 p.start()
2223 p.join()
2224
2225 self.assertEqual(x.value, 14)
2226 self.assertAlmostEqual(y.value, 2.0/3.0)
2227 self.assertEqual(foo.x, 6)
2228 self.assertAlmostEqual(foo.y, 4.0)
2229 for i in range(10):
2230 self.assertAlmostEqual(arr[i], i*2)
2231 self.assertEqual(string.value, latin('hellohello'))
2232
2233 def test_synchronize(self):
2234 self.test_sharedctypes(lock=True)
2235
2236 def test_copy(self):
Benjamin Petersone711caf2008-06-11 16:44:04 +00002237 foo = _Foo(2, 5.0)
Brian Curtinafa88b52010-10-07 01:12:19 +00002238 bar = copy(foo)
Benjamin Petersone711caf2008-06-11 16:44:04 +00002239 foo.x = 0
2240 foo.y = 0
2241 self.assertEqual(bar.x, 2)
2242 self.assertAlmostEqual(bar.y, 5.0)
2243
2244#
2245#
2246#
2247
2248class _TestFinalize(BaseTestCase):
2249
2250 ALLOWED_TYPES = ('processes',)
2251
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +00002252 @classmethod
2253 def _test_finalize(cls, conn):
Benjamin Petersone711caf2008-06-11 16:44:04 +00002254 class Foo(object):
2255 pass
2256
2257 a = Foo()
2258 util.Finalize(a, conn.send, args=('a',))
2259 del a # triggers callback for a
2260
2261 b = Foo()
2262 close_b = util.Finalize(b, conn.send, args=('b',))
2263 close_b() # triggers callback for b
2264 close_b() # does nothing because callback has already been called
2265 del b # does nothing because callback has already been called
2266
2267 c = Foo()
2268 util.Finalize(c, conn.send, args=('c',))
2269
2270 d10 = Foo()
2271 util.Finalize(d10, conn.send, args=('d10',), exitpriority=1)
2272
2273 d01 = Foo()
2274 util.Finalize(d01, conn.send, args=('d01',), exitpriority=0)
2275 d02 = Foo()
2276 util.Finalize(d02, conn.send, args=('d02',), exitpriority=0)
2277 d03 = Foo()
2278 util.Finalize(d03, conn.send, args=('d03',), exitpriority=0)
2279
2280 util.Finalize(None, conn.send, args=('e',), exitpriority=-10)
2281
2282 util.Finalize(None, conn.send, args=('STOP',), exitpriority=-100)
2283
Ezio Melotti13925002011-03-16 11:05:33 +02002284 # call multiprocessing's cleanup function then exit process without
Benjamin Petersone711caf2008-06-11 16:44:04 +00002285 # garbage collecting locals
2286 util._exit_function()
2287 conn.close()
2288 os._exit(0)
2289
2290 def test_finalize(self):
2291 conn, child_conn = self.Pipe()
2292
2293 p = self.Process(target=self._test_finalize, args=(child_conn,))
Jesus Cea94f964f2011-09-09 20:26:57 +02002294 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +00002295 p.start()
2296 p.join()
2297
2298 result = [obj for obj in iter(conn.recv, 'STOP')]
2299 self.assertEqual(result, ['a', 'b', 'd10', 'd03', 'd02', 'd01', 'e'])
2300
2301#
2302# Test that from ... import * works for each module
2303#
2304
2305class _TestImportStar(BaseTestCase):
2306
2307 ALLOWED_TYPES = ('processes',)
2308
2309 def test_import(self):
Florent Xiclunafd1b0932010-03-28 00:25:02 +00002310 modules = [
Benjamin Petersone711caf2008-06-11 16:44:04 +00002311 'multiprocessing', 'multiprocessing.connection',
2312 'multiprocessing.heap', 'multiprocessing.managers',
2313 'multiprocessing.pool', 'multiprocessing.process',
Benjamin Petersone711caf2008-06-11 16:44:04 +00002314 'multiprocessing.synchronize', 'multiprocessing.util'
Florent Xiclunafd1b0932010-03-28 00:25:02 +00002315 ]
2316
Charles-François Natalibc8f0822011-09-20 20:36:51 +02002317 if HAS_REDUCTION:
2318 modules.append('multiprocessing.reduction')
2319
Florent Xiclunafd1b0932010-03-28 00:25:02 +00002320 if c_int is not None:
2321 # This module requires _ctypes
2322 modules.append('multiprocessing.sharedctypes')
Benjamin Petersone711caf2008-06-11 16:44:04 +00002323
2324 for name in modules:
2325 __import__(name)
2326 mod = sys.modules[name]
2327
2328 for attr in getattr(mod, '__all__', ()):
2329 self.assertTrue(
2330 hasattr(mod, attr),
2331 '%r does not have attribute %r' % (mod, attr)
2332 )
2333
2334#
2335# Quick test that logging works -- does not test logging output
2336#
2337
2338class _TestLogging(BaseTestCase):
2339
2340 ALLOWED_TYPES = ('processes',)
2341
2342 def test_enable_logging(self):
2343 logger = multiprocessing.get_logger()
2344 logger.setLevel(util.SUBWARNING)
2345 self.assertTrue(logger is not None)
2346 logger.debug('this will not be printed')
2347 logger.info('nor will this')
2348 logger.setLevel(LOG_LEVEL)
2349
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +00002350 @classmethod
2351 def _test_level(cls, conn):
Benjamin Petersone711caf2008-06-11 16:44:04 +00002352 logger = multiprocessing.get_logger()
2353 conn.send(logger.getEffectiveLevel())
2354
2355 def test_level(self):
2356 LEVEL1 = 32
2357 LEVEL2 = 37
2358
2359 logger = multiprocessing.get_logger()
2360 root_logger = logging.getLogger()
2361 root_level = root_logger.level
2362
2363 reader, writer = multiprocessing.Pipe(duplex=False)
2364
2365 logger.setLevel(LEVEL1)
Jesus Cea94f964f2011-09-09 20:26:57 +02002366 p = self.Process(target=self._test_level, args=(writer,))
2367 p.daemon = True
2368 p.start()
Benjamin Petersone711caf2008-06-11 16:44:04 +00002369 self.assertEqual(LEVEL1, reader.recv())
2370
2371 logger.setLevel(logging.NOTSET)
2372 root_logger.setLevel(LEVEL2)
Jesus Cea94f964f2011-09-09 20:26:57 +02002373 p = self.Process(target=self._test_level, args=(writer,))
2374 p.daemon = True
2375 p.start()
Benjamin Petersone711caf2008-06-11 16:44:04 +00002376 self.assertEqual(LEVEL2, reader.recv())
2377
2378 root_logger.setLevel(root_level)
2379 logger.setLevel(level=LOG_LEVEL)
2380
Jesse Nollerb9a49b72009-11-21 18:09:38 +00002381
Jesse Nollerf4ae35f2009-11-24 14:22:24 +00002382# class _TestLoggingProcessName(BaseTestCase):
2383#
2384# def handle(self, record):
2385# assert record.processName == multiprocessing.current_process().name
2386# self.__handled = True
2387#
2388# def test_logging(self):
2389# handler = logging.Handler()
2390# handler.handle = self.handle
2391# self.__handled = False
2392# # Bypass getLogger() and side-effects
2393# logger = logging.getLoggerClass()(
2394# 'multiprocessing.test.TestLoggingProcessName')
2395# logger.addHandler(handler)
2396# logger.propagate = False
2397#
2398# logger.warn('foo')
2399# assert self.__handled
Jesse Nollerb9a49b72009-11-21 18:09:38 +00002400
Benjamin Petersone711caf2008-06-11 16:44:04 +00002401#
Jesse Noller6214edd2009-01-19 16:23:53 +00002402# Test to verify handle verification, see issue 3321
2403#
2404
2405class TestInvalidHandle(unittest.TestCase):
2406
Florent Xiclunafd1b0932010-03-28 00:25:02 +00002407 @unittest.skipIf(WIN32, "skipped on Windows")
Jesse Noller6214edd2009-01-19 16:23:53 +00002408 def test_invalid_handles(self):
Antoine Pitrou87cf2202011-05-09 17:04:27 +02002409 conn = multiprocessing.connection.Connection(44977608)
2410 try:
2411 self.assertRaises((ValueError, IOError), conn.poll)
2412 finally:
2413 # Hack private attribute _handle to avoid printing an error
2414 # in conn.__del__
2415 conn._handle = None
2416 self.assertRaises((ValueError, IOError),
2417 multiprocessing.connection.Connection, -1)
Florent Xiclunafd1b0932010-03-28 00:25:02 +00002418
Jesse Noller6214edd2009-01-19 16:23:53 +00002419#
Benjamin Petersone711caf2008-06-11 16:44:04 +00002420# Functions used to create test cases from the base ones in this module
2421#
2422
2423def get_attributes(Source, names):
2424 d = {}
2425 for name in names:
2426 obj = getattr(Source, name)
2427 if type(obj) == type(get_attributes):
2428 obj = staticmethod(obj)
2429 d[name] = obj
2430 return d
2431
2432def create_test_cases(Mixin, type):
2433 result = {}
2434 glob = globals()
Florent Xiclunafd1b0932010-03-28 00:25:02 +00002435 Type = type.capitalize()
Benjamin Petersone711caf2008-06-11 16:44:04 +00002436
2437 for name in list(glob.keys()):
2438 if name.startswith('_Test'):
2439 base = glob[name]
2440 if type in base.ALLOWED_TYPES:
2441 newname = 'With' + Type + name[1:]
2442 class Temp(base, unittest.TestCase, Mixin):
2443 pass
2444 result[newname] = Temp
2445 Temp.__name__ = newname
2446 Temp.__module__ = Mixin.__module__
2447 return result
2448
2449#
2450# Create test cases
2451#
2452
2453class ProcessesMixin(object):
2454 TYPE = 'processes'
2455 Process = multiprocessing.Process
2456 locals().update(get_attributes(multiprocessing, (
2457 'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
2458 'Condition', 'Event', 'Value', 'Array', 'RawValue',
2459 'RawArray', 'current_process', 'active_children', 'Pipe',
2460 'connection', 'JoinableQueue'
2461 )))
2462
2463testcases_processes = create_test_cases(ProcessesMixin, type='processes')
2464globals().update(testcases_processes)
2465
2466
2467class ManagerMixin(object):
2468 TYPE = 'manager'
2469 Process = multiprocessing.Process
2470 manager = object.__new__(multiprocessing.managers.SyncManager)
2471 locals().update(get_attributes(manager, (
2472 'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
2473 'Condition', 'Event', 'Value', 'Array', 'list', 'dict',
2474 'Namespace', 'JoinableQueue'
2475 )))
2476
2477testcases_manager = create_test_cases(ManagerMixin, type='manager')
2478globals().update(testcases_manager)
2479
2480
2481class ThreadsMixin(object):
2482 TYPE = 'threads'
2483 Process = multiprocessing.dummy.Process
2484 locals().update(get_attributes(multiprocessing.dummy, (
2485 'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
2486 'Condition', 'Event', 'Value', 'Array', 'current_process',
2487 'active_children', 'Pipe', 'connection', 'dict', 'list',
2488 'Namespace', 'JoinableQueue'
2489 )))
2490
2491testcases_threads = create_test_cases(ThreadsMixin, type='threads')
2492globals().update(testcases_threads)
2493
Neal Norwitz5d6415e2008-08-25 01:53:32 +00002494class OtherTest(unittest.TestCase):
2495 # TODO: add more tests for deliver/answer challenge.
2496 def test_deliver_challenge_auth_failure(self):
2497 class _FakeConnection(object):
2498 def recv_bytes(self, size):
Neal Norwitzec105ad2008-08-25 03:05:54 +00002499 return b'something bogus'
Neal Norwitz5d6415e2008-08-25 01:53:32 +00002500 def send_bytes(self, data):
2501 pass
2502 self.assertRaises(multiprocessing.AuthenticationError,
2503 multiprocessing.connection.deliver_challenge,
2504 _FakeConnection(), b'abc')
2505
2506 def test_answer_challenge_auth_failure(self):
2507 class _FakeConnection(object):
2508 def __init__(self):
2509 self.count = 0
2510 def recv_bytes(self, size):
2511 self.count += 1
2512 if self.count == 1:
2513 return multiprocessing.connection.CHALLENGE
2514 elif self.count == 2:
Neal Norwitzec105ad2008-08-25 03:05:54 +00002515 return b'something bogus'
2516 return b''
Neal Norwitz5d6415e2008-08-25 01:53:32 +00002517 def send_bytes(self, data):
2518 pass
2519 self.assertRaises(multiprocessing.AuthenticationError,
2520 multiprocessing.connection.answer_challenge,
2521 _FakeConnection(), b'abc')
2522
Benjamin Petersonf47ed4a2009-04-11 20:45:40 +00002523#
2524# Test Manager.start()/Pool.__init__() initializer feature - see issue 5585
2525#
2526
2527def initializer(ns):
2528 ns.test += 1
2529
2530class TestInitializers(unittest.TestCase):
2531 def setUp(self):
2532 self.mgr = multiprocessing.Manager()
2533 self.ns = self.mgr.Namespace()
2534 self.ns.test = 0
2535
2536 def tearDown(self):
2537 self.mgr.shutdown()
Richard Oudkerka6becaa2012-05-03 18:29:02 +01002538 self.mgr.join()
Benjamin Petersonf47ed4a2009-04-11 20:45:40 +00002539
2540 def test_manager_initializer(self):
2541 m = multiprocessing.managers.SyncManager()
2542 self.assertRaises(TypeError, m.start, 1)
2543 m.start(initializer, (self.ns,))
2544 self.assertEqual(self.ns.test, 1)
2545 m.shutdown()
Richard Oudkerka6becaa2012-05-03 18:29:02 +01002546 m.join()
Benjamin Petersonf47ed4a2009-04-11 20:45:40 +00002547
2548 def test_pool_initializer(self):
2549 self.assertRaises(TypeError, multiprocessing.Pool, initializer=1)
2550 p = multiprocessing.Pool(1, initializer, (self.ns,))
2551 p.close()
2552 p.join()
2553 self.assertEqual(self.ns.test, 1)
2554
Alexandre Vassalottic57a84f2009-07-17 12:07:01 +00002555#
2556# Issue 5155, 5313, 5331: Test process in processes
2557# Verifies os.close(sys.stdin.fileno) vs. sys.stdin.close() behavior
2558#
2559
2560def _ThisSubProcess(q):
2561 try:
2562 item = q.get(block=False)
2563 except pyqueue.Empty:
2564 pass
2565
2566def _TestProcess(q):
2567 queue = multiprocessing.Queue()
2568 subProc = multiprocessing.Process(target=_ThisSubProcess, args=(queue,))
Jesus Cea94f964f2011-09-09 20:26:57 +02002569 subProc.daemon = True
Alexandre Vassalottic57a84f2009-07-17 12:07:01 +00002570 subProc.start()
2571 subProc.join()
2572
2573def _afunc(x):
2574 return x*x
2575
2576def pool_in_process():
2577 pool = multiprocessing.Pool(processes=4)
2578 x = pool.map(_afunc, [1, 2, 3, 4, 5, 6, 7])
Richard Oudkerk225cb8d2012-05-02 19:36:11 +01002579 pool.close()
2580 pool.join()
Alexandre Vassalottic57a84f2009-07-17 12:07:01 +00002581
2582class _file_like(object):
2583 def __init__(self, delegate):
2584 self._delegate = delegate
2585 self._pid = None
2586
2587 @property
2588 def cache(self):
2589 pid = os.getpid()
2590 # There are no race conditions since fork keeps only the running thread
2591 if pid != self._pid:
2592 self._pid = pid
2593 self._cache = []
2594 return self._cache
2595
2596 def write(self, data):
2597 self.cache.append(data)
2598
2599 def flush(self):
2600 self._delegate.write(''.join(self.cache))
2601 self._cache = []
2602
2603class TestStdinBadfiledescriptor(unittest.TestCase):
2604
2605 def test_queue_in_process(self):
2606 queue = multiprocessing.Queue()
2607 proc = multiprocessing.Process(target=_TestProcess, args=(queue,))
2608 proc.start()
2609 proc.join()
2610
2611 def test_pool_in_process(self):
2612 p = multiprocessing.Process(target=pool_in_process)
2613 p.start()
2614 p.join()
2615
2616 def test_flushing(self):
2617 sio = io.StringIO()
2618 flike = _file_like(sio)
2619 flike.write('foo')
2620 proc = multiprocessing.Process(target=lambda: flike.flush())
2621 flike.flush()
2622 assert sio.getvalue() == 'foo'
2623
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01002624
2625class TestWait(unittest.TestCase):
2626
2627 @classmethod
2628 def _child_test_wait(cls, w, slow):
2629 for i in range(10):
2630 if slow:
2631 time.sleep(random.random()*0.1)
2632 w.send((i, os.getpid()))
2633 w.close()
2634
2635 def test_wait(self, slow=False):
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01002636 from multiprocessing.connection import wait
2637 readers = []
2638 procs = []
2639 messages = []
2640
2641 for i in range(4):
Antoine Pitrou5bb9a8f2012-03-06 13:43:24 +01002642 r, w = multiprocessing.Pipe(duplex=False)
2643 p = multiprocessing.Process(target=self._child_test_wait, args=(w, slow))
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01002644 p.daemon = True
2645 p.start()
2646 w.close()
2647 readers.append(r)
2648 procs.append(p)
Antoine Pitrou6c64cc12012-03-06 13:42:35 +01002649 self.addCleanup(p.join)
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01002650
2651 while readers:
2652 for r in wait(readers):
2653 try:
2654 msg = r.recv()
2655 except EOFError:
2656 readers.remove(r)
2657 r.close()
2658 else:
2659 messages.append(msg)
2660
2661 messages.sort()
2662 expected = sorted((i, p.pid) for i in range(10) for p in procs)
2663 self.assertEqual(messages, expected)
2664
2665 @classmethod
2666 def _child_test_wait_socket(cls, address, slow):
2667 s = socket.socket()
2668 s.connect(address)
2669 for i in range(10):
2670 if slow:
2671 time.sleep(random.random()*0.1)
2672 s.sendall(('%s\n' % i).encode('ascii'))
2673 s.close()
2674
2675 def test_wait_socket(self, slow=False):
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01002676 from multiprocessing.connection import wait
2677 l = socket.socket()
2678 l.bind(('', 0))
2679 l.listen(4)
2680 addr = ('localhost', l.getsockname()[1])
2681 readers = []
2682 procs = []
2683 dic = {}
2684
2685 for i in range(4):
Antoine Pitrou5bb9a8f2012-03-06 13:43:24 +01002686 p = multiprocessing.Process(target=self._child_test_wait_socket,
2687 args=(addr, slow))
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01002688 p.daemon = True
2689 p.start()
2690 procs.append(p)
Antoine Pitrou6c64cc12012-03-06 13:42:35 +01002691 self.addCleanup(p.join)
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01002692
2693 for i in range(4):
2694 r, _ = l.accept()
2695 readers.append(r)
2696 dic[r] = []
2697 l.close()
2698
2699 while readers:
2700 for r in wait(readers):
2701 msg = r.recv(32)
2702 if not msg:
2703 readers.remove(r)
2704 r.close()
2705 else:
2706 dic[r].append(msg)
2707
2708 expected = ''.join('%s\n' % i for i in range(10)).encode('ascii')
2709 for v in dic.values():
2710 self.assertEqual(b''.join(v), expected)
2711
2712 def test_wait_slow(self):
2713 self.test_wait(True)
2714
2715 def test_wait_socket_slow(self):
Richard Oudkerk104b3f42012-05-08 16:08:07 +01002716 self.test_wait_socket(True)
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01002717
2718 def test_wait_timeout(self):
2719 from multiprocessing.connection import wait
2720
Richard Oudkerk009b15e2012-05-04 09:44:39 +01002721 expected = 5
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01002722 a, b = multiprocessing.Pipe()
2723
2724 start = time.time()
Richard Oudkerk009b15e2012-05-04 09:44:39 +01002725 res = wait([a, b], expected)
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01002726 delta = time.time() - start
2727
2728 self.assertEqual(res, [])
Richard Oudkerk6dbca362012-05-06 16:46:36 +01002729 self.assertLess(delta, expected * 2)
2730 self.assertGreater(delta, expected * 0.5)
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01002731
2732 b.send(None)
2733
2734 start = time.time()
Richard Oudkerk009b15e2012-05-04 09:44:39 +01002735 res = wait([a, b], 20)
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01002736 delta = time.time() - start
2737
2738 self.assertEqual(res, [a])
Antoine Pitrou37749772012-03-09 18:40:15 +01002739 self.assertLess(delta, 0.4)
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01002740
Richard Oudkerk009b15e2012-05-04 09:44:39 +01002741 @classmethod
2742 def signal_and_sleep(cls, sem, period):
2743 sem.release()
2744 time.sleep(period)
2745
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01002746 def test_wait_integer(self):
2747 from multiprocessing.connection import wait
2748
Richard Oudkerk009b15e2012-05-04 09:44:39 +01002749 expected = 3
2750 sem = multiprocessing.Semaphore(0)
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01002751 a, b = multiprocessing.Pipe()
Richard Oudkerk009b15e2012-05-04 09:44:39 +01002752 p = multiprocessing.Process(target=self.signal_and_sleep,
2753 args=(sem, expected))
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01002754
2755 p.start()
2756 self.assertIsInstance(p.sentinel, int)
Richard Oudkerk009b15e2012-05-04 09:44:39 +01002757 self.assertTrue(sem.acquire(timeout=20))
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01002758
2759 start = time.time()
2760 res = wait([a, p.sentinel, b], expected + 20)
2761 delta = time.time() - start
2762
2763 self.assertEqual(res, [p.sentinel])
Antoine Pitrou37749772012-03-09 18:40:15 +01002764 self.assertLess(delta, expected + 2)
2765 self.assertGreater(delta, expected - 2)
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01002766
2767 a.send(None)
2768
2769 start = time.time()
2770 res = wait([a, p.sentinel, b], 20)
2771 delta = time.time() - start
2772
2773 self.assertEqual(res, [p.sentinel, b])
Antoine Pitrou37749772012-03-09 18:40:15 +01002774 self.assertLess(delta, 0.4)
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01002775
2776 b.send(None)
2777
2778 start = time.time()
2779 res = wait([a, p.sentinel, b], 20)
2780 delta = time.time() - start
2781
2782 self.assertEqual(res, [a, p.sentinel, b])
Antoine Pitrou37749772012-03-09 18:40:15 +01002783 self.assertLess(delta, 0.4)
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01002784
Richard Oudkerk009b15e2012-05-04 09:44:39 +01002785 p.terminate()
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01002786 p.join()
2787
2788
Antoine Pitrou709176f2012-04-01 17:19:09 +02002789#
2790# Issue 14151: Test invalid family on invalid environment
2791#
2792
2793class TestInvalidFamily(unittest.TestCase):
2794
2795 @unittest.skipIf(WIN32, "skipped on Windows")
2796 def test_invalid_family(self):
2797 with self.assertRaises(ValueError):
2798 multiprocessing.connection.Listener(r'\\.\test')
2799
Antoine Pitrou6d20cba2012-04-03 20:12:23 +02002800 @unittest.skipUnless(WIN32, "skipped on non-Windows platforms")
2801 def test_invalid_family_win32(self):
2802 with self.assertRaises(ValueError):
2803 multiprocessing.connection.Listener('/var/test.pipe')
Antoine Pitrou93bba8f2012-04-01 17:25:49 +02002804
Alexandre Vassalottic57a84f2009-07-17 12:07:01 +00002805testcases_other = [OtherTest, TestInvalidHandle, TestInitializers,
Antoine Pitrou93bba8f2012-04-01 17:25:49 +02002806 TestStdinBadfiledescriptor, TestWait, TestInvalidFamily]
Neal Norwitz5d6415e2008-08-25 01:53:32 +00002807
Benjamin Petersone711caf2008-06-11 16:44:04 +00002808#
2809#
2810#
2811
2812def test_main(run=None):
Jesse Nollerd00df3c2008-06-18 14:22:48 +00002813 if sys.platform.startswith("linux"):
2814 try:
2815 lock = multiprocessing.RLock()
2816 except OSError:
Benjamin Petersone549ead2009-03-28 21:42:05 +00002817 raise unittest.SkipTest("OSError raises on RLock creation, see issue 3111!")
Benjamin Peterson3c0dd062008-06-17 22:43:48 +00002818
Charles-François Natali221ef672011-11-22 18:55:22 +01002819 check_enough_semaphores()
2820
Benjamin Petersone711caf2008-06-11 16:44:04 +00002821 if run is None:
2822 from test.support import run_unittest as run
2823
2824 util.get_temp_dir() # creates temp directory for use by all processes
2825
2826 multiprocessing.get_logger().setLevel(LOG_LEVEL)
2827
Benjamin Peterson41181742008-07-02 20:22:54 +00002828 ProcessesMixin.pool = multiprocessing.Pool(4)
2829 ThreadsMixin.pool = multiprocessing.dummy.Pool(4)
2830 ManagerMixin.manager.__init__()
2831 ManagerMixin.manager.start()
2832 ManagerMixin.pool = ManagerMixin.manager.Pool(4)
Benjamin Petersone711caf2008-06-11 16:44:04 +00002833
2834 testcases = (
Benjamin Peterson41181742008-07-02 20:22:54 +00002835 sorted(testcases_processes.values(), key=lambda tc:tc.__name__) +
2836 sorted(testcases_threads.values(), key=lambda tc:tc.__name__) +
Neal Norwitz5d6415e2008-08-25 01:53:32 +00002837 sorted(testcases_manager.values(), key=lambda tc:tc.__name__) +
2838 testcases_other
Benjamin Petersone711caf2008-06-11 16:44:04 +00002839 )
2840
2841 loadTestsFromTestCase = unittest.defaultTestLoader.loadTestsFromTestCase
2842 suite = unittest.TestSuite(loadTestsFromTestCase(tc) for tc in testcases)
Richard Oudkerk225cb8d2012-05-02 19:36:11 +01002843 try:
2844 run(suite)
2845 finally:
2846 ThreadsMixin.pool.terminate()
2847 ProcessesMixin.pool.terminate()
2848 ManagerMixin.pool.terminate()
2849 ManagerMixin.pool.join()
2850 ManagerMixin.manager.shutdown()
Richard Oudkerka6becaa2012-05-03 18:29:02 +01002851 ManagerMixin.manager.join()
Richard Oudkerk225cb8d2012-05-02 19:36:11 +01002852 ThreadsMixin.pool.join()
2853 ProcessesMixin.pool.join()
2854 del ProcessesMixin.pool, ThreadsMixin.pool, ManagerMixin.pool
Benjamin Petersone711caf2008-06-11 16:44:04 +00002855
2856def main():
2857 test_main(unittest.TextTestRunner(verbosity=2).run)
2858
2859if __name__ == '__main__':
2860 main()