blob: 3f8a9a2d3582095b6c3e979d820dcf325aa0a9e0 [file] [log] [blame]
Benjamin Peterson90f5ba52010-03-11 22:53:45 +00001#!/usr/bin/env python3
Georg Brandl86b2fb92008-07-16 03:43:04 +00002
Benjamin Petersone711caf2008-06-11 16:44:04 +00003#
4# Unit tests for the multiprocessing package
5#
6
7import unittest
Benjamin Petersone711caf2008-06-11 16:44:04 +00008import queue as pyqueue
9import time
Alexandre Vassalottic57a84f2009-07-17 12:07:01 +000010import io
Benjamin Petersone711caf2008-06-11 16:44:04 +000011import sys
12import os
13import gc
Antoine Pitrouc824e9a2011-04-05 18:11:33 +020014import errno
Benjamin Petersone711caf2008-06-11 16:44:04 +000015import signal
16import array
Benjamin Petersone711caf2008-06-11 16:44:04 +000017import socket
18import random
19import logging
R. David Murraya21e4ca2009-03-31 23:16:50 +000020import test.support
Benjamin Petersone711caf2008-06-11 16:44:04 +000021
Benjamin Petersone5384b02008-10-04 22:00:42 +000022
R. David Murraya21e4ca2009-03-31 23:16:50 +000023# Skip tests if _multiprocessing wasn't built.
24_multiprocessing = test.support.import_module('_multiprocessing')
25# Skip tests if sem_open implementation is broken.
26test.support.import_module('multiprocessing.synchronize')
# import threading after _multiprocessing to raise a more relevant error
# message: "No module named _multiprocessing". _multiprocessing is not compiled
# without thread support.
30import threading
Benjamin Petersone5384b02008-10-04 22:00:42 +000031
Benjamin Petersone711caf2008-06-11 16:44:04 +000032import multiprocessing.dummy
33import multiprocessing.connection
34import multiprocessing.managers
35import multiprocessing.heap
Benjamin Petersone711caf2008-06-11 16:44:04 +000036import multiprocessing.pool
Benjamin Petersone711caf2008-06-11 16:44:04 +000037
Antoine Pitroubcb39d42011-08-23 19:46:22 +020038from multiprocessing import util, reduction
Benjamin Petersone711caf2008-06-11 16:44:04 +000039
Brian Curtinafa88b52010-10-07 01:12:19 +000040try:
41 from multiprocessing.sharedctypes import Value, copy
42 HAS_SHAREDCTYPES = True
43except ImportError:
44 HAS_SHAREDCTYPES = False
45
Antoine Pitroubcb39d42011-08-23 19:46:22 +020046try:
47 import msvcrt
48except ImportError:
49 msvcrt = None
50
Benjamin Petersone711caf2008-06-11 16:44:04 +000051#
52#
53#
54
def latin(s):
    """Return the string ``s`` encoded to bytes with the 'latin' codec."""
    encoded = s.encode('latin')
    return encoded
Benjamin Petersone711caf2008-06-11 16:44:04 +000057
Benjamin Petersone711caf2008-06-11 16:44:04 +000058#
59# Constants
60#
61
# Log level constant for the test run.  For more detail while debugging,
# logging.DEBUG can be substituted here.
LOG_LEVEL = util.SUBWARNING

# Short pause (seconds) the tests sleep to let other processes/threads
# make progress.
DELTA = 0.1

# Making this true makes tests take a lot longer and can sometimes cause
# some non-serious failures because some calls block a bit longer than
# expected.
CHECK_TIMINGS = False

# Longer, distinct timeouts are only worthwhile when timings are verified.
TIMEOUT1, TIMEOUT2, TIMEOUT3 = (
    (0.82, 0.35, 1.4) if CHECK_TIMINGS else (0.1, 0.1, 0.1)
)

# False only when _multiprocessing flags its semaphore get_value() as broken.
_sem_getvalue_broken = getattr(_multiprocessing,
                               'HAVE_BROKEN_SEM_GETVALUE', False)
HAVE_GETVALUE = not _sem_getvalue_broken
77
WIN32 = sys.platform == "win32"

if WIN32:
    from _subprocess import WaitForSingleObject, INFINITE, WAIT_OBJECT_0

    def wait_for_handle(handle, timeout):
        """Return True if ``handle`` is signalled within ``timeout`` seconds.

        A timeout of None or a negative timeout waits indefinitely.
        """
        wait_forever = timeout is None or timeout < 0.0
        # WaitForSingleObject is given the timeout in milliseconds.
        millis = INFINITE if wait_forever else int(1000 * timeout)
        return WaitForSingleObject(handle, millis) == WAIT_OBJECT_0
else:
    from select import select
    _select = util._eintr_retry(select)

    def wait_for_handle(handle, timeout):
        """Return True if ``handle`` is readable within ``timeout`` seconds.

        A negative timeout is treated like None and handed to select() as
        such.
        """
        is_negative = timeout is not None and timeout < 0.0
        readable = _select([handle], [], [], None if is_negative else timeout)[0]
        return handle in readable
Jesse Noller6214edd2009-01-19 16:23:53 +000096
# Upper bound on file descriptor numbers, with a fixed fallback when the
# platform cannot report it.
try:
    MAXFD = os.sysconf("SC_OPEN_MAX")
except (AttributeError, ValueError, OSError):
    # AttributeError: os.sysconf does not exist on this platform.
    # ValueError: the configuration name is not known.
    # OSError: the name is known but not supported by the host system.
    # A bare "except:" would also have swallowed KeyboardInterrupt and
    # SystemExit, so only these errors are handled.
    MAXFD = 256
101
Benjamin Petersone711caf2008-06-11 16:44:04 +0000102#
Florent Xiclunafd1b0932010-03-28 00:25:02 +0000103# Some tests require ctypes
104#
105
106try:
Florent Xiclunaaa171062010-08-14 15:56:42 +0000107 from ctypes import Structure, c_int, c_double
Florent Xiclunafd1b0932010-03-28 00:25:02 +0000108except ImportError:
109 Structure = object
110 c_int = c_double = None
111
112#
Benjamin Petersone711caf2008-06-11 16:44:04 +0000113# Creates a wrapper for a function which records the time it takes to finish
114#
115
class TimingWrapper(object):
    """Callable wrapper which records how long each call to ``func`` takes.

    ``elapsed`` is None until the wrapper has been called; afterwards it
    holds the duration in seconds of the most recent call, whether that call
    returned normally or raised.
    """

    def __init__(self, func):
        self.func = func
        self.elapsed = None

    def __call__(self, *args, **kwds):
        # Use the monotonic clock: unlike time.time() it cannot jump when the
        # system clock is adjusted, so a duration is never negative.
        started = time.monotonic()
        try:
            return self.func(*args, **kwds)
        finally:
            # Recorded in ``finally`` so failures are timed as well.
            self.elapsed = time.monotonic() - started
128
129#
130# Base class for test cases
131#
132
class BaseTestCase(object):
    """Mixin holding assertion helpers shared by the test cases.

    The helpers call ``assertEqual``/``assertAlmostEqual``, which this class
    does not define; they must be supplied by the concrete test class.
    """

    ALLOWED_TYPES = ('processes', 'manager', 'threads')

    def assertTimingAlmostEqual(self, a, b):
        # Timings are only compared when CHECK_TIMINGS is switched on.
        if not CHECK_TIMINGS:
            return
        self.assertAlmostEqual(a, b, 1)

    def assertReturnsIfImplemented(self, value, func, *args):
        # Assert func(*args) == value, unless func reports that it is not
        # implemented, in which case nothing is checked.
        try:
            result = func(*args)
        except NotImplementedError:
            return None
        return self.assertEqual(value, result)

    # For the sanity of Windows users, rather than crashing or freezing in
    # multiple ways.
    def __reduce__(self, *args):
        raise NotImplementedError("shouldn't try to pickle a test case")

    __reduce_ex__ = __reduce__
155
Benjamin Petersone711caf2008-06-11 16:44:04 +0000156#
157# Return the value of a semaphore
158#
159
def get_value(self):
    """Return the current value of the semaphore-like object ``self``.

    The object's own ``get_value()`` is tried first, then the attributes
    ``_Semaphore__value`` and ``_value`` in that order.  Raises
    NotImplementedError when none of them is available.
    """
    try:
        return self.get_value()
    except AttributeError:
        pass
    for attribute in ('_Semaphore__value', '_value'):
        try:
            return getattr(self, attribute)
        except AttributeError:
            continue
    raise NotImplementedError
171
172#
173# Testcases
174#
175
class _TestProcess(BaseTestCase):
    # Tests for process objects: attributes, start/join, terminate,
    # active_children(), nested children and the sentinel handle.
    # TYPE, Process, Queue, Event, Pipe, current_process and active_children
    # are not defined here; they must be provided on the concrete test class.

    ALLOWED_TYPES = ('processes', 'threads')

    def test_current(self):
        if self.TYPE == 'threads':
            return

        current = self.current_process()
        authkey = current.authkey

        self.assertTrue(current.is_alive())
        self.assertFalse(current.daemon)
        self.assertIsInstance(authkey, bytes)
        self.assertGreater(len(authkey), 0)
        self.assertEqual(current.ident, os.getpid())
        self.assertIsNone(current.exitcode)

    def test_daemon_argument(self):
        if self.TYPE == "threads":
            return

        # By default uses the current process's daemon flag.
        proc0 = self.Process(target=self._test)
        self.assertEqual(proc0.daemon, self.current_process().daemon)
        proc1 = self.Process(target=self._test, daemon=True)
        self.assertTrue(proc1.daemon)
        proc2 = self.Process(target=self._test, daemon=False)
        self.assertFalse(proc2.daemon)

    @classmethod
    def _test(cls, q, *args, **kwds):
        # Child side of test_process: report what the child received and
        # who it thinks it is through the queue.
        current = cls.current_process()
        q.put(args)
        q.put(kwds)
        q.put(current.name)
        if cls.TYPE != 'threads':
            q.put(bytes(current.authkey))
            q.put(current.pid)

    def test_process(self):
        q = self.Queue(1)
        args = (q, 1, 2)
        kwargs = {'hello': 23, 'bye': 2.54}
        name = 'SomeProcess'
        p = self.Process(
            target=self._test, args=args, kwargs=kwargs, name=name
            )
        p.daemon = True
        current = self.current_process()

        # State before the process has been started.
        if self.TYPE != 'threads':
            self.assertEqual(p.authkey, current.authkey)
        self.assertEqual(p.is_alive(), False)
        self.assertEqual(p.daemon, True)
        self.assertNotIn(p, self.active_children())
        self.assertIs(type(self.active_children()), list)
        self.assertIsNone(p.exitcode)

        p.start()

        self.assertIsNone(p.exitcode)
        self.assertEqual(p.is_alive(), True)
        self.assertIn(p, self.active_children())

        # The child echoes everything after the queue argument.
        self.assertEqual(q.get(), args[1:])
        self.assertEqual(q.get(), kwargs)
        self.assertEqual(q.get(), p.name)
        if self.TYPE != 'threads':
            self.assertEqual(q.get(), current.authkey)
            self.assertEqual(q.get(), p.pid)

        p.join()

        self.assertEqual(p.exitcode, 0)
        self.assertEqual(p.is_alive(), False)
        self.assertNotIn(p, self.active_children())

    @classmethod
    def _test_terminate(cls):
        # Sleep long enough that the parent has to terminate us.
        time.sleep(1000)

    def test_terminate(self):
        if self.TYPE == 'threads':
            return

        p = self.Process(target=self._test_terminate)
        p.daemon = True
        p.start()

        self.assertEqual(p.is_alive(), True)
        self.assertIn(p, self.active_children())
        self.assertIsNone(p.exitcode)

        p.terminate()

        join = TimingWrapper(p.join)
        self.assertIsNone(join())
        self.assertTimingAlmostEqual(join.elapsed, 0.0)

        self.assertEqual(p.is_alive(), False)
        self.assertNotIn(p, self.active_children())

        p.join()

        # XXX sometimes get p.exitcode == 0 on Windows ...
        #self.assertEqual(p.exitcode, -signal.SIGTERM)

    def test_cpu_count(self):
        try:
            cpus = multiprocessing.cpu_count()
        except NotImplementedError:
            cpus = 1
        self.assertIs(type(cpus), int)
        self.assertGreaterEqual(cpus, 1)

    def test_active_children(self):
        self.assertEqual(type(self.active_children()), list)

        p = self.Process(target=time.sleep, args=(DELTA,))
        self.assertNotIn(p, self.active_children())

        p.daemon = True
        p.start()
        self.assertIn(p, self.active_children())

        p.join()
        self.assertNotIn(p, self.active_children())

    @classmethod
    def _test_recursion(cls, wconn, id):
        # Send our own id, then run two children one after the other, each
        # with one more element in its id, down to a depth of two.
        wconn.send(id)
        if len(id) < 2:
            for i in range(2):
                p = cls.Process(
                    target=cls._test_recursion, args=(wconn, id+[i])
                    )
                p.start()
                p.join()

    def test_recursion(self):
        rconn, wconn = self.Pipe(duplex=False)
        self._test_recursion(wconn, [])

        time.sleep(DELTA)
        result = []
        while rconn.poll():
            result.append(rconn.recv())

        # Children are joined before their siblings start, so the ids
        # arrive in depth-first order.
        expected = [
            [],
              [0],
                [0, 0],
                [0, 1],
              [1],
                [1, 0],
                [1, 1]
            ]
        self.assertEqual(result, expected)

    @classmethod
    def _test_sentinel(cls, event):
        event.wait(10.0)

    def test_sentinel(self):
        if self.TYPE == "threads":
            return
        event = self.Event()
        p = self.Process(target=self._test_sentinel, args=(event,))
        # The sentinel is not available before the process is started.
        with self.assertRaises(ValueError):
            p.sentinel
        p.start()
        self.addCleanup(p.join)
        sentinel = p.sentinel
        self.assertIsInstance(sentinel, int)
        self.assertFalse(wait_for_handle(sentinel, timeout=0.0))
        event.set()
        p.join()
        self.assertTrue(wait_for_handle(sentinel, timeout=DELTA))
357
Benjamin Petersone711caf2008-06-11 16:44:04 +0000358#
359#
360#
361
class _UpperCaser(multiprocessing.Process):
    """Process which sends back the upper-cased form of what it receives.

    ``submit()`` and ``stop()`` use ``parent_conn``; ``run()`` serves
    requests on ``child_conn`` until it receives None.
    """

    def __init__(self):
        super().__init__()
        ends = multiprocessing.Pipe()
        self.child_conn = ends[0]
        self.parent_conn = ends[1]

    def run(self):
        # run() only talks through child_conn, so drop the other end first.
        self.parent_conn.close()
        while True:
            request = self.child_conn.recv()
            if request is None:
                break
            self.child_conn.send(request.upper())
        self.child_conn.close()

    def submit(self, s):
        assert type(s) is str
        self.parent_conn.send(s)
        reply = self.parent_conn.recv()
        return reply

    def stop(self):
        # None is the message on which run() leaves its loop.
        self.parent_conn.send(None)
        for conn in (self.parent_conn, self.child_conn):
            conn.close()
383
class _TestSubclassingProcess(BaseTestCase):
    # Checks that a Process subclass (_UpperCaser) can be started, used
    # and stopped.

    ALLOWED_TYPES = ('processes',)

    def test_subclassing(self):
        uppercaser = _UpperCaser()
        uppercaser.daemon = True
        uppercaser.start()
        for text, expected in (('hello', 'HELLO'), ('world', 'WORLD')):
            self.assertEqual(uppercaser.submit(text), expected)
        uppercaser.stop()
        uppercaser.join()
396
397#
398#
399#
400
def queue_empty(q):
    """Return whether ``q`` is empty.

    Uses ``q.empty()`` when the queue has that method and otherwise
    compares ``q.qsize()`` with zero.
    """
    if not hasattr(q, 'empty'):
        return q.qsize() == 0
    return q.empty()
406
def queue_full(q, maxsize):
    """Return whether ``q`` is full.

    Uses ``q.full()`` when the queue has that method and otherwise
    compares ``q.qsize()`` with ``maxsize``.
    """
    if not hasattr(q, 'full'):
        return q.qsize() == maxsize
    return q.full()
412
413
class _TestQueue(BaseTestCase):
    # Tests for queues: blocking and non-blocking put()/get(), use across a
    # newly started process, qsize() and JoinableQueue.task_done().
    # Queue, JoinableQueue, Event and Process are not defined here; they
    # must be provided on the concrete test class.

    # Capacity of the queue in test_put; the child drains this many items.
    _PUT_MAXSIZE = 6

    @classmethod
    def _test_put(cls, queue, child_can_start, parent_can_continue):
        # Child side of test_put: once allowed to start, empty the queue
        # the parent has filled and tell the parent to carry on.
        child_can_start.wait()
        for _ in range(cls._PUT_MAXSIZE):
            queue.get()
        parent_can_continue.set()

    def test_put(self):
        MAXSIZE = self._PUT_MAXSIZE
        queue = self.Queue(maxsize=MAXSIZE)
        child_can_start = self.Event()
        parent_can_continue = self.Event()

        proc = self.Process(
            target=self._test_put,
            args=(queue, child_can_start, parent_can_continue)
            )
        proc.daemon = True
        proc.start()

        self.assertEqual(queue_empty(queue), True)
        self.assertEqual(queue_full(queue, MAXSIZE), False)

        # Fill the queue using every calling convention of put().
        queue.put(1)
        queue.put(2, True)
        queue.put(3, True, None)
        queue.put(4, False)
        queue.put(5, False, None)
        queue.put_nowait(6)

        # the values may be in buffer but not yet in pipe so sleep a bit
        time.sleep(DELTA)

        self.assertEqual(queue_empty(queue), False)
        self.assertEqual(queue_full(queue, MAXSIZE), True)

        put = TimingWrapper(queue.put)
        put_nowait = TimingWrapper(queue.put_nowait)

        # The queue is full: non-blocking puts fail at once, blocking puts
        # fail after their timeout.
        self.assertRaises(pyqueue.Full, put, 7, False)
        self.assertTimingAlmostEqual(put.elapsed, 0)

        self.assertRaises(pyqueue.Full, put, 7, False, None)
        self.assertTimingAlmostEqual(put.elapsed, 0)

        self.assertRaises(pyqueue.Full, put_nowait, 7)
        self.assertTimingAlmostEqual(put_nowait.elapsed, 0)

        self.assertRaises(pyqueue.Full, put, 7, True, TIMEOUT1)
        self.assertTimingAlmostEqual(put.elapsed, TIMEOUT1)

        self.assertRaises(pyqueue.Full, put, 7, False, TIMEOUT2)
        self.assertTimingAlmostEqual(put.elapsed, 0)

        self.assertRaises(pyqueue.Full, put, 7, True, timeout=TIMEOUT3)
        self.assertTimingAlmostEqual(put.elapsed, TIMEOUT3)

        child_can_start.set()
        parent_can_continue.wait()

        self.assertEqual(queue_empty(queue), True)
        self.assertEqual(queue_full(queue, MAXSIZE), False)

        proc.join()

    @classmethod
    def _test_get(cls, queue, child_can_start, parent_can_continue):
        # Child side of test_get: put the items the parent will read.
        child_can_start.wait()
        # Putting 1 first is disabled together with the matching get() in
        # test_get.
        #queue.put(1)
        queue.put(2)
        queue.put(3)
        queue.put(4)
        queue.put(5)
        parent_can_continue.set()

    def test_get(self):
        queue = self.Queue()
        child_can_start = self.Event()
        parent_can_continue = self.Event()

        proc = self.Process(
            target=self._test_get,
            args=(queue, child_can_start, parent_can_continue)
            )
        proc.daemon = True
        proc.start()

        self.assertEqual(queue_empty(queue), True)

        child_can_start.set()
        parent_can_continue.wait()

        time.sleep(DELTA)
        self.assertEqual(queue_empty(queue), False)

        # Hangs unexpectedly, remove for now
        #self.assertEqual(queue.get(), 1)
        self.assertEqual(queue.get(True, None), 2)
        self.assertEqual(queue.get(True), 3)
        self.assertEqual(queue.get(timeout=1), 4)
        self.assertEqual(queue.get_nowait(), 5)

        self.assertEqual(queue_empty(queue), True)

        get = TimingWrapper(queue.get)
        get_nowait = TimingWrapper(queue.get_nowait)

        # The queue is empty: non-blocking gets fail at once, blocking gets
        # fail after their timeout.
        self.assertRaises(pyqueue.Empty, get, False)
        self.assertTimingAlmostEqual(get.elapsed, 0)

        self.assertRaises(pyqueue.Empty, get, False, None)
        self.assertTimingAlmostEqual(get.elapsed, 0)

        self.assertRaises(pyqueue.Empty, get_nowait)
        self.assertTimingAlmostEqual(get_nowait.elapsed, 0)

        self.assertRaises(pyqueue.Empty, get, True, TIMEOUT1)
        self.assertTimingAlmostEqual(get.elapsed, TIMEOUT1)

        self.assertRaises(pyqueue.Empty, get, False, TIMEOUT2)
        self.assertTimingAlmostEqual(get.elapsed, 0)

        self.assertRaises(pyqueue.Empty, get, timeout=TIMEOUT3)
        self.assertTimingAlmostEqual(get.elapsed, TIMEOUT3)

        proc.join()

    @classmethod
    def _test_fork(cls, queue):
        for i in range(10, 20):
            queue.put(i)
        # note that at this point the items may only be buffered, so the
        # process cannot shutdown until the feeder thread has finished
        # pushing items onto the pipe.

    def test_fork(self):
        # Old versions of Queue would fail to create a new feeder
        # thread for a forked process if the original process had its
        # own feeder thread.  This test checks that this no longer
        # happens.

        queue = self.Queue()

        # put items on queue so that main process starts a feeder thread
        for i in range(10):
            queue.put(i)

        # wait to make sure thread starts before we fork a new process
        time.sleep(DELTA)

        # fork process
        p = self.Process(target=self._test_fork, args=(queue,))
        p.daemon = True
        p.start()

        # check that all expected items are in the queue
        for i in range(20):
            self.assertEqual(queue.get(), i)
        self.assertRaises(pyqueue.Empty, queue.get, False)

        p.join()

    def test_qsize(self):
        q = self.Queue()
        try:
            self.assertEqual(q.qsize(), 0)
        except NotImplementedError:
            # qsize() is not available here, so there is nothing to check.
            return
        q.put(1)
        self.assertEqual(q.qsize(), 1)
        q.put(5)
        self.assertEqual(q.qsize(), 2)
        q.get()
        self.assertEqual(q.qsize(), 1)
        q.get()
        self.assertEqual(q.qsize(), 0)

    @classmethod
    def _test_task_done(cls, q):
        # Worker: acknowledge every item until the None sentinel arrives.
        for obj in iter(q.get, None):
            time.sleep(DELTA)
            q.task_done()

    def test_task_done(self):
        queue = self.JoinableQueue()

        # The former "sys.version_info < (2, 5)" skip could never trigger
        # in this Python 3 file and has been dropped.
        workers = [self.Process(target=self._test_task_done, args=(queue,))
                   for _ in range(4)]

        for p in workers:
            p.daemon = True
            p.start()

        for i in range(10):
            queue.put(i)

        # Returns once task_done() has been called for every item.
        queue.join()

        # One sentinel per worker so that each of them terminates.
        for _ in workers:
            queue.put(None)

        for p in workers:
            p.join()
623
624#
625#
626#
627
class _TestLock(BaseTestCase):
    # Tests for Lock and RLock objects obtained from the concrete test
    # class (self.Lock / self.RLock).

    def test_lock(self):
        lock = self.Lock()
        self.assertEqual(lock.acquire(), True)
        # Already held, so a non-blocking acquire must fail.
        self.assertEqual(lock.acquire(False), False)
        self.assertEqual(lock.release(), None)
        # Releasing a lock which is not held is an error.
        self.assertRaises((ValueError, threading.ThreadError), lock.release)

    def test_rlock(self):
        lock = self.RLock()
        depth = 3
        for _ in range(depth):
            self.assertEqual(lock.acquire(), True)
        for _ in range(depth):
            self.assertEqual(lock.release(), None)
        # One release more than there were acquires is an error.
        self.assertRaises((AssertionError, RuntimeError), lock.release)

    def test_lock_context(self):
        lock = self.Lock()
        with lock:
            pass
650
Benjamin Petersone711caf2008-06-11 16:44:04 +0000651
class _TestSemaphore(BaseTestCase):
    # Tests for Semaphore and BoundedSemaphore objects obtained from the
    # concrete test class.

    def _test_semaphore(self, sem):
        # Common checks for a semaphore created with an initial value of 2.
        # The value is only checked where get_value() is implemented.
        check_value = self.assertReturnsIfImplemented

        check_value(2, get_value, sem)
        for remaining in (1, 0):
            self.assertEqual(sem.acquire(), True)
            check_value(remaining, get_value, sem)

        # Exhausted: a non-blocking acquire fails and changes nothing.
        self.assertEqual(sem.acquire(False), False)
        check_value(0, get_value, sem)

        for available in (1, 2):
            self.assertEqual(sem.release(), None)
            check_value(available, get_value, sem)

    def test_semaphore(self):
        sem = self.Semaphore(2)
        self._test_semaphore(sem)
        # A plain semaphore can be released beyond its initial value.
        for available in (3, 4):
            self.assertEqual(sem.release(), None)
            self.assertReturnsIfImplemented(available, get_value, sem)

    def test_bounded_semaphore(self):
        sem = self.BoundedSemaphore(2)
        self._test_semaphore(sem)
        # Currently fails on OS/X
        #if HAVE_GETVALUE:
        #    self.assertRaises(ValueError, sem.release)
        #    self.assertReturnsIfImplemented(2, get_value, sem)

    def test_timeout(self):
        if self.TYPE != 'processes':
            return

        sem = self.Semaphore(0)
        acquire = TimingWrapper(sem.acquire)

        # (positional arguments, keyword arguments, expected time blocked)
        attempts = (
            ((False,), {}, 0.0),
            ((False, None), {}, 0.0),
            ((False, TIMEOUT1), {}, 0),
            ((True, TIMEOUT2), {}, TIMEOUT2),
            ((), {'timeout': TIMEOUT3}, TIMEOUT3),
        )
        for args, kwds, expected_delay in attempts:
            self.assertEqual(acquire(*args, **kwds), False)
            self.assertTimingAlmostEqual(acquire.elapsed, expected_delay)
704
705
class _TestCondition(BaseTestCase):
    # Tests for Condition objects: notify(), notify_all() and wait() with a
    # timeout.  Each test uses both self.Process children and
    # threading.Thread children as waiters.

    @classmethod
    def f(cls, cond, sleeping, woken, timeout=None):
        # Waiter: announce on ``sleeping`` that we are about to wait, wait
        # on the condition, then announce on ``woken`` that we are awake.
        cond.acquire()
        sleeping.release()
        cond.wait(timeout)
        woken.release()
        cond.release()

    def check_invariant(self, cond):
        # this is only supposed to succeed when there are no sleepers
        if self.TYPE == 'processes':
            try:
                sleepers = (cond._sleeping_count.get_value() -
                            cond._woken_count.get_value())
                self.assertEqual(sleepers, 0)
                self.assertEqual(cond._wait_semaphore.get_value(), 0)
            except NotImplementedError:
                pass

    def test_notify(self):
        cond = self.Condition()
        sleeping = self.Semaphore(0)
        woken = self.Semaphore(0)

        # Keep separate names for the two waiters: when both were bound to
        # ``p`` only the thread was joined at the end of the test.
        proc = self.Process(target=self.f, args=(cond, sleeping, woken))
        proc.daemon = True
        proc.start()

        thread = threading.Thread(target=self.f, args=(cond, sleeping, woken))
        thread.daemon = True
        thread.start()

        # wait for both children to start sleeping
        sleeping.acquire()
        sleeping.acquire()

        # check no process/thread has woken up
        time.sleep(DELTA)
        self.assertReturnsIfImplemented(0, get_value, woken)

        # wake up one process/thread
        cond.acquire()
        cond.notify()
        cond.release()

        # check one process/thread has woken up
        time.sleep(DELTA)
        self.assertReturnsIfImplemented(1, get_value, woken)

        # wake up another
        cond.acquire()
        cond.notify()
        cond.release()

        # check other has woken up
        time.sleep(DELTA)
        self.assertReturnsIfImplemented(2, get_value, woken)

        # check state is not mucked up
        self.check_invariant(cond)

        # Both waiters have been notified by now, so neither join blocks
        # on the condition.
        proc.join()
        thread.join()

    def test_notify_all(self):
        cond = self.Condition()
        sleeping = self.Semaphore(0)
        woken = self.Semaphore(0)

        # start some threads/processes which will timeout
        for _ in range(3):
            p = self.Process(target=self.f,
                             args=(cond, sleeping, woken, TIMEOUT1))
            p.daemon = True
            p.start()

            t = threading.Thread(target=self.f,
                                 args=(cond, sleeping, woken, TIMEOUT1))
            t.daemon = True
            t.start()

        # wait for them all to sleep
        for _ in range(6):
            sleeping.acquire()

        # check they have all timed out
        for _ in range(6):
            woken.acquire()
        self.assertReturnsIfImplemented(0, get_value, woken)

        # check state is not mucked up
        self.check_invariant(cond)

        # start some more threads/processes
        for _ in range(3):
            p = self.Process(target=self.f, args=(cond, sleeping, woken))
            p.daemon = True
            p.start()

            t = threading.Thread(target=self.f, args=(cond, sleeping, woken))
            t.daemon = True
            t.start()

        # wait for them to all sleep
        for _ in range(6):
            sleeping.acquire()

        # check no process/thread has woken up
        time.sleep(DELTA)
        self.assertReturnsIfImplemented(0, get_value, woken)

        # wake them all up
        cond.acquire()
        cond.notify_all()
        cond.release()

        # check they have all woken; poll because the waiters need a
        # moment to run, and stop early where the value cannot be read
        for _ in range(10):
            try:
                if get_value(woken) == 6:
                    break
            except NotImplementedError:
                break
            time.sleep(DELTA)
        self.assertReturnsIfImplemented(6, get_value, woken)

        # check state is not mucked up
        self.check_invariant(cond)

    def test_timeout(self):
        cond = self.Condition()
        wait = TimingWrapper(cond.wait)
        cond.acquire()
        res = wait(TIMEOUT1)
        cond.release()
        # Nobody notifies the condition, so wait() must report a timeout.
        self.assertEqual(res, False)
        self.assertTimingAlmostEqual(wait.elapsed, TIMEOUT1)
843
844
class _TestEvent(BaseTestCase):
    # Tests for Event objects: is_set(), set(), clear() and the value
    # returned by wait(), with and without a timeout.

    @classmethod
    def _test_event(cls, event):
        # Child: set the event after a short delay.
        time.sleep(TIMEOUT2)
        event.set()

    def test_event(self):
        event = self.Event()
        wait = TimingWrapper(event.wait)

        # A new event is not set.
        self.assertEqual(event.is_set(), False)

        # While the event is not set, wait() gives up after its timeout
        # and returns False.
        self.assertEqual(wait(0.0), False)
        self.assertTimingAlmostEqual(wait.elapsed, 0.0)
        self.assertEqual(wait(TIMEOUT1), False)
        self.assertTimingAlmostEqual(wait.elapsed, TIMEOUT1)

        event.set()

        # Once the event is set, wait() returns True without blocking,
        # whether or not a timeout is given.
        self.assertEqual(event.is_set(), True)
        self.assertEqual(wait(), True)
        self.assertTimingAlmostEqual(wait.elapsed, 0.0)
        self.assertEqual(wait(TIMEOUT1), True)
        self.assertTimingAlmostEqual(wait.elapsed, 0.0)
        # self.assertEqual(event.is_set(), True)

        event.clear()

        #self.assertEqual(event.is_set(), False)

        # An untimed wait() returns True once the child sets the event.
        p = self.Process(target=self._test_event, args=(event,))
        p.daemon = True
        p.start()
        self.assertEqual(wait(), True)
        # The child has nothing left to do after set(); join it so it is
        # not left running after the test.
        p.join()
Benjamin Petersone711caf2008-06-11 16:44:04 +0000885
886#
887#
888#
889
class _TestValue(BaseTestCase):
    # Tests for shared Value/RawValue objects obtained from the concrete
    # test class.

    ALLOWED_TYPES = ('processes',)

    # (typecode, initial value, value assigned by the child process)
    codes_values = [
        ('i', 4343, 24234),
        ('d', 3.625, -4.25),
        ('h', -232, 234),
        ('c', latin('x'), latin('y'))
        ]

    def setUp(self):
        if not HAS_SHAREDCTYPES:
            self.skipTest("requires multiprocessing.sharedctypes")

    @classmethod
    def _test(cls, values):
        # Child: overwrite every shared value with its replacement.
        for shared, (_, _, replacement) in zip(values, cls.codes_values):
            shared.value = replacement

    def test_value(self, raw=False):
        factory = self.RawValue if raw else self.Value
        values = [factory(code, initial)
                  for code, initial, _ in self.codes_values]

        for shared, spec in zip(values, self.codes_values):
            self.assertEqual(shared.value, spec[1])

        proc = self.Process(target=self._test, args=(values,))
        proc.daemon = True
        proc.start()
        proc.join()

        # The assignments made by the child must be visible here.
        for shared, spec in zip(values, self.codes_values):
            self.assertEqual(shared.value, spec[2])

    def test_rawvalue(self):
        self.test_value(raw=True)

    def test_getobj_getlock(self):
        # Default lock: both accessors must be callable.
        val1 = self.Value('i', 5)
        val1.get_lock()
        val1.get_obj()

        # lock=None: both accessors must be callable as well.
        val2 = self.Value('i', 5, lock=None)
        val2.get_lock()
        val2.get_obj()

        # An explicit lock is the one handed back by get_lock().
        lock = self.Lock()
        val3 = self.Value('i', 5, lock=lock)
        lock3 = val3.get_lock()
        val3.get_obj()
        self.assertEqual(lock, lock3)

        # lock=False: the result has neither accessor.
        unlocked = self.Value('i', 5, lock=False)
        for accessor in ('get_lock', 'get_obj'):
            self.assertFalse(hasattr(unlocked, accessor))

        self.assertRaises(AttributeError, self.Value, 'i', 5, lock='navalue')

        # A RawValue has neither accessor.
        raw_value = self.RawValue('i', 5)
        for accessor in ('get_lock', 'get_obj'):
            self.assertFalse(hasattr(raw_value, accessor))
957
Benjamin Petersone711caf2008-06-11 16:44:04 +0000958
class _TestArray(BaseTestCase):
    # Shared arrays: indexing, slicing, mutation from a child process, and
    # the get_lock()/get_obj() accessors.

    ALLOWED_TYPES = ('processes',)

    @classmethod
    def f(cls, seq):
        # Turn seq into its running sum, in place.
        for idx in range(1, len(seq)):
            seq[idx] += seq[idx - 1]

    @unittest.skipIf(c_int is None, "requires _ctypes")
    def test_array(self, raw=False):
        seq = [680, 626, 934, 821, 150, 233, 548, 982, 714, 831]
        make_array = self.RawArray if raw else self.Array
        arr = make_array('i', seq)

        self.assertEqual(len(arr), len(seq))
        self.assertEqual(arr[3], seq[3])
        self.assertEqual(list(arr[2:7]), list(seq[2:7]))

        # Apply the same slice assignment to the shared array and the list.
        patch = array.array('i', [1, 2, 3, 4])
        arr[4:8] = patch
        seq[4:8] = patch

        self.assertEqual(list(arr[:]), seq)

        # Compute the expected result locally, then let a child process do
        # the same work on the shared array.
        self.f(seq)

        child = self.Process(target=self.f, args=(arr,))
        child.daemon = True
        child.start()
        child.join()

        self.assertEqual(list(arr[:]), seq)

    @unittest.skipIf(c_int is None, "requires _ctypes")
    def test_array_from_size(self):
        size = 10
        # Test for zeroing (see issue #11675).
        # Repeating the allocation raises the odds that the 2nd and 3rd
        # arrays land on memory that previously held non-zero data.
        for _ in range(3):
            arr = self.Array('i', size)
            self.assertEqual(len(arr), size)
            self.assertEqual(list(arr), [0] * size)
            arr[:] = range(10)
            self.assertEqual(list(arr), list(range(10)))
            del arr

    @unittest.skipIf(c_int is None, "requires _ctypes")
    def test_rawarray(self):
        self.test_array(raw=True)

    @unittest.skipIf(c_int is None, "requires _ctypes")
    def test_getobj_getlock_obj(self):
        # Default locking: both accessors must be callable.
        default_arr = self.Array('i', list(range(10)))
        default_lock = default_arr.get_lock()
        default_obj = default_arr.get_obj()

        # lock=None: both accessors must be callable as well.
        none_arr = self.Array('i', list(range(10)), lock=None)
        none_lock = none_arr.get_lock()
        none_obj = none_arr.get_obj()

        # An explicitly supplied lock is the one handed back.
        supplied = self.Lock()
        locked_arr = self.Array('i', list(range(10)), lock=supplied)
        returned_lock = locked_arr.get_lock()
        locked_obj = locked_arr.get_obj()
        self.assertEqual(supplied, returned_lock)

        # lock=False yields an object without either accessor.
        unlocked = self.Array('i', range(10), lock=False)
        for accessor in ('get_lock', 'get_obj'):
            self.assertFalse(hasattr(unlocked, accessor))
        self.assertRaises(AttributeError,
                          self.Array, 'i', range(10), lock='notalock')

        # Raw arrays never carry the accessors.
        raw_arr = self.RawArray('i', range(10))
        for accessor in ('get_lock', 'get_obj'):
            self.assertFalse(hasattr(raw_arr, accessor))
Benjamin Petersone711caf2008-06-11 16:44:04 +00001037
1038#
1039#
1040#
1041
class _TestContainers(BaseTestCase):
    # Manager-provided list, dict and Namespace objects.

    ALLOWED_TYPES = ('manager',)

    def test_list(self):
        digits = list(range(10))
        a = self.list(list(range(10)))
        self.assertEqual(a[:], digits)

        b = self.list()
        self.assertEqual(b[:], [])

        b.extend(list(range(5)))
        self.assertEqual(b[:], list(range(5)))

        # Indexing, and slicing past the end.
        self.assertEqual(b[2], 2)
        self.assertEqual(b[2:10], [2,3,4])

        b *= 2
        doubled = [0, 1, 2, 3, 4, 0, 1, 2, 3, 4]
        self.assertEqual(b[:], doubled)

        self.assertEqual(b + [5, 6], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4, 5, 6])

        # None of the operations on b may have touched a.
        self.assertEqual(a[:], digits)

        # A shared list built from other shared lists.
        e = self.list([a, b])
        self.assertEqual(
            e[:],
            [[0, 1, 2, 3, 4, 5, 6, 7, 8, 9], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4]]
            )

        # A later change to a is seen through the list that contains it.
        f = self.list([a])
        a.append('hello')
        self.assertEqual(f[:], [[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 'hello']])

    def test_dict(self):
        d = self.dict()
        indices = list(range(65, 70))
        for code in indices:
            d[code] = chr(code)
        letters = [chr(code) for code in indices]
        self.assertEqual(d.copy(), {code: chr(code) for code in indices})
        self.assertEqual(sorted(d.keys()), indices)
        self.assertEqual(sorted(d.values()), letters)
        self.assertEqual(sorted(d.items()), list(zip(indices, letters)))

    def test_namespace(self):
        n = self.Namespace()
        n.name = 'Bob'
        n.job = 'Builder'
        n._hidden = 'hidden'
        self.assertEqual((n.name, n.job), ('Bob', 'Builder'))
        del n.job
        # The expected string shows neither the deleted attribute nor the
        # underscore-prefixed one.
        self.assertEqual(str(n), "Namespace(name='Bob')")
        self.assertTrue(hasattr(n, 'name'))
        self.assertFalse(hasattr(n, 'job'))
1097
1098#
1099#
1100#
1101
def sqr(x, wait=0.0):
    """Sleep for ``wait`` seconds, then return ``x`` multiplied by itself."""
    time.sleep(wait)
    squared = x * x
    return squared
Ask Solem2afcbf22010-11-09 20:55:52 +00001105
class _TestPool(BaseTestCase):
    # Exercises the pool API through self.pool, which is provided from
    # outside this class.

    def test_apply(self):
        # Positional and keyword arguments must both reach the callable.
        papply = self.pool.apply
        self.assertEqual(papply(sqr, (5,)), sqr(5))
        self.assertEqual(papply(sqr, (), {'x':3}), sqr(x=3))

    def test_map(self):
        pmap = self.pool.map
        self.assertEqual(pmap(sqr, list(range(10))),
                         [sqr(n) for n in range(10)])
        self.assertEqual(pmap(sqr, list(range(100)), chunksize=20),
                         [sqr(n) for n in range(100)])

    def test_map_chunksize(self):
        # An empty input with an explicit chunksize must complete rather
        # than stall until the timeout.
        try:
            self.pool.map_async(sqr, [], chunksize=1).get(timeout=TIMEOUT1)
        except multiprocessing.TimeoutError:
            self.fail("pool.map_async with chunksize stalled on null list")

    def test_async(self):
        # The task sleeps TIMEOUT1 seconds, so get() should block that long.
        res = self.pool.apply_async(sqr, (7, TIMEOUT1,))
        get = TimingWrapper(res.get)
        self.assertEqual(get(), 49)
        self.assertTimingAlmostEqual(get.elapsed, TIMEOUT1)

    def test_async_timeout(self):
        # The task outlives the timeout passed to get().
        res = self.pool.apply_async(sqr, (6, TIMEOUT2 + 0.2))
        get = TimingWrapper(res.get)
        self.assertRaises(multiprocessing.TimeoutError, get, timeout=TIMEOUT2)
        self.assertTimingAlmostEqual(get.elapsed, TIMEOUT2)

    def test_imap(self):
        it = self.pool.imap(sqr, list(range(10)))
        self.assertEqual(list(it), [sqr(n) for n in range(10)])

        # Step through by hand and check exhaustion.
        it = self.pool.imap(sqr, list(range(10)))
        for i in range(10):
            self.assertEqual(next(it), i*i)
        self.assertRaises(StopIteration, it.__next__)

        it = self.pool.imap(sqr, list(range(1000)), chunksize=100)
        for i in range(1000):
            self.assertEqual(next(it), i*i)
        self.assertRaises(StopIteration, it.__next__)

    def test_imap_unordered(self):
        # Results may arrive in any order, so compare them sorted.
        expected = [sqr(n) for n in range(1000)]

        it = self.pool.imap_unordered(sqr, list(range(1000)))
        self.assertEqual(sorted(it), expected)

        it = self.pool.imap_unordered(sqr, list(range(1000)), chunksize=53)
        self.assertEqual(sorted(it), expected)

    def test_make_pool(self):
        # Non-positive worker counts are rejected.
        self.assertRaises(ValueError, multiprocessing.Pool, -1)
        self.assertRaises(ValueError, multiprocessing.Pool, 0)

        p = multiprocessing.Pool(3)
        self.assertEqual(3, len(p._pool))
        p.close()
        p.join()

    def test_terminate(self):
        if self.TYPE == 'manager':
            # On Unix a forked process increfs each shared object to
            # which its parent process held a reference.  If the
            # forked process gets terminated then there is likely to
            # be a reference leak.  So to prevent
            # _TestZZZNumberOfObjects from failing we skip this test
            # when using a manager.  Report it as skipped instead of
            # letting it count as a pass.
            self.skipTest("terminating workers would leak manager references")

        # Queue far more work than can finish, then check that terminate()
        # lets join() return promptly.
        result = self.pool.map_async(
            time.sleep, [0.1 for i in range(10000)], chunksize=1
            )
        self.pool.terminate()
        join = TimingWrapper(self.pool.join)
        join()
        self.assertLess(join.elapsed, 0.5)
Jesse Noller1f0b6582010-01-27 03:36:01 +00001184
def raising():
    """Always raise ``KeyError("key")``."""
    error = KeyError("key")
    raise error
Jesse Noller1f0b6582010-01-27 03:36:01 +00001187
def unpickleable_result():
    """Return a locally created lambda that evaluates to 42."""
    answer = lambda: 42
    return answer
1190
class _TestPoolWorkerErrors(BaseTestCase):
    # Error reporting from pool workers back to the submitting process.
    ALLOWED_TYPES = ('processes', )

    def test_async_error_callback(self):
        pool = multiprocessing.Pool(2)

        # One-slot holder filled in by the error callback.
        seen = [None]
        def remember(exc):
            seen[0] = exc

        result = pool.apply_async(raising, error_callback=remember)
        self.assertRaises(KeyError, result.get)
        self.assertTrue(seen[0])
        self.assertIsInstance(seen[0], KeyError)

        pool.close()
        pool.join()

    def test_unpickleable_result(self):
        from multiprocessing.pool import MaybeEncodingError
        pool = multiprocessing.Pool(2)

        # Make sure we don't lose pool processes because of encoding errors.
        for iteration in range(20):

            seen = [None]
            def remember(exc):
                seen[0] = exc

            result = pool.apply_async(unpickleable_result,
                                      error_callback=remember)
            self.assertRaises(MaybeEncodingError, result.get)
            wrapped = seen[0]
            self.assertTrue(wrapped)
            self.assertIsInstance(seen[0], MaybeEncodingError)
            self.assertIsNotNone(wrapped.exc)
            self.assertIsNotNone(wrapped.value)

        pool.close()
        pool.join()
1230
class _TestPoolWorkerLifetime(BaseTestCase):
    # Workers created with maxtasksperchild must be replaced over time.
    ALLOWED_TYPES = ('processes', )

    def test_pool_worker_lifetime(self):
        pool = multiprocessing.Pool(3, maxtasksperchild=10)
        self.assertEqual(3, len(pool._pool))
        origworkerpids = [worker.pid for worker in pool._pool]
        # Run many tasks so each worker gets replaced (hopefully)
        results = [pool.apply_async(sqr, (i, )) for i in range(100)]
        # Fetch the results and verify we got the right answers,
        # also ensuring all the tasks have completed.
        for j, res in enumerate(results):
            self.assertEqual(res.get(), sqr(j))
        # Refill the pool
        pool._repopulate_pool()
        # Wait until all workers are alive
        # (50 polls * DELTA = 5 seconds max startup process time)
        for _ in range(50):
            if all(worker.is_alive() for worker in pool._pool):
                break
            time.sleep(DELTA)
        finalworkerpids = [worker.pid for worker in pool._pool]
        # All pids should be assigned.  See issue #7805.
        self.assertNotIn(None, origworkerpids)
        self.assertNotIn(None, finalworkerpids)
        # Finally, check that the worker pids have changed
        self.assertNotEqual(sorted(origworkerpids), sorted(finalworkerpids))
        pool.close()
        pool.join()
1262
#
# Test that manager has expected number of shared objects left
#
1266
class _TestZZZNumberOfObjects(BaseTestCase):
    # Because test cases are sorted alphabetically, this one will get
    # run after all the other tests for the manager.  It tests that
    # there have been no "reference leaks" for the manager's shared
    # objects.  Note the comment in _TestPool.test_terminate().
    ALLOWED_TYPES = ('manager',)

    def test_number_of_objects(self):
        EXPECTED_NUMBER = 1                # the pool object is still alive
        multiprocessing.active_children()  # discard dead process objs
        gc.collect()                       # do garbage collection
        refs = self.manager._number_of_objects()
        # Snapshot the debug info right after the count so the diagnostic
        # output is taken as close to the failing count as possible.
        debug_info = self.manager._debug_info()
        if refs != EXPECTED_NUMBER:
            # Print the snapshot once; a second, fresh query would only
            # duplicate the output.
            print(debug_info)

        self.assertEqual(refs, EXPECTED_NUMBER)
1285
#
# Test of creating a customized manager class
#
1289
1290from multiprocessing.managers import BaseManager, BaseProxy, RemoteError
1291
class FooBar(object):
    """Small class with two public methods and one underscore-prefixed one."""

    def f(self):
        """Return the string ``'f()'``."""
        label = 'f()'
        return label

    def g(self):
        """Always raise ``ValueError``."""
        raise ValueError

    def _h(self):
        """Return the string ``'_h()'``."""
        label = '_h()'
        return label
1299
def baz():
    """Yield the squares of 0 through 9, in order."""
    n = 0
    while n < 10:
        yield n * n
        n += 1
1303
class IteratorProxy(BaseProxy):
    """Proxy that iterates by forwarding ``__next__`` to its referent."""

    # Only __next__ is forwarded; __iter__ is answered locally.
    _exposed_ = ('__next__',)

    def __next__(self):
        return self._callmethod('__next__')

    def __iter__(self):
        return self
1310
class MyManager(BaseManager):
    """Manager subclass that only carries the registrations made below."""


# 'Foo' uses the default exposure, 'Bar' names its exposed methods
# explicitly, and 'baz' is served through IteratorProxy.
MyManager.register('Foo', callable=FooBar)
MyManager.register('Bar', callable=FooBar, exposed=('f', '_h'))
MyManager.register('baz', callable=baz, proxytype=IteratorProxy)
1317
1318
class _TestMyManager(BaseTestCase):
    # Checks which methods the proxies created by MyManager expose.

    ALLOWED_TYPES = ('manager',)

    def test_mymanager(self):
        manager = MyManager()
        manager.start()

        foo = manager.Foo()
        bar = manager.Bar()
        baz = manager.baz()

        def visible(proxy):
            # Names of the candidate methods that exist on the proxy.
            return [name for name in ('f', 'g', '_h') if hasattr(proxy, name)]

        foo_methods = visible(foo)
        bar_methods = visible(bar)

        self.assertEqual(foo_methods, ['f', 'g'])
        self.assertEqual(bar_methods, ['f', '_h'])

        # foo: f and g are callable, _h is refused even via _callmethod.
        self.assertEqual(foo.f(), 'f()')
        self.assertRaises(ValueError, foo.g)
        self.assertEqual(foo._callmethod('f'), 'f()')
        self.assertRaises(RemoteError, foo._callmethod, '_h')

        # bar: f and _h work both directly and via _callmethod.
        self.assertEqual(bar.f(), 'f()')
        self.assertEqual(bar._h(), '_h()')
        self.assertEqual(bar._callmethod('f'), 'f()')
        self.assertEqual(bar._callmethod('_h'), '_h()')

        # baz: iterating the proxy yields the generator's values.
        self.assertEqual(list(baz), [i*i for i in range(10)])

        manager.shutdown()
1350
#
# Test of connecting to a remote server and using xmlrpclib for serialization
#
1354
# Single module-level queue handed out by every get_queue() call.
_queue = pyqueue.Queue()


def get_queue():
    """Return the module-level queue object."""
    return _queue
1358
class QueueManager(BaseManager):
    '''manager class used by server process'''


# This registration supplies the callable that produces the queue.
QueueManager.register('get_queue', callable=get_queue)
1362
class QueueManager2(BaseManager):
    '''manager class which specifies the same interface as QueueManager'''


# Registered by name only, with no callable.
QueueManager2.register('get_queue')
1366
1367
# Serializer name passed to every manager created by the tests below.
SERIALIZER = 'xmlrpclib'
1369
class _TestRemoteManager(BaseTestCase):
    # A child process and the parent both connect to one manager server and
    # exchange a message through its queue.

    ALLOWED_TYPES = ('manager',)

    @classmethod
    def _putter(cls, address, authkey):
        # Child-process body: connect as a client and enqueue one tuple.
        client = QueueManager2(
            address=address, authkey=authkey, serializer=SERIALIZER
            )
        client.connect()
        remote_queue = client.get_queue()
        payload = ('hello world', None, True, 2.25)
        remote_queue.put(payload)

    def test_remote(self):
        authkey = os.urandom(32)

        server = QueueManager(
            address=('localhost', 0), authkey=authkey, serializer=SERIALIZER
            )
        server.start()

        child = self.Process(target=self._putter,
                             args=(server.address, authkey))
        child.daemon = True
        child.start()

        client = QueueManager2(
            address=server.address, authkey=authkey, serializer=SERIALIZER
            )
        client.connect()
        remote_queue = client.get_queue()

        # Note that xmlrpclib will deserialize object as a list not a tuple
        received = remote_queue.get()
        self.assertEqual(received, ['hello world', None, True, 2.25])

        # Because we are using xmlrpclib for serialization instead of
        # pickle this will cause a serialization error.
        self.assertRaises(Exception, remote_queue.put, time.sleep)

        # Make queue finalizer run before the server is stopped
        del remote_queue
        server.shutdown()
1411
class _TestManagerRestart(BaseTestCase):
    # A manager must be able to start on an address that another manager
    # has just released.

    @classmethod
    def _putter(cls, address, authkey):
        # Child-process body: connect to the manager and enqueue one string.
        manager = QueueManager(
            address=address, authkey=authkey, serializer=SERIALIZER)
        manager.connect()
        queue = manager.get_queue()
        queue.put('hello world')

    def test_rapid_restart(self):
        authkey = os.urandom(32)
        manager = QueueManager(
            address=('localhost', 0), authkey=authkey, serializer=SERIALIZER)
        srvr = manager.get_server()
        addr = srvr.address
        # Close the connection.Listener socket which gets opened as a part
        # of manager.get_server(). It's not needed for the test.
        srvr.listener.close()
        manager.start()

        p = self.Process(target=self._putter, args=(manager.address, authkey))
        p.daemon = True
        p.start()
        queue = manager.get_queue()
        self.assertEqual(queue.get(), 'hello world')
        # Drop the proxy before stopping the server it refers to.
        del queue
        manager.shutdown()
        manager = QueueManager(
            address=addr, authkey=authkey, serializer=SERIALIZER)
        try:
            manager.start()
        except IOError as e:
            if e.errno != errno.EADDRINUSE:
                raise
            # Retry after some time, in case the old socket was lingering
            # (sporadic failure on buildbots)
            time.sleep(1.0)
            manager = QueueManager(
                address=addr, authkey=authkey, serializer=SERIALIZER)
            # Actually perform the retry: without this start() the
            # shutdown() below would run on a manager that never started.
            manager.start()
        manager.shutdown()
Jesse Nollerc5d28a02009-03-30 16:37:36 +00001453
Benjamin Petersone711caf2008-06-11 16:44:04 +00001454#
1455#
1456#
1457
# Empty message; receiving it ends the echo loop in _TestConnection._echo.
SENTINEL = latin('')
1459
class _TestConnection(BaseTestCase):
    # Tests for the connection objects returned by self.Pipe().

    ALLOWED_TYPES = ('processes', 'threads')

    @classmethod
    def _echo(cls, conn):
        # Send every received message straight back until SENTINEL arrives.
        for msg in iter(conn.recv_bytes, SENTINEL):
            conn.send_bytes(msg)
        conn.close()

    def test_connection(self):
        conn, child_conn = self.Pipe()

        p = self.Process(target=self._echo, args=(child_conn,))
        p.daemon = True
        p.start()

        seq = [1, 2.25, None]
        msg = latin('hello world')
        longmsg = msg * 10
        arr = array.array('i', list(range(4)))

        if self.TYPE == 'processes':
            self.assertEqual(type(conn.fileno()), int)

        self.assertEqual(conn.send(seq), None)
        self.assertEqual(conn.recv(), seq)

        self.assertEqual(conn.send_bytes(msg), None)
        self.assertEqual(conn.recv_bytes(), msg)

        if self.TYPE == 'processes':
            # Receive into the start of a zeroed buffer.
            buffer = array.array('i', [0]*10)
            expected = list(arr) + [0] * (10 - len(arr))
            self.assertEqual(conn.send_bytes(arr), None)
            self.assertEqual(conn.recv_bytes_into(buffer),
                             len(arr) * buffer.itemsize)
            self.assertEqual(list(buffer), expected)

            # Receive at an offset of three items.
            buffer = array.array('i', [0]*10)
            expected = [0] * 3 + list(arr) + [0] * (10 - 3 - len(arr))
            self.assertEqual(conn.send_bytes(arr), None)
            self.assertEqual(conn.recv_bytes_into(buffer, 3 * buffer.itemsize),
                             len(arr) * buffer.itemsize)
            self.assertEqual(list(buffer), expected)

            # A buffer that is too small must raise BufferTooShort carrying
            # the complete message.
            buffer = bytearray(latin(' ' * 40))
            self.assertEqual(conn.send_bytes(longmsg), None)
            try:
                res = conn.recv_bytes_into(buffer)
            except multiprocessing.BufferTooShort as e:
                self.assertEqual(e.args, (longmsg,))
            else:
                self.fail('expected BufferTooShort, got %s' % res)

        poll = TimingWrapper(conn.poll)

        # Nothing pending: poll() returns at once, poll(t) waits t seconds.
        self.assertEqual(poll(), False)
        self.assertTimingAlmostEqual(poll.elapsed, 0)

        self.assertEqual(poll(TIMEOUT1), False)
        self.assertTimingAlmostEqual(poll.elapsed, TIMEOUT1)

        conn.send(None)

        # Data pending: poll(t) returns without waiting.
        self.assertEqual(poll(TIMEOUT1), True)
        self.assertTimingAlmostEqual(poll.elapsed, 0)

        self.assertEqual(conn.recv(), None)

        really_big_msg = latin('X') * (1024 * 1024 * 16)   # 16Mb
        conn.send_bytes(really_big_msg)
        self.assertEqual(conn.recv_bytes(), really_big_msg)

        conn.send_bytes(SENTINEL)                          # tell child to quit
        child_conn.close()

        if self.TYPE == 'processes':
            self.assertEqual(conn.readable, True)
            self.assertEqual(conn.writable, True)
            self.assertRaises(EOFError, conn.recv)
            self.assertRaises(EOFError, conn.recv_bytes)

        p.join()

    def test_duplex_false(self):
        reader, writer = self.Pipe(duplex=False)
        self.assertEqual(writer.send(1), None)
        self.assertEqual(reader.recv(), 1)
        if self.TYPE == 'processes':
            self.assertEqual(reader.readable, True)
            self.assertEqual(reader.writable, False)
            self.assertEqual(writer.readable, False)
            self.assertEqual(writer.writable, True)
            # Using an end in the wrong direction is an error.
            self.assertRaises(IOError, reader.send, 2)
            self.assertRaises(IOError, writer.recv)
            self.assertRaises(IOError, writer.poll)

    def test_spawn_close(self):
        # We test that a pipe connection can be closed by parent
        # process immediately after child is spawned.  On Windows this
        # would have sometimes failed on old versions because
        # child_conn would be closed before the child got a chance to
        # duplicate it.
        conn, child_conn = self.Pipe()

        p = self.Process(target=self._echo, args=(child_conn,))
        p.daemon = True
        p.start()
        child_conn.close()    # this might complete before child initializes

        msg = latin('hello')
        conn.send_bytes(msg)
        self.assertEqual(conn.recv_bytes(), msg)

        conn.send_bytes(SENTINEL)
        conn.close()
        p.join()

    def test_sendbytes(self):
        if self.TYPE != 'processes':
            # Report a skip (as test_fd_transfer does) instead of letting
            # the test silently count as a pass.
            self.skipTest("only makes sense with processes")

        msg = latin('abcdefghijklmnopqrstuvwxyz')
        a, b = self.Pipe()

        a.send_bytes(msg)
        self.assertEqual(b.recv_bytes(), msg)

        # Offset only.
        a.send_bytes(msg, 5)
        self.assertEqual(b.recv_bytes(), msg[5:])

        # Offset and size.
        a.send_bytes(msg, 7, 8)
        self.assertEqual(b.recv_bytes(), msg[7:7+8])

        # An offset equal to the length sends an empty message.
        a.send_bytes(msg, 26)
        self.assertEqual(b.recv_bytes(), latin(''))

        a.send_bytes(msg, 26, 0)
        self.assertEqual(b.recv_bytes(), latin(''))

        # Out-of-range or negative offset/size combinations are rejected.
        self.assertRaises(ValueError, a.send_bytes, msg, 27)

        self.assertRaises(ValueError, a.send_bytes, msg, 22, 5)

        self.assertRaises(ValueError, a.send_bytes, msg, 26, 1)

        self.assertRaises(ValueError, a.send_bytes, msg, -1)

        self.assertRaises(ValueError, a.send_bytes, msg, 4, -1)

    @classmethod
    def _is_fd_assigned(cls, fd):
        # True if fd refers to an open file; False if fstat() reports EBADF.
        # Any other OSError is propagated.
        try:
            os.fstat(fd)
        except OSError as e:
            if e.errno == errno.EBADF:
                return False
            raise
        else:
            return True

    @classmethod
    def _writefd(cls, conn, data, create_dummy_fds=False):
        # Child-process body: receive a handle over conn and write data to it.
        if create_dummy_fds:
            # Occupy every free descriptor below 256 before receiving.
            for i in range(0, 256):
                if not cls._is_fd_assigned(i):
                    os.dup2(conn.fileno(), i)
        fd = reduction.recv_handle(conn)
        if msvcrt:
            fd = msvcrt.open_osfhandle(fd, os.O_WRONLY)
        os.write(fd, data)
        os.close(fd)

    def test_fd_transfer(self):
        if self.TYPE != 'processes':
            self.skipTest("only makes sense with processes")
        conn, child_conn = self.Pipe(duplex=True)

        p = self.Process(target=self._writefd, args=(child_conn, b"foo"))
        p.daemon = True
        p.start()
        # Remove the scratch file however the test ends.
        self.addCleanup(test.support.unlink, test.support.TESTFN)
        with open(test.support.TESTFN, "wb") as f:
            fd = f.fileno()
            if msvcrt:
                fd = msvcrt.get_osfhandle(fd)
            reduction.send_handle(conn, fd, p.pid)
        p.join()
        with open(test.support.TESTFN, "rb") as f:
            self.assertEqual(f.read(), b"foo")

    @unittest.skipIf(sys.platform == "win32",
                     "test semantics don't make sense on Windows")
    @unittest.skipIf(MAXFD <= 256,
                     "largest assignable fd number is too small")
    @unittest.skipUnless(hasattr(os, "dup2"),
                         "test needs os.dup2()")
    def test_large_fd_transfer(self):
        # With fd > 256 (issue #11657)
        if self.TYPE != 'processes':
            self.skipTest("only makes sense with processes")
        conn, child_conn = self.Pipe(duplex=True)

        p = self.Process(target=self._writefd, args=(child_conn, b"bar", True))
        p.daemon = True
        p.start()
        # Remove the scratch file however the test ends.
        self.addCleanup(test.support.unlink, test.support.TESTFN)
        with open(test.support.TESTFN, "wb") as f:
            fd = f.fileno()
            # Find a free descriptor number of at least 256.
            for newfd in range(256, MAXFD):
                if not self._is_fd_assigned(newfd):
                    break
            else:
                self.fail("could not find an unassigned large file descriptor")
            os.dup2(fd, newfd)
            try:
                reduction.send_handle(conn, newfd, p.pid)
            finally:
                os.close(newfd)
        p.join()
        with open(test.support.TESTFN, "rb") as f:
            self.assertEqual(f.read(), b"bar")
1681
1682
class _TestListenerClient(BaseTestCase):
    # A Listener must accept a Client for every available address family.

    ALLOWED_TYPES = ('processes', 'threads')

    @classmethod
    def _test(cls, address):
        # Child body: connect, send one greeting, disconnect.
        client = cls.connection.Client(address)
        client.send('hello')
        client.close()

    def test_listener_client(self):
        for family in self.connection.families:
            listener = self.connection.Listener(family=family)
            child = self.Process(target=self._test, args=(listener.address,))
            child.daemon = True
            child.start()
            accepted = listener.accept()
            greeting = accepted.recv()
            self.assertEqual(greeting, 'hello')
            child.join()
            listener.close()
#
# Test of sending connection and socket objects between processes
#
Benjamin Petersonb29cbbc2008-06-16 20:57:14 +00001706"""
Benjamin Petersone711caf2008-06-11 16:44:04 +00001707class _TestPicklingConnections(BaseTestCase):
1708
1709 ALLOWED_TYPES = ('processes',)
1710
1711 def _listener(self, conn, families):
1712 for fam in families:
1713 l = self.connection.Listener(family=fam)
1714 conn.send(l.address)
1715 new_conn = l.accept()
1716 conn.send(new_conn)
1717
1718 if self.TYPE == 'processes':
1719 l = socket.socket()
1720 l.bind(('localhost', 0))
1721 conn.send(l.getsockname())
1722 l.listen(1)
1723 new_conn, addr = l.accept()
1724 conn.send(new_conn)
1725
1726 conn.recv()
1727
1728 def _remote(self, conn):
1729 for (address, msg) in iter(conn.recv, None):
1730 client = self.connection.Client(address)
1731 client.send(msg.upper())
1732 client.close()
1733
1734 if self.TYPE == 'processes':
1735 address, msg = conn.recv()
1736 client = socket.socket()
1737 client.connect(address)
1738 client.sendall(msg.upper())
1739 client.close()
1740
1741 conn.close()
1742
1743 def test_pickling(self):
1744 try:
1745 multiprocessing.allow_connection_pickling()
1746 except ImportError:
1747 return
1748
1749 families = self.connection.families
1750
1751 lconn, lconn0 = self.Pipe()
1752 lp = self.Process(target=self._listener, args=(lconn0, families))
Jesus Cea94f964f2011-09-09 20:26:57 +02001753 lp.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +00001754 lp.start()
1755 lconn0.close()
1756
1757 rconn, rconn0 = self.Pipe()
1758 rp = self.Process(target=self._remote, args=(rconn0,))
Jesus Cea94f964f2011-09-09 20:26:57 +02001759 rp.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +00001760 rp.start()
1761 rconn0.close()
1762
1763 for fam in families:
1764 msg = ('This connection uses family %s' % fam).encode('ascii')
1765 address = lconn.recv()
1766 rconn.send((address, msg))
1767 new_conn = lconn.recv()
1768 self.assertEqual(new_conn.recv(), msg.upper())
1769
1770 rconn.send(None)
1771
1772 if self.TYPE == 'processes':
1773 msg = latin('This connection uses a normal socket')
1774 address = lconn.recv()
1775 rconn.send((address, msg))
1776 if hasattr(socket, 'fromfd'):
1777 new_conn = lconn.recv()
1778 self.assertEqual(new_conn.recv(100), msg.upper())
1779 else:
1780 # XXX On Windows with Py2.6 need to backport fromfd()
1781 discard = lconn.recv_bytes()
1782
1783 lconn.send(None)
1784
1785 rconn.close()
1786 lconn.close()
1787
1788 lp.join()
1789 rp.join()
Benjamin Petersonb29cbbc2008-06-16 20:57:14 +00001790"""
Benjamin Petersone711caf2008-06-11 16:44:04 +00001791#
1792#
1793#
1794
class _TestHeap(BaseTestCase):
    # Consistency checks for the allocator behind
    # multiprocessing.heap.BufferWrapper.

    ALLOWED_TYPES = ('processes',)

    def test_heap(self):
        iterations = 5000
        maxblocks = 50
        blocks = []

        # create and destroy lots of blocks of different sizes
        for _ in range(iterations):
            size = int(random.lognormvariate(0, 1) * 1000)
            blocks.append(multiprocessing.heap.BufferWrapper(size))
            if len(blocks) > maxblocks:
                # Drop one of the older blocks at random.
                del blocks[random.randrange(maxblocks)]

        # get the heap object
        heap = multiprocessing.heap.BufferWrapper._heap

        # verify the state of the heap; the lock is held until cleanup so
        # the heap's bookkeeping cannot change while it is inspected
        segments = []
        heap._lock.acquire()
        self.addCleanup(heap._lock.release)
        for free_list in heap._len_to_seq.values():
            for arena, start, stop in free_list:
                segments.append((heap._arenas.index(arena), start, stop,
                                 stop-start, 'free'))
        for arena, start, stop in heap._allocated_blocks:
            segments.append((heap._arenas.index(arena), start, stop,
                             stop-start, 'occupied'))

        segments.sort()

        # Consecutive segments must either touch, or the next one must
        # start at offset 0 of a different arena.
        for current, following in zip(segments, segments[1:]):
            arena, start, stop = current[:3]
            narena, nstart = following[:2]
            self.assertTrue((arena != narena and nstart == 0) or
                            (stop == nstart))

    def test_free_from_gc(self):
        # Check that freeing of blocks by the garbage collector doesn't deadlock
        # (issue #12352).
        # Make sure the GC is enabled, and set lower collection thresholds to
        # make collections more frequent (and increase the probability of
        # deadlock).
        if not gc.isenabled():
            gc.enable()
            self.addCleanup(gc.disable)
        thresholds = gc.get_threshold()
        self.addCleanup(gc.set_threshold, *thresholds)
        gc.set_threshold(10)

        # perform numerous block allocations, with cyclic references to make
        # sure objects are collected asynchronously by the gc
        for _ in range(5000):
            a = multiprocessing.heap.BufferWrapper(1)
            b = multiprocessing.heap.BufferWrapper(1)
            # circular references
            a.buddy = b
            b.buddy = a
1859
Benjamin Petersone711caf2008-06-11 16:44:04 +00001860#
1861#
1862#
1863
Benjamin Petersone711caf2008-06-11 16:44:04 +00001864class _Foo(Structure):
1865 _fields_ = [
1866 ('x', c_int),
1867 ('y', c_double)
1868 ]
1869
class _TestSharedCTypes(BaseTestCase):
    """Check that a child process can modify shared ctypes objects."""

    ALLOWED_TYPES = ('processes',)

    def setUp(self):
        # Every test in this class needs multiprocessing.sharedctypes.
        if HAS_SHAREDCTYPES:
            return
        self.skipTest("requires multiprocessing.sharedctypes")

    @classmethod
    def _double(cls, x, y, foo, arr, string):
        """Double each argument in place; used as the child's target."""
        x.value *= 2
        y.value *= 2
        foo.x *= 2
        foo.y *= 2
        string.value *= 2
        for index in range(len(arr)):
            arr[index] *= 2

    def test_sharedctypes(self, lock=False):
        """Create shared values, double them in a child, check the parent.

        ``lock`` is forwarded unchanged to every Value/Array constructor.
        """
        x = Value('i', 7, lock=lock)
        y = Value(c_double, 1.0/3.0, lock=lock)
        foo = Value(_Foo, 3, 2, lock=lock)
        arr = self.Array('d', list(range(10)), lock=lock)
        string = self.Array('c', 20, lock=lock)
        string.value = latin('hello')

        worker = self.Process(target=self._double,
                              args=(x, y, foo, arr, string))
        worker.daemon = True
        worker.start()
        worker.join()

        # The parent must observe the doubled values.
        self.assertEqual(x.value, 14)
        self.assertAlmostEqual(y.value, 2.0/3.0)
        self.assertEqual(foo.x, 6)
        self.assertAlmostEqual(foo.y, 4.0)
        for index in range(10):
            self.assertAlmostEqual(arr[index], index*2)
        self.assertEqual(string.value, latin('hellohello'))

    def test_synchronize(self):
        """Same scenario as test_sharedctypes, with lock=True."""
        self.test_sharedctypes(lock=True)

    def test_copy(self):
        """copy() must give a structure independent of the original."""
        original = _Foo(2, 5.0)
        duplicate = copy(original)
        original.x = 0
        original.y = 0
        self.assertEqual(duplicate.x, 2)
        self.assertAlmostEqual(duplicate.y, 5.0)
1919
1920#
1921#
1922#
1923
class _TestFinalize(BaseTestCase):
    """Check when, and in which order, util.Finalize callbacks fire."""

    ALLOWED_TYPES = ('processes',)

    @classmethod
    def _test_finalize(cls, conn):
        """Child-side scenario: register finalizers that report via conn.

        Every callback sends a short tag over ``conn``.  The locals below
        are deliberately kept alive until os._exit() so that only the
        explicit triggers and util._exit_function() can fire callbacks.
        """
        class Foo(object):
            pass

        # Deleting the last reference fires the callback for a.
        a = Foo()
        util.Finalize(a, conn.send, args=('a',))
        del a

        # Calling the finalizer fires the callback for b exactly once;
        # the second call and the later del are then no-ops.
        b = Foo()
        close_b = util.Finalize(b, conn.send, args=('b',))
        close_b()
        close_b()
        del b

        # No exitpriority is given for c.
        c = Foo()
        util.Finalize(c, conn.send, args=('c',))

        d10 = Foo()
        util.Finalize(d10, conn.send, args=('d10',), exitpriority=1)

        # Three finalizers sharing exitpriority 0.
        d01 = Foo()
        util.Finalize(d01, conn.send, args=('d01',), exitpriority=0)
        d02 = Foo()
        util.Finalize(d02, conn.send, args=('d02',), exitpriority=0)
        d03 = Foo()
        util.Finalize(d03, conn.send, args=('d03',), exitpriority=0)

        # Finalizers that are not attached to any object.
        util.Finalize(None, conn.send, args=('e',), exitpriority=-10)

        util.Finalize(None, conn.send, args=('STOP',), exitpriority=-100)

        # call multiprocessing's cleanup function then exit process without
        # garbage collecting locals
        util._exit_function()
        conn.close()
        os._exit(0)

    def test_finalize(self):
        """Run the scenario in a child and check the tags it reported."""
        conn, child_conn = self.Pipe()

        child = self.Process(target=self._test_finalize, args=(child_conn,))
        child.daemon = True
        child.start()
        child.join()

        # Read tags until the 'STOP' sentinel arrives.
        received = list(iter(conn.recv, 'STOP'))
        self.assertEqual(received,
                         ['a', 'b', 'd10', 'd03', 'd02', 'd01', 'e'])
1976
1977#
1978# Test that from ... import * works for each module
1979#
1980
class _TestImportStar(BaseTestCase):
    """Check that each module defines every name listed in its __all__."""

    ALLOWED_TYPES = ('processes',)

    def test_import(self):
        """Import each multiprocessing module and verify its __all__."""
        module_names = [
            'multiprocessing',
            'multiprocessing.connection',
            'multiprocessing.heap',
            'multiprocessing.managers',
            'multiprocessing.pool',
            'multiprocessing.process',
            'multiprocessing.reduction',
            'multiprocessing.synchronize',
            'multiprocessing.util',
        ]

        if c_int is not None:
            # This module requires _ctypes
            module_names.append('multiprocessing.sharedctypes')

        for module_name in module_names:
            __import__(module_name)
            module = sys.modules[module_name]

            # A module without __all__ has nothing to check.
            exported = getattr(module, '__all__', ())
            for attr in exported:
                message = '%r does not have attribute %r' % (module, attr)
                self.assertTrue(hasattr(module, attr), message)
2007
2008#
2009# Quick test that logging works -- does not test logging output
2010#
2011
class _TestLogging(BaseTestCase):
    """Quick checks of the multiprocessing logger (output is not tested)."""

    ALLOWED_TYPES = ('processes',)

    def test_enable_logging(self):
        """The logger exists and accepts messages at SUBWARNING level."""
        logger = multiprocessing.get_logger()
        logger.setLevel(util.SUBWARNING)
        try:
            self.assertIsNotNone(logger)
            logger.debug('this will not be printed')
            logger.info('nor will this')
        finally:
            # Restore the suite-wide level even if something above fails.
            logger.setLevel(LOG_LEVEL)

    @classmethod
    def _test_level(cls, conn):
        """Child target: send the logger's effective level over conn."""
        logger = multiprocessing.get_logger()
        conn.send(logger.getEffectiveLevel())

    def test_level(self):
        """A child process reports the effective level set in the parent.

        The level is first set on the multiprocessing logger itself, then
        that logger is reset to NOTSET and the level is set on the root
        logger instead.
        """
        LEVEL1 = 32
        LEVEL2 = 37

        logger = multiprocessing.get_logger()
        root_logger = logging.getLogger()
        root_level = root_logger.level

        reader, writer = multiprocessing.Pipe(duplex=False)

        try:
            logger.setLevel(LEVEL1)
            p = self.Process(target=self._test_level, args=(writer,))
            p.daemon = True
            p.start()
            reported = reader.recv()
            # Reap the child before asserting so it is not left behind.
            p.join()
            self.assertEqual(LEVEL1, reported)

            logger.setLevel(logging.NOTSET)
            root_logger.setLevel(LEVEL2)
            p = self.Process(target=self._test_level, args=(writer,))
            p.daemon = True
            p.start()
            reported = reader.recv()
            p.join()
            self.assertEqual(LEVEL2, reported)
        finally:
            # Always undo the level changes so a failure here cannot leak
            # logging configuration into later tests.
            root_logger.setLevel(root_level)
            logger.setLevel(level=LOG_LEVEL)
2054
Jesse Nollerb9a49b72009-11-21 18:09:38 +00002055
Jesse Nollerf4ae35f2009-11-24 14:22:24 +00002056# class _TestLoggingProcessName(BaseTestCase):
2057#
2058# def handle(self, record):
2059# assert record.processName == multiprocessing.current_process().name
2060# self.__handled = True
2061#
2062# def test_logging(self):
2063# handler = logging.Handler()
2064# handler.handle = self.handle
2065# self.__handled = False
2066# # Bypass getLogger() and side-effects
2067# logger = logging.getLoggerClass()(
2068# 'multiprocessing.test.TestLoggingProcessName')
2069# logger.addHandler(handler)
2070# logger.propagate = False
2071#
2072# logger.warn('foo')
2073# assert self.__handled
Jesse Nollerb9a49b72009-11-21 18:09:38 +00002074
Benjamin Petersone711caf2008-06-11 16:44:04 +00002075#
Jesse Noller6214edd2009-01-19 16:23:53 +00002076# Test to verify handle verification, see issue 3321
2077#
2078
class TestInvalidHandle(unittest.TestCase):
    """Handle verification for Connection objects, see issue 3321."""

    @unittest.skipIf(WIN32, "skipped on Windows")
    def test_invalid_handles(self):
        """Bogus handles must be rejected with ValueError or IOError."""
        expected = (ValueError, IOError)

        conn = multiprocessing.connection.Connection(44977608)
        try:
            with self.assertRaises(expected):
                conn.poll()
        finally:
            # Hack private attribute _handle to avoid printing an error
            # in conn.__del__
            conn._handle = None

        # A negative handle must be refused at construction time.
        with self.assertRaises(expected):
            multiprocessing.connection.Connection(-1)
Florent Xiclunafd1b0932010-03-28 00:25:02 +00002092
Jesse Noller6214edd2009-01-19 16:23:53 +00002093#
Benjamin Petersone711caf2008-06-11 16:44:04 +00002094# Functions used to create test cases from the base ones in this module
2095#
2096
def get_attributes(Source, names):
    """Return a dict mapping each name in *names* to ``getattr(Source, name)``.

    Plain Python functions are wrapped in ``staticmethod`` so that they do
    not turn into bound methods when the dict is merged into a class
    namespace.  Every other kind of object is stored unchanged.

    Raises AttributeError if *Source* lacks one of the names.
    """
    import types  # local import: only this helper needs it

    attributes = {}
    for name in names:
        obj = getattr(Source, name)
        if isinstance(obj, types.FunctionType):
            obj = staticmethod(obj)
        attributes[name] = obj
    return attributes
2105
def create_test_cases(Mixin, type):
    """Build concrete TestCase classes from this module's ``_Test*`` bases.

    Every module-level name starting with ``_Test`` whose ``ALLOWED_TYPES``
    contains *type* gets a subclass combining that base,
    ``unittest.TestCase`` and *Mixin*.  The subclass is named
    ``'With' + type.capitalize() + name[1:]`` and takes its ``__module__``
    from *Mixin*.

    Returns a dict mapping the new class names to the new classes.
    """
    result = {}
    prefix = 'With' + type.capitalize()

    # Iterate over a snapshot of the module namespace.
    for name, base in list(globals().items()):
        if not name.startswith('_Test'):
            continue
        if type not in base.ALLOWED_TYPES:
            continue

        class Temp(base, unittest.TestCase, Mixin):
            pass

        newname = prefix + name[1:]
        result[newname] = Temp
        Temp.__name__ = newname
        Temp.__module__ = Mixin.__module__
    return result
2122
2123#
2124# Create test cases
2125#
2126
class ProcessesMixin(object):
    """Mixin that binds the generic test cases to multiprocessing itself."""

    TYPE = 'processes'
    Process = multiprocessing.Process
    # Inject the listed multiprocessing attributes into the class body;
    # get_attributes() wraps plain functions in staticmethod.
    locals().update(get_attributes(multiprocessing, (
        'Queue',
        'Lock', 'RLock',
        'Semaphore', 'BoundedSemaphore',
        'Condition', 'Event',
        'Value', 'Array', 'RawValue', 'RawArray',
        'current_process', 'active_children',
        'Pipe', 'connection',
        'JoinableQueue',
        )))

# Generate the process-based test classes and publish them as module globals.
testcases_processes = create_test_cases(ProcessesMixin, 'processes')
globals().update(testcases_processes)
2139
2140
class ManagerMixin(object):
    """Mixin that binds the generic test cases to a SyncManager."""

    TYPE = 'manager'
    Process = multiprocessing.Process
    # The manager is allocated without running __init__ here; test_main()
    # calls __init__() and start() on it before the tests run.
    manager = object.__new__(multiprocessing.managers.SyncManager)
    # Inject the listed manager attributes into the class body.
    locals().update(get_attributes(manager, (
        'Queue',
        'Lock', 'RLock',
        'Semaphore', 'BoundedSemaphore',
        'Condition', 'Event',
        'Value', 'Array',
        'list', 'dict', 'Namespace',
        'JoinableQueue',
        )))

# Generate the manager-based test classes and publish them as module globals.
testcases_manager = create_test_cases(ManagerMixin, 'manager')
globals().update(testcases_manager)
2153
2154
class ThreadsMixin(object):
    """Mixin that binds the generic test cases to multiprocessing.dummy."""

    TYPE = 'threads'
    Process = multiprocessing.dummy.Process
    # Inject the listed multiprocessing.dummy attributes into the class
    # body; get_attributes() wraps plain functions in staticmethod.
    locals().update(get_attributes(multiprocessing.dummy, (
        'Queue',
        'Lock', 'RLock',
        'Semaphore', 'BoundedSemaphore',
        'Condition', 'Event',
        'Value', 'Array',
        'current_process', 'active_children',
        'Pipe', 'connection',
        'dict', 'list', 'Namespace',
        'JoinableQueue',
        )))

# Generate the thread-based test classes and publish them as module globals.
testcases_threads = create_test_cases(ThreadsMixin, 'threads')
globals().update(testcases_threads)
2167
class OtherTest(unittest.TestCase):
    """Authentication-failure paths of the connection handshake."""

    # TODO: add more tests for deliver/answer challenge.
    def test_deliver_challenge_auth_failure(self):
        """deliver_challenge() must reject a bogus reply."""
        class _FakeConnection(object):
            def recv_bytes(self, size):
                return b'something bogus'

            def send_bytes(self, data):
                pass

        with self.assertRaises(multiprocessing.AuthenticationError):
            multiprocessing.connection.deliver_challenge(
                _FakeConnection(), b'abc')

    def test_answer_challenge_auth_failure(self):
        """answer_challenge() must fail when the second reply is bogus."""
        class _FakeConnection(object):
            def __init__(self):
                self.count = 0

            def recv_bytes(self, size):
                # 1st call: the CHALLENGE marker; 2nd call: a bogus
                # reply; any later call: empty bytes.
                self.count += 1
                if self.count > 2:
                    return b''
                if self.count == 2:
                    return b'something bogus'
                return multiprocessing.connection.CHALLENGE

            def send_bytes(self, data):
                pass

        with self.assertRaises(multiprocessing.AuthenticationError):
            multiprocessing.connection.answer_challenge(
                _FakeConnection(), b'abc')
2196
Benjamin Petersonf47ed4a2009-04-11 20:45:40 +00002197#
2198# Test Manager.start()/Pool.__init__() initializer feature - see issue 5585
2199#
2200
def initializer(ns):
    """Increment ``ns.test`` by one.

    Passed as the initializer callable to Manager.start() and Pool() by
    the TestInitializers tests.
    """
    ns.test += 1
2203
class TestInitializers(unittest.TestCase):
    """Initializer support in Manager.start() and Pool(), see issue 5585."""

    def setUp(self):
        # A shared namespace whose ``test`` counter the initializer bumps.
        self.mgr = multiprocessing.Manager()
        self.ns = self.mgr.Namespace()
        self.ns.test = 0

    def tearDown(self):
        self.mgr.shutdown()

    def test_manager_initializer(self):
        """start() rejects a non-callable and runs a real initializer."""
        m = multiprocessing.managers.SyncManager()
        self.assertRaises(TypeError, m.start, 1)
        m.start(initializer, (self.ns,))
        try:
            self.assertEqual(self.ns.test, 1)
        finally:
            # Shut the second manager down even when the assertion fails,
            # so its server process is not left running.
            m.shutdown()

    def test_pool_initializer(self):
        """Pool() rejects a non-callable and runs a real initializer."""
        self.assertRaises(TypeError, multiprocessing.Pool, initializer=1)
        p = multiprocessing.Pool(1, initializer, (self.ns,))
        p.close()
        p.join()
        self.assertEqual(self.ns.test, 1)
2226
Alexandre Vassalottic57a84f2009-07-17 12:07:01 +00002227#
2228# Issue 5155, 5313, 5331: Test process in processes
2229# Verifies os.close(sys.stdin.fileno) vs. sys.stdin.close() behavior
2230#
2231
2232def _ThisSubProcess(q):
2233 try:
2234 item = q.get(block=False)
2235 except pyqueue.Empty:
2236 pass
2237
def _TestProcess(q):
    """Start a daemonic child running _ThisSubProcess and wait for it.

    The child is handed a queue created here; the *q* argument itself is
    not used.
    """
    child_queue = multiprocessing.Queue()
    child = multiprocessing.Process(target=_ThisSubProcess,
                                    args=(child_queue,))
    child.daemon = True
    child.start()
    child.join()
2244
2245def _afunc(x):
2246 return x*x
2247
def pool_in_process():
    """Create a 4-worker Pool, map _afunc over a few integers, clean up.

    The mapped result is discarded; the tests use this function only as a
    Process target.
    """
    pool = multiprocessing.Pool(processes=4)
    try:
        pool.map(_afunc, [1, 2, 3, 4, 5, 6, 7])
    finally:
        # Release the workers explicitly instead of leaving them to be
        # cleaned up when the process exits.
        pool.close()
        pool.join()
2251
2252class _file_like(object):
2253 def __init__(self, delegate):
2254 self._delegate = delegate
2255 self._pid = None
2256
2257 @property
2258 def cache(self):
2259 pid = os.getpid()
2260 # There are no race conditions since fork keeps only the running thread
2261 if pid != self._pid:
2262 self._pid = pid
2263 self._cache = []
2264 return self._cache
2265
2266 def write(self, data):
2267 self.cache.append(data)
2268
2269 def flush(self):
2270 self._delegate.write(''.join(self.cache))
2271 self._cache = []
2272
class TestStdinBadfiledescriptor(unittest.TestCase):
    """Processes started from inside processes (issues 5155, 5313, 5331)."""

    def test_queue_in_process(self):
        """A child process can itself start a child that uses a Queue."""
        queue = multiprocessing.Queue()
        proc = multiprocessing.Process(target=_TestProcess, args=(queue,))
        proc.start()
        proc.join()

    def test_pool_in_process(self):
        """A child process can create and use a Pool."""
        p = multiprocessing.Process(target=pool_in_process)
        p.start()
        p.join()

    def test_flushing(self):
        """Data buffered in a _file_like reaches the delegate on flush()."""
        sio = io.StringIO()
        flike = _file_like(sio)
        flike.write('foo')
        # NOTE(review): this process is constructed but never started;
        # presumably only its construction is being exercised -- confirm.
        proc = multiprocessing.Process(target=lambda: flike.flush())
        flike.flush()
        # Use a unittest assertion: a bare assert is stripped under -O and
        # reports no values on failure.
        self.assertEqual(sio.getvalue(), 'foo')
2293
# Hand-written TestCase classes; test_main() runs them after the generated
# processes/threads/manager test cases.
testcases_other = [
    OtherTest,
    TestInvalidHandle,
    TestInitializers,
    TestStdinBadfiledescriptor,
]
Neal Norwitz5d6415e2008-08-25 01:53:32 +00002296
Benjamin Petersone711caf2008-06-11 16:44:04 +00002297#
2298#
2299#
2300
def test_main(run=None):
    """Build the full multiprocessing test suite and run it.

    *run* is a callable that accepts a unittest.TestSuite; when it is None,
    test.support.run_unittest is used.  Shared pools and the shared manager
    are created before the run and torn down afterwards, even if *run*
    raises.

    Raises unittest.SkipTest on Linux when creating an RLock fails with
    OSError (see issue 3111).
    """
    if sys.platform.startswith("linux"):
        try:
            # Probe only: the lock object itself is not needed.
            multiprocessing.RLock()
        except OSError:
            raise unittest.SkipTest("OSError raises on RLock creation, see issue 3111!")

    if run is None:
        from test.support import run_unittest as run

    util.get_temp_dir()     # creates temp directory for use by all processes

    multiprocessing.get_logger().setLevel(LOG_LEVEL)

    # Shared resources that the generated test cases reach via their mixin.
    ProcessesMixin.pool = multiprocessing.Pool(4)
    ThreadsMixin.pool = multiprocessing.dummy.Pool(4)
    ManagerMixin.manager.__init__()
    ManagerMixin.manager.start()
    ManagerMixin.pool = ManagerMixin.manager.Pool(4)

    try:
        # Generated cases are ordered by class name within each flavour;
        # the hand-written ones come last.
        testcases = (
            sorted(testcases_processes.values(), key=lambda tc:tc.__name__) +
            sorted(testcases_threads.values(), key=lambda tc:tc.__name__) +
            sorted(testcases_manager.values(), key=lambda tc:tc.__name__) +
            testcases_other
            )

        loadTestsFromTestCase = unittest.defaultTestLoader.loadTestsFromTestCase
        suite = unittest.TestSuite(loadTestsFromTestCase(tc) for tc in testcases)
        run(suite)
    finally:
        # Tear down in the same order as before, but also when the run
        # itself raises, so no workers or manager process are leaked.
        ThreadsMixin.pool.terminate()
        ProcessesMixin.pool.terminate()
        ManagerMixin.pool.terminate()
        ManagerMixin.manager.shutdown()

        del ProcessesMixin.pool, ThreadsMixin.pool, ManagerMixin.pool
Benjamin Petersone711caf2008-06-11 16:44:04 +00002338
def main():
    """Run the whole suite through a verbose TextTestRunner."""
    runner = unittest.TextTestRunner(verbosity=2)
    test_main(runner.run)

if __name__ == '__main__':
    main()