blob: bb0f06adf6abf3a2fbedf4d9a0d4874f21c3fa50 [file] [log] [blame]
Benjamin Peterson90f5ba52010-03-11 22:53:45 +00001#!/usr/bin/env python3
Georg Brandl86b2fb92008-07-16 03:43:04 +00002
Benjamin Petersone711caf2008-06-11 16:44:04 +00003#
4# Unit tests for the multiprocessing package
5#
6
7import unittest
Benjamin Petersone711caf2008-06-11 16:44:04 +00008import queue as pyqueue
9import time
Alexandre Vassalottic57a84f2009-07-17 12:07:01 +000010import io
Benjamin Petersone711caf2008-06-11 16:44:04 +000011import sys
12import os
13import gc
14import signal
15import array
Benjamin Petersone711caf2008-06-11 16:44:04 +000016import socket
17import random
18import logging
R. David Murraya21e4ca2009-03-31 23:16:50 +000019import test.support
Benjamin Petersone711caf2008-06-11 16:44:04 +000020
Benjamin Petersone5384b02008-10-04 22:00:42 +000021
R. David Murraya21e4ca2009-03-31 23:16:50 +000022# Skip tests if _multiprocessing wasn't built.
23_multiprocessing = test.support.import_module('_multiprocessing')
24# Skip tests if sem_open implementation is broken.
25test.support.import_module('multiprocessing.synchronize')
# import threading after _multiprocessing to raise a more relevant error
# message: "No module named _multiprocessing". _multiprocessing is not compiled
# without thread support.
29import threading
Benjamin Petersone5384b02008-10-04 22:00:42 +000030
Benjamin Petersone711caf2008-06-11 16:44:04 +000031import multiprocessing.dummy
32import multiprocessing.connection
33import multiprocessing.managers
34import multiprocessing.heap
Benjamin Petersone711caf2008-06-11 16:44:04 +000035import multiprocessing.pool
Benjamin Petersone711caf2008-06-11 16:44:04 +000036
37from multiprocessing import util
38
Brian Curtinafa88b52010-10-07 01:12:19 +000039try:
40 from multiprocessing.sharedctypes import Value, copy
41 HAS_SHAREDCTYPES = True
42except ImportError:
43 HAS_SHAREDCTYPES = False
44
Benjamin Petersone711caf2008-06-11 16:44:04 +000045#
46#
47#
48
def latin(s):
    """Return the text *s* encoded to bytes with the 'latin' codec."""
    encoded = s.encode('latin')
    return encoded
Benjamin Petersone711caf2008-06-11 16:44:04 +000051
Benjamin Petersone711caf2008-06-11 16:44:04 +000052#
53# Constants
54#
55
# Log level constant taken from multiprocessing.util.
LOG_LEVEL = util.SUBWARNING
#LOG_LEVEL = logging.DEBUG

# Short pause (in seconds) that tests hand to time.sleep() when they need
# to give another process/thread a moment to make progress.
DELTA = 0.1

# Making this true makes tests take a lot longer and can sometimes cause
# some non-serious failures because some calls block a bit longer than
# expected.
CHECK_TIMINGS = False

# Timeouts used by the blocking-call tests; the longer, distinct values are
# only worth paying for when the elapsed times are actually checked.
TIMEOUT1, TIMEOUT2, TIMEOUT3 = (
    (0.82, 0.35, 1.4) if CHECK_TIMINGS else (0.1, 0.1, 0.1)
)

# False when _multiprocessing flags the platform's semaphore get-value
# support as broken.
HAVE_GETVALUE = not getattr(_multiprocessing,
                            'HAVE_BROKEN_SEM_GETVALUE', False)

WIN32 = (sys.platform == "win32")
73
Benjamin Petersone711caf2008-06-11 16:44:04 +000074#
Florent Xiclunafd1b0932010-03-28 00:25:02 +000075# Some tests require ctypes
76#
77
78try:
Florent Xiclunaaa171062010-08-14 15:56:42 +000079 from ctypes import Structure, c_int, c_double
Florent Xiclunafd1b0932010-03-28 00:25:02 +000080except ImportError:
81 Structure = object
82 c_int = c_double = None
83
84#
Benjamin Petersone711caf2008-06-11 16:44:04 +000085# Creates a wrapper for a function which records the time it takes to finish
86#
87
class TimingWrapper(object):
    """Callable proxy that remembers how long the last call took.

    After each call ``elapsed`` holds the wall-clock duration in seconds
    (``None`` until the first call), whether the wrapped function returned
    normally or raised.
    """

    def __init__(self, func):
        self.func = func
        self.elapsed = None

    def __call__(self, *args, **kwds):
        started = time.time()
        try:
            result = self.func(*args, **kwds)
        finally:
            # Recorded on both the success and the exception path.
            self.elapsed = time.time() - started
        return result
100
101#
102# Base class for test cases
103#
104
class BaseTestCase(object):
    """Mixin providing assertion helpers shared by the test case classes."""

    # NOTE(review): presumably read by the code that builds the concrete
    # test classes (not visible in this part of the file) -- confirm.
    ALLOWED_TYPES = ('processes', 'manager', 'threads')

    def assertTimingAlmostEqual(self, a, b):
        # Timing comparisons (to one decimal place) only happen when the
        # module-level CHECK_TIMINGS switch is on.
        if not CHECK_TIMINGS:
            return
        self.assertAlmostEqual(a, b, 1)

    def assertReturnsIfImplemented(self, value, func, *args):
        # A NotImplementedError from func means "nothing to check here".
        try:
            outcome = func(*args)
        except NotImplementedError:
            return None
        return self.assertEqual(value, outcome)

    # For the sanity of Windows users, rather than crashing or freezing in
    # multiple ways.
    def __reduce__(self, *args):
        raise NotImplementedError("shouldn't try to pickle a test case")

    __reduce_ex__ = __reduce__
127
Benjamin Petersone711caf2008-06-11 16:44:04 +0000128#
129# Return the value of a semaphore
130#
131
def get_value(self):
    """Return the counter of a semaphore-like object.

    Tries, in order, the object's own ``get_value()`` method and then the
    ``_Semaphore__value`` and ``_value`` attributes.  Raises
    NotImplementedError when none of them is available.
    """
    try:
        return self.get_value()
    except AttributeError:
        pass
    for attr in ('_Semaphore__value', '_value'):
        try:
            return getattr(self, attr)
        except AttributeError:
            continue
    raise NotImplementedError
143
144#
145# Testcases
146#
147
class _TestProcess(BaseTestCase):
    """Tests for Process objects (run for both processes and threads)."""

    ALLOWED_TYPES = ('processes', 'threads')

    def test_current(self):
        # current_process() attributes are only checked for real processes.
        if self.TYPE == 'threads':
            return

        current = self.current_process()
        authkey = current.authkey

        self.assertTrue(current.is_alive())
        self.assertTrue(not current.daemon)
        self.assertIsInstance(authkey, bytes)
        self.assertTrue(len(authkey) > 0)
        self.assertEqual(current.ident, os.getpid())
        self.assertEqual(current.exitcode, None)

    @classmethod
    def _test(cls, q, *args, **kwds):
        """Child side of test_process(): report what the child sees via *q*."""
        current = cls.current_process()
        q.put(args)
        q.put(kwds)
        q.put(current.name)
        if cls.TYPE != 'threads':
            q.put(bytes(current.authkey))
            q.put(current.pid)

    def test_process(self):
        q = self.Queue(1)
        args = (q, 1, 2)
        kwargs = {'hello':23, 'bye':2.54}
        name = 'SomeProcess'
        p = self.Process(
            target=self._test, args=args, kwargs=kwargs, name=name
            )
        p.daemon = True
        current = self.current_process()

        # state before start()
        if self.TYPE != 'threads':
            self.assertEqual(p.authkey, current.authkey)
        self.assertEqual(p.is_alive(), False)
        self.assertEqual(p.daemon, True)
        self.assertNotIn(p, self.active_children())
        self.assertTrue(type(self.active_children()) is list)
        self.assertEqual(p.exitcode, None)

        p.start()

        # state while running
        self.assertEqual(p.exitcode, None)
        self.assertEqual(p.is_alive(), True)
        self.assertIn(p, self.active_children())

        # values reported back by _test(), in the order it put them
        self.assertEqual(q.get(), args[1:])
        self.assertEqual(q.get(), kwargs)
        self.assertEqual(q.get(), p.name)
        if self.TYPE != 'threads':
            self.assertEqual(q.get(), current.authkey)
            self.assertEqual(q.get(), p.pid)

        p.join()

        # state after termination
        self.assertEqual(p.exitcode, 0)
        self.assertEqual(p.is_alive(), False)
        self.assertNotIn(p, self.active_children())

    @classmethod
    def _test_terminate(cls):
        """Child side of test_terminate(): sleep until terminated."""
        time.sleep(1000)

    def test_terminate(self):
        if self.TYPE == 'threads':
            return

        p = self.Process(target=self._test_terminate)
        p.daemon = True
        p.start()

        self.assertEqual(p.is_alive(), True)
        self.assertIn(p, self.active_children())
        self.assertEqual(p.exitcode, None)

        p.terminate()

        join = TimingWrapper(p.join)
        self.assertEqual(join(), None)
        self.assertTimingAlmostEqual(join.elapsed, 0.0)

        self.assertEqual(p.is_alive(), False)
        self.assertNotIn(p, self.active_children())

        p.join()

        # XXX sometimes get p.exitcode == 0 on Windows ...
        #self.assertEqual(p.exitcode, -signal.SIGTERM)

    def test_cpu_count(self):
        try:
            cpus = multiprocessing.cpu_count()
        except NotImplementedError:
            cpus = 1
        self.assertTrue(type(cpus) is int)
        self.assertTrue(cpus >= 1)

    def test_active_children(self):
        self.assertEqual(type(self.active_children()), list)

        p = self.Process(target=time.sleep, args=(DELTA,))
        self.assertNotIn(p, self.active_children())

        p.start()
        self.assertIn(p, self.active_children())

        p.join()
        self.assertNotIn(p, self.active_children())

    @classmethod
    def _test_recursion(cls, wconn, id):
        """Send *id* over *wconn*, then recurse in two children.

        Recursion stops once *id* holds two elements.  Each child is
        started and joined before the next one is created.
        """
        wconn.send(id)
        if len(id) < 2:
            for i in range(2):
                p = cls.Process(
                    target=cls._test_recursion, args=(wconn, id+[i])
                    )
                p.start()
                p.join()

    def test_recursion(self):
        rconn, wconn = self.Pipe(duplex=False)
        self._test_recursion(wconn, [])

        time.sleep(DELTA)
        result = []
        while rconn.poll():
            result.append(rconn.recv())

        expected = [
            [],
              [0],
                [0, 0],
                [0, 1],
              [1],
                [1, 0],
                [1, 1]
            ]
        self.assertEqual(result, expected)
296
297#
298#
299#
300
301class _UpperCaser(multiprocessing.Process):
302
303 def __init__(self):
304 multiprocessing.Process.__init__(self)
305 self.child_conn, self.parent_conn = multiprocessing.Pipe()
306
307 def run(self):
308 self.parent_conn.close()
309 for s in iter(self.child_conn.recv, None):
310 self.child_conn.send(s.upper())
311 self.child_conn.close()
312
313 def submit(self, s):
314 assert type(s) is str
315 self.parent_conn.send(s)
316 return self.parent_conn.recv()
317
318 def stop(self):
319 self.parent_conn.send(None)
320 self.parent_conn.close()
321 self.child_conn.close()
322
class _TestSubclassingProcess(BaseTestCase):
    """Checks that a Process subclass overriding run() works."""

    ALLOWED_TYPES = ('processes',)

    def test_subclassing(self):
        worker = _UpperCaser()
        worker.start()
        for text, shouted in (('hello', 'HELLO'), ('world', 'WORLD')):
            self.assertEqual(worker.submit(text), shouted)
        worker.stop()
        worker.join()
334
335#
336#
337#
338
def queue_empty(q):
    """Report whether *q* is empty.

    Uses ``q.empty()`` when the queue has it, else compares ``q.qsize()``
    with zero.
    """
    if not hasattr(q, 'empty'):
        return q.qsize() == 0
    return q.empty()
344
def queue_full(q, maxsize):
    """Report whether *q* is full.

    Uses ``q.full()`` when the queue has it, else compares ``q.qsize()``
    with *maxsize*.
    """
    if not hasattr(q, 'full'):
        return q.qsize() == maxsize
    return q.full()
350
351
class _TestQueue(BaseTestCase):
    """Tests for the Queue and JoinableQueue implementations."""

    @classmethod
    def _test_put(cls, queue, child_can_start, parent_can_continue):
        """Child side of test_put(): drain six items once released."""
        child_can_start.wait()
        for _ in range(6):
            queue.get()
        parent_can_continue.set()

    def test_put(self):
        MAXSIZE = 6
        queue = self.Queue(maxsize=MAXSIZE)
        child_can_start = self.Event()
        parent_can_continue = self.Event()

        proc = self.Process(
            target=self._test_put,
            args=(queue, child_can_start, parent_can_continue)
            )
        proc.daemon = True
        proc.start()

        self.assertEqual(queue_empty(queue), True)
        self.assertEqual(queue_full(queue, MAXSIZE), False)

        # fill the queue using every calling convention of put()
        queue.put(1)
        queue.put(2, True)
        queue.put(3, True, None)
        queue.put(4, False)
        queue.put(5, False, None)
        queue.put_nowait(6)

        # the values may be in buffer but not yet in pipe so sleep a bit
        time.sleep(DELTA)

        self.assertEqual(queue_empty(queue), False)
        self.assertEqual(queue_full(queue, MAXSIZE), True)

        put = TimingWrapper(queue.put)
        put_nowait = TimingWrapper(queue.put_nowait)

        # every further put must fail with Full, blocking only when asked to
        self.assertRaises(pyqueue.Full, put, 7, False)
        self.assertTimingAlmostEqual(put.elapsed, 0)

        self.assertRaises(pyqueue.Full, put, 7, False, None)
        self.assertTimingAlmostEqual(put.elapsed, 0)

        self.assertRaises(pyqueue.Full, put_nowait, 7)
        self.assertTimingAlmostEqual(put_nowait.elapsed, 0)

        self.assertRaises(pyqueue.Full, put, 7, True, TIMEOUT1)
        self.assertTimingAlmostEqual(put.elapsed, TIMEOUT1)

        self.assertRaises(pyqueue.Full, put, 7, False, TIMEOUT2)
        self.assertTimingAlmostEqual(put.elapsed, 0)

        self.assertRaises(pyqueue.Full, put, 7, True, timeout=TIMEOUT3)
        self.assertTimingAlmostEqual(put.elapsed, TIMEOUT3)

        child_can_start.set()
        parent_can_continue.wait()

        self.assertEqual(queue_empty(queue), True)
        self.assertEqual(queue_full(queue, MAXSIZE), False)

        proc.join()

    @classmethod
    def _test_get(cls, queue, child_can_start, parent_can_continue):
        """Child side of test_get(): queue the values 2..5 once released."""
        child_can_start.wait()
        #queue.put(1)
        queue.put(2)
        queue.put(3)
        queue.put(4)
        queue.put(5)
        parent_can_continue.set()

    def test_get(self):
        queue = self.Queue()
        child_can_start = self.Event()
        parent_can_continue = self.Event()

        proc = self.Process(
            target=self._test_get,
            args=(queue, child_can_start, parent_can_continue)
            )
        proc.daemon = True
        proc.start()

        self.assertEqual(queue_empty(queue), True)

        child_can_start.set()
        parent_can_continue.wait()

        time.sleep(DELTA)
        self.assertEqual(queue_empty(queue), False)

        # Hangs unexpectedly, remove for now
        #self.assertEqual(queue.get(), 1)
        self.assertEqual(queue.get(True, None), 2)
        self.assertEqual(queue.get(True), 3)
        self.assertEqual(queue.get(timeout=1), 4)
        self.assertEqual(queue.get_nowait(), 5)

        self.assertEqual(queue_empty(queue), True)

        get = TimingWrapper(queue.get)
        get_nowait = TimingWrapper(queue.get_nowait)

        # every further get must fail with Empty, blocking only when asked to
        self.assertRaises(pyqueue.Empty, get, False)
        self.assertTimingAlmostEqual(get.elapsed, 0)

        self.assertRaises(pyqueue.Empty, get, False, None)
        self.assertTimingAlmostEqual(get.elapsed, 0)

        self.assertRaises(pyqueue.Empty, get_nowait)
        self.assertTimingAlmostEqual(get_nowait.elapsed, 0)

        self.assertRaises(pyqueue.Empty, get, True, TIMEOUT1)
        self.assertTimingAlmostEqual(get.elapsed, TIMEOUT1)

        self.assertRaises(pyqueue.Empty, get, False, TIMEOUT2)
        self.assertTimingAlmostEqual(get.elapsed, 0)

        self.assertRaises(pyqueue.Empty, get, timeout=TIMEOUT3)
        self.assertTimingAlmostEqual(get.elapsed, TIMEOUT3)

        proc.join()

    @classmethod
    def _test_fork(cls, queue):
        """Child side of test_fork(): queue the values 10..19."""
        for i in range(10, 20):
            queue.put(i)
        # note that at this point the items may only be buffered, so the
        # process cannot shutdown until the feeder thread has finished
        # pushing items onto the pipe.

    def test_fork(self):
        # Old versions of Queue would fail to create a new feeder
        # thread for a forked process if the original process had its
        # own feeder thread.  This test checks that this no longer
        # happens.

        queue = self.Queue()

        # put items on queue so that main process starts a feeder thread
        for i in range(10):
            queue.put(i)

        # wait to make sure thread starts before we fork a new process
        time.sleep(DELTA)

        # fork process
        p = self.Process(target=self._test_fork, args=(queue,))
        p.start()

        # check that all expected items are in the queue
        for i in range(20):
            self.assertEqual(queue.get(), i)
        self.assertRaises(pyqueue.Empty, queue.get, False)

        p.join()

    def test_qsize(self):
        q = self.Queue()
        try:
            self.assertEqual(q.qsize(), 0)
        except NotImplementedError:
            # qsize() is not available here, nothing to test
            return
        q.put(1)
        self.assertEqual(q.qsize(), 1)
        q.put(5)
        self.assertEqual(q.qsize(), 2)
        q.get()
        self.assertEqual(q.qsize(), 1)
        q.get()
        self.assertEqual(q.qsize(), 0)

    @classmethod
    def _test_task_done(cls, q):
        """Worker for test_task_done(): acknowledge items until None."""
        for _ in iter(q.get, None):
            time.sleep(DELTA)
            q.task_done()

    def test_task_done(self):
        queue = self.JoinableQueue()

        workers = [self.Process(target=self._test_task_done, args=(queue,))
                   for _ in range(4)]

        for p in workers:
            p.start()

        for i in range(10):
            queue.put(i)

        # returns once every item put so far has been acknowledged
        queue.join()

        # one None sentinel per worker so that each of them exits
        for _ in workers:
            queue.put(None)

        for p in workers:
            p.join()
559
560#
561#
562#
563
class _TestLock(BaseTestCase):
    """Tests for Lock and RLock."""

    def test_lock(self):
        lock = self.Lock()
        self.assertEqual(lock.acquire(), True)
        # a second, non-blocking acquire must fail
        self.assertEqual(lock.acquire(False), False)
        self.assertEqual(lock.release(), None)
        # releasing an unlocked lock is an error
        self.assertRaises((ValueError, threading.ThreadError), lock.release)

    def test_rlock(self):
        rlock = self.RLock()
        depth = 3
        for _ in range(depth):
            self.assertEqual(rlock.acquire(), True)
        for _ in range(depth):
            self.assertEqual(rlock.release(), None)
        # one release too many
        self.assertRaises((AssertionError, RuntimeError), rlock.release)

    def test_lock_context(self):
        lock = self.Lock()
        with lock:
            pass
586
Benjamin Petersone711caf2008-06-11 16:44:04 +0000587
class _TestSemaphore(BaseTestCase):
    """Tests for Semaphore and BoundedSemaphore."""

    def _test_semaphore(self, sem):
        # Expects a semaphore whose counter starts at 2; the counter is
        # only checked where get_value() is implemented.
        check_value = self.assertReturnsIfImplemented
        check_value(2, get_value, sem)
        self.assertEqual(sem.acquire(), True)
        check_value(1, get_value, sem)
        self.assertEqual(sem.acquire(), True)
        check_value(0, get_value, sem)
        self.assertEqual(sem.acquire(False), False)
        check_value(0, get_value, sem)
        self.assertEqual(sem.release(), None)
        check_value(1, get_value, sem)
        self.assertEqual(sem.release(), None)
        check_value(2, get_value, sem)

    def test_semaphore(self):
        sem = self.Semaphore(2)
        self._test_semaphore(sem)
        # an unbounded semaphore may be released past its initial value
        for expected in (3, 4):
            self.assertEqual(sem.release(), None)
            self.assertReturnsIfImplemented(expected, get_value, sem)

    def test_bounded_semaphore(self):
        sem = self.BoundedSemaphore(2)
        self._test_semaphore(sem)
        # Currently fails on OS/X
        #if HAVE_GETVALUE:
        #    self.assertRaises(ValueError, sem.release)
        #    self.assertReturnsIfImplemented(2, get_value, sem)

    def test_timeout(self):
        if self.TYPE != 'processes':
            return

        sem = self.Semaphore(0)
        acquire = TimingWrapper(sem.acquire)

        def expect_failure(expected_elapsed, *args, **kwds):
            # acquire() must fail, having blocked for about expected_elapsed
            self.assertEqual(acquire(*args, **kwds), False)
            self.assertTimingAlmostEqual(acquire.elapsed, expected_elapsed)

        expect_failure(0.0, False)
        expect_failure(0.0, False, None)
        expect_failure(0, False, TIMEOUT1)
        expect_failure(TIMEOUT2, True, TIMEOUT2)
        expect_failure(TIMEOUT3, timeout=TIMEOUT3)
640
641
class _TestCondition(BaseTestCase):
    """Tests for Condition, driven from both processes and threads."""

    @classmethod
    def f(cls, cond, sleeping, woken, timeout=None):
        """Worker: release *sleeping*, wait on *cond*, then release *woken*.

        *timeout* is passed straight to ``cond.wait()``.
        """
        cond.acquire()
        sleeping.release()
        cond.wait(timeout)
        woken.release()
        cond.release()

    def check_invariant(self, cond):
        # this is only supposed to succeed when there are no sleepers
        if self.TYPE == 'processes':
            try:
                sleepers = (cond._sleeping_count.get_value() -
                            cond._woken_count.get_value())
                self.assertEqual(sleepers, 0)
                self.assertEqual(cond._wait_semaphore.get_value(), 0)
            except NotImplementedError:
                pass

    def test_notify(self):
        cond = self.Condition()
        sleeping = self.Semaphore(0)
        woken = self.Semaphore(0)

        proc = self.Process(target=self.f, args=(cond, sleeping, woken))
        proc.daemon = True
        proc.start()

        thread = threading.Thread(target=self.f, args=(cond, sleeping, woken))
        thread.daemon = True
        thread.start()

        # wait for both children to start sleeping
        sleeping.acquire()
        sleeping.acquire()

        # check no process/thread has woken up
        time.sleep(DELTA)
        self.assertReturnsIfImplemented(0, get_value, woken)

        # wake up one process/thread
        cond.acquire()
        cond.notify()
        cond.release()

        # check one process/thread has woken up
        time.sleep(DELTA)
        self.assertReturnsIfImplemented(1, get_value, woken)

        # wake up another
        cond.acquire()
        cond.notify()
        cond.release()

        # check other has woken up
        time.sleep(DELTA)
        self.assertReturnsIfImplemented(2, get_value, woken)

        # check state is not mucked up
        self.check_invariant(cond)

        # both workers have been notified, so neither join can block for long
        thread.join()
        proc.join()

    def test_notify_all(self):
        cond = self.Condition()
        sleeping = self.Semaphore(0)
        woken = self.Semaphore(0)
        workers = []

        # start some threads/processes which will timeout
        for i in range(3):
            p = self.Process(target=self.f,
                             args=(cond, sleeping, woken, TIMEOUT1))
            p.daemon = True
            p.start()
            workers.append(p)

            t = threading.Thread(target=self.f,
                                 args=(cond, sleeping, woken, TIMEOUT1))
            t.daemon = True
            t.start()
            workers.append(t)

        # wait for them all to sleep
        for i in range(6):
            sleeping.acquire()

        # check they have all timed out
        for i in range(6):
            woken.acquire()
        self.assertReturnsIfImplemented(0, get_value, woken)

        # check state is not mucked up
        self.check_invariant(cond)

        # start some more threads/processes
        for i in range(3):
            p = self.Process(target=self.f, args=(cond, sleeping, woken))
            p.daemon = True
            p.start()
            workers.append(p)

            t = threading.Thread(target=self.f, args=(cond, sleeping, woken))
            t.daemon = True
            t.start()
            workers.append(t)

        # wait for them to all sleep
        for i in range(6):
            sleeping.acquire()

        # check no process/thread has woken up
        time.sleep(DELTA)
        self.assertReturnsIfImplemented(0, get_value, woken)

        # wake them all up
        cond.acquire()
        cond.notify_all()
        cond.release()

        # check they have all woken
        time.sleep(DELTA)
        self.assertReturnsIfImplemented(6, get_value, woken)

        # check state is not mucked up
        self.check_invariant(cond)

        # every worker has either timed out or been notified by now
        for worker in workers:
            worker.join()

    def test_timeout(self):
        cond = self.Condition()
        wait = TimingWrapper(cond.wait)
        cond.acquire()
        try:
            res = wait(TIMEOUT1)
        finally:
            # never leave the condition locked, even if wait() raises
            cond.release()
        self.assertEqual(res, False)
        self.assertTimingAlmostEqual(wait.elapsed, TIMEOUT1)
773
774
class _TestEvent(BaseTestCase):
    """Tests for Event."""

    @classmethod
    def _test_event(cls, event):
        """Child side of test_event(): set *event* after TIMEOUT2 seconds."""
        time.sleep(TIMEOUT2)
        event.set()

    def test_event(self):
        event = self.Event()
        wait = TimingWrapper(event.wait)

        # a fresh event is not set
        self.assertEqual(event.is_set(), False)

        # wait() returns the state of the flag, so it is False on timeout
        self.assertEqual(wait(0.0), False)
        self.assertTimingAlmostEqual(wait.elapsed, 0.0)
        self.assertEqual(wait(TIMEOUT1), False)
        self.assertTimingAlmostEqual(wait.elapsed, TIMEOUT1)

        event.set()

        # once set, wait() returns True without blocking
        self.assertEqual(event.is_set(), True)
        self.assertEqual(wait(), True)
        self.assertTimingAlmostEqual(wait.elapsed, 0.0)
        self.assertEqual(wait(TIMEOUT1), True)
        self.assertTimingAlmostEqual(wait.elapsed, 0.0)
        # self.assertEqual(event.is_set(), True)

        event.clear()

        #self.assertEqual(event.is_set(), False)

        # wait() must also return True when the flag is set by a child
        p = self.Process(target=self._test_event, args=(event,))
        p.start()
        self.assertEqual(wait(), True)
        # the child has set the event, so it is about to exit
        p.join()
Benjamin Petersone711caf2008-06-11 16:44:04 +0000813
814#
815#
816#
817
@unittest.skipUnless(HAS_SHAREDCTYPES,
                     "requires multiprocessing.sharedctypes")
class _TestValue(BaseTestCase):
    """Tests for Value and RawValue."""

    ALLOWED_TYPES = ('processes',)

    # (typecode, initial value, value assigned by the child process)
    codes_values = [
        ('i', 4343, 24234),
        ('d', 3.625, -4.25),
        ('h', -232, 234),
        ('c', latin('x'), latin('y'))
        ]

    @classmethod
    def _test(cls, values):
        """Child side of test_value(): overwrite each shared value."""
        for shared, spec in zip(values, cls.codes_values):
            shared.value = spec[2]

    def test_value(self, raw=False):
        factory = self.RawValue if raw else self.Value
        values = [factory(code, initial)
                  for code, initial, _ in self.codes_values]

        # initial values are visible in the parent
        for shared, spec in zip(values, self.codes_values):
            self.assertEqual(shared.value, spec[1])

        proc = self.Process(target=self._test, args=(values,))
        proc.start()
        proc.join()

        # assignments made by the child are visible in the parent
        for shared, spec in zip(values, self.codes_values):
            self.assertEqual(shared.value, spec[2])

    def test_rawvalue(self):
        self.test_value(raw=True)

    def test_getobj_getlock(self):
        # default lock
        default_locked = self.Value('i', 5)
        default_locked.get_lock()
        default_locked.get_obj()

        # lock=None
        none_locked = self.Value('i', 5, lock=None)
        none_locked.get_lock()
        none_locked.get_obj()

        # caller-supplied lock is handed back by get_lock()
        lock = self.Lock()
        own_locked = self.Value('i', 5, lock=lock)
        returned_lock = own_locked.get_lock()
        own_locked.get_obj()
        self.assertEqual(lock, returned_lock)

        # lock=False gives an object without the wrapper accessors
        unlocked = self.Value('i', 5, lock=False)
        self.assertFalse(hasattr(unlocked, 'get_lock'))
        self.assertFalse(hasattr(unlocked, 'get_obj'))

        self.assertRaises(AttributeError, self.Value, 'i', 5, lock='navalue')

        raw = self.RawValue('i', 5)
        self.assertFalse(hasattr(raw, 'get_lock'))
        self.assertFalse(hasattr(raw, 'get_obj'))
882
Benjamin Petersone711caf2008-06-11 16:44:04 +0000883
class _TestArray(BaseTestCase):
    """Tests for Array and RawArray."""

    ALLOWED_TYPES = ('processes',)

    @classmethod
    def f(cls, seq):
        """Replace *seq* in place by its running totals."""
        for pos in range(1, len(seq)):
            seq[pos] += seq[pos - 1]

    @unittest.skipIf(c_int is None, "requires _ctypes")
    def test_array(self, raw=False):
        seq = [680, 626, 934, 821, 150, 233, 548, 982, 714, 831]
        factory = self.RawArray if raw else self.Array
        arr = factory('i', seq)

        self.assertEqual(len(arr), len(seq))
        self.assertEqual(arr[3], seq[3])
        self.assertEqual(list(arr[2:7]), list(seq[2:7]))

        # slice assignment from an array.array, mirrored in the plain list
        patch = array.array('i', [1, 2, 3, 4])
        arr[4:8] = patch
        seq[4:8] = patch

        self.assertEqual(list(arr[:]), seq)

        # apply f() locally to the list and in a child to the shared array
        self.f(seq)

        p = self.Process(target=self.f, args=(arr,))
        p.start()
        p.join()

        self.assertEqual(list(arr[:]), seq)

    @unittest.skipIf(c_int is None, "requires _ctypes")
    def test_rawarray(self):
        self.test_array(raw=True)

    @unittest.skipIf(c_int is None, "requires _ctypes")
    def test_getobj_getlock_obj(self):
        # default lock
        default_locked = self.Array('i', list(range(10)))
        default_locked.get_lock()
        default_locked.get_obj()

        # lock=None
        none_locked = self.Array('i', list(range(10)), lock=None)
        none_locked.get_lock()
        none_locked.get_obj()

        # caller-supplied lock is handed back by get_lock()
        lock = self.Lock()
        own_locked = self.Array('i', list(range(10)), lock=lock)
        returned_lock = own_locked.get_lock()
        own_locked.get_obj()
        self.assertEqual(lock, returned_lock)

        # lock=False gives an object without the wrapper accessors
        unlocked = self.Array('i', range(10), lock=False)
        self.assertFalse(hasattr(unlocked, 'get_lock'))
        self.assertFalse(hasattr(unlocked, 'get_obj'))
        self.assertRaises(AttributeError,
                          self.Array, 'i', range(10), lock='notalock')

        raw = self.RawArray('i', range(10))
        self.assertFalse(hasattr(raw, 'get_lock'))
        self.assertFalse(hasattr(raw, 'get_obj'))
Benjamin Petersone711caf2008-06-11 16:44:04 +0000946
947#
948#
949#
950
class _TestContainers(BaseTestCase):
    """Checks for the manager-hosted ``list``, ``dict`` and ``Namespace``."""

    ALLOWED_TYPES = ('manager',)

    def test_list(self):
        """Exercise slicing, extend, repetition, concatenation and nesting."""
        a = self.list(list(range(10)))
        self.assertEqual(a[:], list(range(10)))

        b = self.list()
        self.assertEqual(b[:], [])

        b.extend(list(range(5)))
        self.assertEqual(b[:], list(range(5)))

        self.assertEqual(b[2], 2)
        self.assertEqual(b[2:10], [2, 3, 4])

        b *= 2
        self.assertEqual(b[:], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4])

        self.assertEqual(b + [5, 6], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4, 5, 6])

        # Operations on b must not have touched a.
        self.assertEqual(a[:], list(range(10)))

        # A shared list built from other shared lists compares equal to
        # the corresponding nested plain lists.
        nested = self.list([a, b])
        self.assertEqual(
            nested[:],
            [[0, 1, 2, 3, 4, 5, 6, 7, 8, 9], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4]]
            )

        # A later change to ``a`` must be visible through the container.
        holder = self.list([a])
        a.append('hello')
        self.assertEqual(holder[:], [[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 'hello']])

    def test_dict(self):
        """Fill a shared dict and compare its views with a local mapping."""
        d = self.dict()
        indices = list(range(65, 70))
        for i in indices:
            d[i] = chr(i)
        self.assertEqual(d.copy(), {i: chr(i) for i in indices})
        self.assertEqual(sorted(d.keys()), indices)
        self.assertEqual(sorted(d.values()), [chr(i) for i in indices])
        self.assertEqual(sorted(d.items()), [(i, chr(i)) for i in indices])

    def test_namespace(self):
        """Check attribute set/delete and the string form of a Namespace."""
        n = self.Namespace()
        n.name = 'Bob'
        n.job = 'Builder'
        n._hidden = 'hidden'
        self.assertEqual((n.name, n.job), ('Bob', 'Builder'))
        del n.job
        # The expected string lists neither the deleted attribute nor the
        # underscore-prefixed one.
        self.assertEqual(str(n), "Namespace(name='Bob')")
        self.assertTrue(hasattr(n, 'name'))
        self.assertFalse(hasattr(n, 'job'))
1006
1007#
1008#
1009#
1010
def sqr(x, wait=0.0):
    """Sleep for *wait* seconds, then return *x* multiplied by itself."""
    time.sleep(wait)
    squared = x * x
    return squared
Ask Solem2afcbf22010-11-09 20:55:52 +00001014
class _TestPool(BaseTestCase):
    """Checks for the pool API, run against the pool provided as ``self.pool``."""

    def test_apply(self):
        """``apply`` with positional and with keyword arguments."""
        papply = self.pool.apply
        self.assertEqual(papply(sqr, (5,)), sqr(5))
        self.assertEqual(papply(sqr, (), {'x': 3}), sqr(x=3))

    def test_map(self):
        """``map`` with the default and with an explicit chunk size."""
        pmap = self.pool.map
        self.assertEqual(pmap(sqr, list(range(10))), list(map(sqr, range(10))))
        self.assertEqual(pmap(sqr, list(range(100)), chunksize=20),
                         list(map(sqr, range(100))))

    def test_map_chunksize(self):
        """``map_async`` over an empty list with a chunk size must not stall."""
        try:
            self.pool.map_async(sqr, [], chunksize=1).get(timeout=TIMEOUT1)
        except multiprocessing.TimeoutError:
            self.fail("pool.map_async with chunksize stalled on null list")

    def test_async(self):
        """``apply_async(...).get()`` returns the value after the task's delay."""
        res = self.pool.apply_async(sqr, (7, TIMEOUT1,))
        get = TimingWrapper(res.get)
        self.assertEqual(get(), 49)
        self.assertTimingAlmostEqual(get.elapsed, TIMEOUT1)

    def test_async_timeout(self):
        """``get(timeout=...)`` raises TimeoutError for a slower task."""
        res = self.pool.apply_async(sqr, (6, TIMEOUT2 + 0.2))
        get = TimingWrapper(res.get)
        self.assertRaises(multiprocessing.TimeoutError, get, timeout=TIMEOUT2)
        self.assertTimingAlmostEqual(get.elapsed, TIMEOUT2)

    def test_imap(self):
        """``imap`` yields results in order and then raises StopIteration."""
        it = self.pool.imap(sqr, list(range(10)))
        self.assertEqual(list(it), list(map(sqr, range(10))))

        it = self.pool.imap(sqr, list(range(10)))
        for i in range(10):
            self.assertEqual(next(it), i*i)
        self.assertRaises(StopIteration, it.__next__)

        it = self.pool.imap(sqr, list(range(1000)), chunksize=100)
        for i in range(1000):
            self.assertEqual(next(it), i*i)
        self.assertRaises(StopIteration, it.__next__)

    def test_imap_unordered(self):
        """``imap_unordered`` yields every result (compared after sorting)."""
        it = self.pool.imap_unordered(sqr, list(range(1000)))
        self.assertEqual(sorted(it), list(map(sqr, range(1000))))

        it = self.pool.imap_unordered(sqr, list(range(1000)), chunksize=53)
        self.assertEqual(sorted(it), list(map(sqr, range(1000))))

    def test_make_pool(self):
        """A pool created with three workers holds three worker objects."""
        p = multiprocessing.Pool(3)
        try:
            self.assertEqual(3, len(p._pool))
        finally:
            # Release the workers even when the assertion fails.
            p.close()
            p.join()

    def test_terminate(self):
        """``terminate`` followed by ``join`` returns quickly even with
        many tasks still queued."""
        if self.TYPE == 'manager':
            # On Unix a forked process increfs each shared object to
            # which its parent process held a reference.  If the
            # forked process gets terminated then there is likely to
            # be a reference leak.  So to prevent
            # _TestZZZNumberOfObjects from failing we skip this test
            # when using a manager.
            return

        # The result handle is deliberately not kept: the tasks are only
        # there to keep the pool busy while it is being terminated.
        self.pool.map_async(time.sleep, [0.1] * 10000, chunksize=1)
        self.pool.terminate()
        join = TimingWrapper(self.pool.join)
        join()
        # assertLess reports both values on failure.
        self.assertLess(join.elapsed, 0.2)
Jesse Noller1f0b6582010-01-27 03:36:01 +00001090
def raising():
    """Always fail with ``KeyError("key")``; used as a failing pool task."""
    error = KeyError("key")
    raise error
Jesse Noller1f0b6582010-01-27 03:36:01 +00001093
def unpickleable_result():
    """Return a lambda (evaluating to 42) as the task result.

    Used as a pool task whose result is expected to fail encoding.
    """
    return (lambda: 42)
1096
class _TestPoolWorkerErrors(BaseTestCase):
    """Checks for how a pool reports errors raised by, or while encoding
    the result of, a task."""

    ALLOWED_TYPES = ('processes', )

    def test_async_error_callback(self):
        """The error callback receives the exception raised by the task."""
        p = multiprocessing.Pool(2)
        try:
            scratchpad = [None]

            def errback(exc):
                scratchpad[0] = exc

            res = p.apply_async(raising, error_callback=errback)
            self.assertRaises(KeyError, res.get)
            self.assertTrue(scratchpad[0])
            self.assertIsInstance(scratchpad[0], KeyError)
        finally:
            # Do not leave worker processes behind when an assertion fails.
            p.close()
            p.join()

    def test_unpickleable_result(self):
        """A result that cannot be encoded is reported as MaybeEncodingError."""
        from multiprocessing.pool import MaybeEncodingError
        p = multiprocessing.Pool(2)
        try:
            # Make sure we don't lose pool processes because of encoding
            # errors.
            for iteration in range(20):

                scratchpad = [None]

                def errback(exc):
                    scratchpad[0] = exc

                res = p.apply_async(unpickleable_result,
                                    error_callback=errback)
                self.assertRaises(MaybeEncodingError, res.get)
                wrapped = scratchpad[0]
                self.assertTrue(wrapped)
                self.assertIsInstance(scratchpad[0], MaybeEncodingError)
                self.assertIsNotNone(wrapped.exc)
                self.assertIsNotNone(wrapped.value)
        finally:
            # Do not leave worker processes behind when an assertion fails.
            p.close()
            p.join()
1136
class _TestPoolWorkerLifetime(BaseTestCase):
    """Check that workers are replaced when ``maxtasksperchild`` is set."""

    ALLOWED_TYPES = ('processes', )

    def test_pool_worker_lifetime(self):
        """After 100 tasks on a 3-worker pool with ``maxtasksperchild=10``
        the set of worker pids must have changed."""
        p = multiprocessing.Pool(3, maxtasksperchild=10)
        try:
            self.assertEqual(3, len(p._pool))
            origworkerpids = [w.pid for w in p._pool]
            # Run many tasks so each worker gets replaced (hopefully)
            results = [p.apply_async(sqr, (i, )) for i in range(100)]
            # Fetch the results and verify we got the right answers,
            # also ensuring all the tasks have completed.
            for (j, res) in enumerate(results):
                self.assertEqual(res.get(), sqr(j))
            # Refill the pool
            p._repopulate_pool()
            # Wait until all workers are alive
            countdown = 5
            while countdown and not all(w.is_alive() for w in p._pool):
                countdown -= 1
                time.sleep(DELTA)
            finalworkerpids = [w.pid for w in p._pool]
            # All pids should be assigned.  See issue #7805.
            self.assertNotIn(None, origworkerpids)
            self.assertNotIn(None, finalworkerpids)
            # Finally, check that the worker pids have changed
            self.assertNotEqual(sorted(origworkerpids),
                                sorted(finalworkerpids))
        finally:
            # Do not leave worker processes behind when an assertion fails.
            p.close()
            p.join()
1167
Benjamin Petersone711caf2008-06-11 16:44:04 +00001168#
1169# Test that manager has expected number of shared objects left
1170#
1171
class _TestZZZNumberOfObjects(BaseTestCase):
    # Because test cases are sorted alphabetically, this one will get
    # run after all the other tests for the manager.  It tests that
    # there have been no "reference leaks" for the manager's shared
    # objects.  Note the comment in _TestPool.test_terminate().
    ALLOWED_TYPES = ('manager',)

    def test_number_of_objects(self):
        """The manager must report exactly EXPECTED_NUMBER shared objects."""
        EXPECTED_NUMBER = 1                # the pool object is still alive
        multiprocessing.active_children()  # discard dead process objs
        gc.collect()                       # do garbage collection
        refs = self.manager._number_of_objects()
        debug_info = self.manager._debug_info()
        if refs != EXPECTED_NUMBER:
            # Print the snapshot taken together with ``refs`` once, rather
            # than also printing a second, later call to _debug_info().
            print(debug_info)

        self.assertEqual(refs, EXPECTED_NUMBER)
1190
1191#
1192# Test of creating a customized manager class
1193#
1194
1195from multiprocessing.managers import BaseManager, BaseProxy, RemoteError
1196
class FooBar(object):
    """Small class registered with the customized manager below."""

    def f(self):
        """Return the fixed string ``'f()'``."""
        return 'f()'

    def g(self):
        """Always raise ``ValueError``."""
        raise ValueError()

    def _h(self):
        """Return the fixed string ``'_h()'``."""
        return '_h()'
1204
def baz():
    """Generate the squares of 0 through 9."""
    for number in range(10):
        square = number * number
        yield square
1208
class IteratorProxy(BaseProxy):
    """Proxy that behaves as an iterator over its referent."""

    # The only referent method this proxy forwards.
    _exposed_ = ('__next__',)

    def __iter__(self):
        """Return the proxy itself, so it can be used in ``for`` loops."""
        return self

    def __next__(self):
        """Forward ``__next__`` to the referent and return its result."""
        item = self._callmethod('__next__')
        return item
1215
class MyManager(BaseManager):
    """Customized manager class used by _TestMyManager."""

# 'Foo': FooBar with the default set of exposed methods.
MyManager.register('Foo', callable=FooBar)
# 'Bar': FooBar again, but with an explicit list of exposed methods.
MyManager.register('Bar', callable=FooBar, exposed=('f', '_h'))
# 'baz': the generator function, accessed through IteratorProxy.
MyManager.register('baz', callable=baz, proxytype=IteratorProxy)
1222
1223
class _TestMyManager(BaseTestCase):
    """Checks for the types registered on the customized ``MyManager``."""

    ALLOWED_TYPES = ('manager',)

    def test_mymanager(self):
        """Check which methods each proxy exposes and what they return."""
        manager = MyManager()
        manager.start()
        # Stop the server process even when an assertion below fails.
        self.addCleanup(manager.shutdown)

        foo = manager.Foo()
        bar = manager.Bar()
        baz = manager.baz()

        foo_methods = [name for name in ('f', 'g', '_h') if hasattr(foo, name)]
        bar_methods = [name for name in ('f', 'g', '_h') if hasattr(bar, name)]

        self.assertEqual(foo_methods, ['f', 'g'])
        self.assertEqual(bar_methods, ['f', '_h'])

        self.assertEqual(foo.f(), 'f()')
        self.assertRaises(ValueError, foo.g)
        self.assertEqual(foo._callmethod('f'), 'f()')
        # '_h' is not among foo's exposed methods, so the call is refused.
        self.assertRaises(RemoteError, foo._callmethod, '_h')

        self.assertEqual(bar.f(), 'f()')
        self.assertEqual(bar._h(), '_h()')
        self.assertEqual(bar._callmethod('f'), 'f()')
        self.assertEqual(bar._callmethod('_h'), '_h()')

        self.assertEqual(list(baz), [i*i for i in range(10)])
1255
1256#
1257# Test of connecting to a remote server and using xmlrpclib for serialization
1258#
1259
# Single module-level queue handed out by get_queue().
_queue = pyqueue.Queue()

def get_queue():
    """Return the module-level queue (the same object on every call)."""
    shared_queue = _queue
    return shared_queue
1263
class QueueManager(BaseManager):
    """Manager class used by the server process."""

# Server side: 'get_queue' is backed by the get_queue() function above.
QueueManager.register('get_queue', callable=get_queue)
1267
class QueueManager2(BaseManager):
    """Manager class which specifies the same interface as QueueManager."""

# Registered without a callable: only the typeid is declared here.
QueueManager2.register('get_queue')
1271
1272
SERIALIZER = 'xmlrpclib'

class _TestRemoteManager(BaseTestCase):
    """Check access to a manager's queue from a second connection and from
    a child process, using the ``xmlrpclib`` serializer."""

    ALLOWED_TYPES = ('manager',)

    @classmethod
    def _putter(cls, address, authkey):
        """Child side: connect to the manager at *address* and put one
        tuple on its queue."""
        manager = QueueManager2(
            address=address, authkey=authkey, serializer=SERIALIZER
            )
        manager.connect()
        queue = manager.get_queue()
        queue.put(('hello world', None, True, 2.25))

    def test_remote(self):
        """Receive the child's item through a second manager connection."""
        authkey = os.urandom(32)

        manager = QueueManager(
            address=('localhost', 0), authkey=authkey, serializer=SERIALIZER
            )
        manager.start()
        # Cleanups run in reverse order of registration: the child is
        # joined first, then the server is stopped, also on failure.
        self.addCleanup(manager.shutdown)

        p = self.Process(target=self._putter, args=(manager.address, authkey))
        p.start()
        self.addCleanup(p.join)

        manager2 = QueueManager2(
            address=manager.address, authkey=authkey, serializer=SERIALIZER
            )
        manager2.connect()
        queue = manager2.get_queue()

        # Note that xmlrpclib will deserialize object as a list not a tuple
        self.assertEqual(queue.get(), ['hello world', None, True, 2.25])

        # Because we are using xmlrpclib for serialization instead of
        # pickle this will cause a serialization error.
        self.assertRaises(Exception, queue.put, time.sleep)

        # Make queue finalizer run before the server is stopped
        del queue
1315
class _TestManagerRestart(BaseTestCase):
    """Check that a manager can be started again on an address that was
    used just before."""

    @classmethod
    def _putter(cls, address, authkey):
        """Child side: connect to the manager at *address* and put one
        string on its queue."""
        manager = QueueManager(
            address=address, authkey=authkey, serializer=SERIALIZER)
        manager.connect()
        queue = manager.get_queue()
        queue.put('hello world')

    def test_rapid_restart(self):
        """Start, use and shut down a manager, then start another one on
        the same address."""
        authkey = os.urandom(32)
        manager = QueueManager(
            address=('localhost', 0), authkey=authkey, serializer=SERIALIZER)
        srvr = manager.get_server()
        addr = srvr.address
        # Close the connection.Listener socket which gets opened as a part
        # of manager.get_server(). It's not needed for the test.
        srvr.listener.close()
        manager.start()

        p = self.Process(target=self._putter, args=(manager.address, authkey))
        p.start()
        queue = manager.get_queue()
        self.assertEqual(queue.get(), 'hello world')
        del queue
        # Wait for the child before stopping the server it is connected to,
        # so that no child process outlives the test.
        p.join()
        manager.shutdown()
        manager = QueueManager(
            address=addr, authkey=authkey, serializer=SERIALIZER)
        manager.start()
        manager.shutdown()
Jesse Nollerc5d28a02009-03-30 16:37:36 +00001347
Benjamin Petersone711caf2008-06-11 16:44:04 +00001348#
1349#
1350#
1351
# Empty message which tells the echo child to stop.
SENTINEL = latin('')

class _TestConnection(BaseTestCase):
    """Checks for the connection objects returned by ``Pipe()``."""

    ALLOWED_TYPES = ('processes', 'threads')

    @classmethod
    def _echo(cls, conn):
        """Send back every byte message received on *conn* until SENTINEL
        arrives, then close *conn*."""
        for payload in iter(conn.recv_bytes, SENTINEL):
            conn.send_bytes(payload)
        conn.close()

    def _check_recv_bytes_into(self, conn, arr, longmsg):
        """Exercise ``recv_bytes_into`` on *conn*, whose peer echoes messages."""
        nbytes = len(arr) * arr.itemsize

        # Receive at the start of a zeroed ten-item buffer.
        target = array.array('i', [0]*10)
        expected = list(arr) + [0] * (10 - len(arr))
        self.assertEqual(conn.send_bytes(arr), None)
        self.assertEqual(conn.recv_bytes_into(target), nbytes)
        self.assertEqual(list(target), expected)

        # Receive at an offset of three items.
        target = array.array('i', [0]*10)
        expected = [0] * 3 + list(arr) + [0] * (10 - 3 - len(arr))
        self.assertEqual(conn.send_bytes(arr), None)
        self.assertEqual(conn.recv_bytes_into(target, 3 * target.itemsize),
                         nbytes)
        self.assertEqual(list(target), expected)

        # A message longer than the buffer must raise BufferTooShort
        # carrying the complete message.
        target = bytearray(latin(' ' * 40))
        self.assertEqual(conn.send_bytes(longmsg), None)
        try:
            res = conn.recv_bytes_into(target)
        except multiprocessing.BufferTooShort as e:
            self.assertEqual(e.args, (longmsg,))
        else:
            self.fail('expected BufferTooShort, got %s' % res)

    def test_connection(self):
        """Round-trip objects and byte messages through an echo child."""
        conn, child_conn = self.Pipe()

        echo = self.Process(target=self._echo, args=(child_conn,))
        echo.daemon = True
        echo.start()

        seq = [1, 2.25, None]
        msg = latin('hello world')
        longmsg = msg * 10
        arr = array.array('i', list(range(4)))
        is_processes = self.TYPE == 'processes'

        if is_processes:
            self.assertEqual(type(conn.fileno()), int)

        self.assertEqual(conn.send(seq), None)
        self.assertEqual(conn.recv(), seq)

        self.assertEqual(conn.send_bytes(msg), None)
        self.assertEqual(conn.recv_bytes(), msg)

        if is_processes:
            self._check_recv_bytes_into(conn, arr, longmsg)

        poll = TimingWrapper(conn.poll)

        # Nothing is pending: poll() returns False at once ...
        self.assertEqual(poll(), False)
        self.assertTimingAlmostEqual(poll.elapsed, 0)

        # ... and only after the timeout when one is given.
        self.assertEqual(poll(TIMEOUT1), False)
        self.assertTimingAlmostEqual(poll.elapsed, TIMEOUT1)

        conn.send(None)

        self.assertEqual(poll(TIMEOUT1), True)
        self.assertTimingAlmostEqual(poll.elapsed, 0)

        self.assertEqual(conn.recv(), None)

        really_big_msg = latin('X') * (1024 * 1024 * 16)   # 16Mb
        conn.send_bytes(really_big_msg)
        self.assertEqual(conn.recv_bytes(), really_big_msg)

        conn.send_bytes(SENTINEL)                          # tell child to quit
        child_conn.close()

        if is_processes:
            self.assertEqual(conn.readable, True)
            self.assertEqual(conn.writable, True)
            self.assertRaises(EOFError, conn.recv)
            self.assertRaises(EOFError, conn.recv_bytes)

        echo.join()

    def test_duplex_false(self):
        """A ``duplex=False`` pipe carries data from writer to reader only."""
        reader, writer = self.Pipe(duplex=False)
        self.assertEqual(writer.send(1), None)
        self.assertEqual(reader.recv(), 1)
        if self.TYPE != 'processes':
            return
        self.assertEqual(reader.readable, True)
        self.assertEqual(reader.writable, False)
        self.assertEqual(writer.readable, False)
        self.assertEqual(writer.writable, True)
        self.assertRaises(IOError, reader.send, 2)
        self.assertRaises(IOError, writer.recv)
        self.assertRaises(IOError, writer.poll)

    def test_spawn_close(self):
        # We test that a pipe connection can be closed by parent
        # process immediately after child is spawned.  On Windows this
        # would have sometimes failed on old versions because
        # child_conn would be closed before the child got a chance to
        # duplicate it.
        conn, child_conn = self.Pipe()

        echo = self.Process(target=self._echo, args=(child_conn,))
        echo.start()
        child_conn.close()    # this might complete before child initializes

        greeting = latin('hello')
        conn.send_bytes(greeting)
        self.assertEqual(conn.recv_bytes(), greeting)

        conn.send_bytes(SENTINEL)
        conn.close()
        echo.join()

    def test_sendbytes(self):
        """Check the offset and size arguments of ``send_bytes``."""
        if self.TYPE != 'processes':
            return

        msg = latin('abcdefghijklmnopqrstuvwxyz')
        a, b = self.Pipe()

        a.send_bytes(msg)
        self.assertEqual(b.recv_bytes(), msg)

        a.send_bytes(msg, 5)
        self.assertEqual(b.recv_bytes(), msg[5:])

        a.send_bytes(msg, 7, 8)
        self.assertEqual(b.recv_bytes(), msg[7:7+8])

        # An offset equal to the message length sends an empty message.
        a.send_bytes(msg, 26)
        self.assertEqual(b.recv_bytes(), latin(''))

        a.send_bytes(msg, 26, 0)
        self.assertEqual(b.recv_bytes(), latin(''))

        # Each of these offset/size combinations must be rejected.
        for bad_args in ((27,), (22, 5), (26, 1), (-1,), (4, -1)):
            self.assertRaises(ValueError, a.send_bytes, msg, *bad_args)
1503
class _TestListenerClient(BaseTestCase):
    """Check a ``Listener``/``Client`` exchange for every address family."""

    ALLOWED_TYPES = ('processes', 'threads')

    @classmethod
    def _test(cls, address):
        """Child side: connect to *address*, send ``'hello'`` and close."""
        conn = cls.connection.Client(address)
        conn.send('hello')
        conn.close()

    def test_listener_client(self):
        """For each family, accept one client and receive its greeting."""
        for family in self.connection.families:
            l = self.connection.Listener(family=family)
            try:
                p = self.Process(target=self._test, args=(l.address,))
                p.daemon = True
                p.start()
                conn = l.accept()
                try:
                    self.assertEqual(conn.recv(), 'hello')
                finally:
                    # The accepted connection was previously never closed.
                    conn.close()
                p.join()
            finally:
                # Release the listener even when the assertion fails.
                l.close()
Benjamin Petersone711caf2008-06-11 16:44:04 +00001524#
1525# Test of sending connection and socket objects between processes
1526#
Benjamin Petersonb29cbbc2008-06-16 20:57:14 +00001527"""
Benjamin Petersone711caf2008-06-11 16:44:04 +00001528class _TestPicklingConnections(BaseTestCase):
1529
1530 ALLOWED_TYPES = ('processes',)
1531
1532 def _listener(self, conn, families):
1533 for fam in families:
1534 l = self.connection.Listener(family=fam)
1535 conn.send(l.address)
1536 new_conn = l.accept()
1537 conn.send(new_conn)
1538
1539 if self.TYPE == 'processes':
1540 l = socket.socket()
1541 l.bind(('localhost', 0))
1542 conn.send(l.getsockname())
1543 l.listen(1)
1544 new_conn, addr = l.accept()
1545 conn.send(new_conn)
1546
1547 conn.recv()
1548
1549 def _remote(self, conn):
1550 for (address, msg) in iter(conn.recv, None):
1551 client = self.connection.Client(address)
1552 client.send(msg.upper())
1553 client.close()
1554
1555 if self.TYPE == 'processes':
1556 address, msg = conn.recv()
1557 client = socket.socket()
1558 client.connect(address)
1559 client.sendall(msg.upper())
1560 client.close()
1561
1562 conn.close()
1563
1564 def test_pickling(self):
1565 try:
1566 multiprocessing.allow_connection_pickling()
1567 except ImportError:
1568 return
1569
1570 families = self.connection.families
1571
1572 lconn, lconn0 = self.Pipe()
1573 lp = self.Process(target=self._listener, args=(lconn0, families))
1574 lp.start()
1575 lconn0.close()
1576
1577 rconn, rconn0 = self.Pipe()
1578 rp = self.Process(target=self._remote, args=(rconn0,))
1579 rp.start()
1580 rconn0.close()
1581
1582 for fam in families:
1583 msg = ('This connection uses family %s' % fam).encode('ascii')
1584 address = lconn.recv()
1585 rconn.send((address, msg))
1586 new_conn = lconn.recv()
1587 self.assertEqual(new_conn.recv(), msg.upper())
1588
1589 rconn.send(None)
1590
1591 if self.TYPE == 'processes':
1592 msg = latin('This connection uses a normal socket')
1593 address = lconn.recv()
1594 rconn.send((address, msg))
1595 if hasattr(socket, 'fromfd'):
1596 new_conn = lconn.recv()
1597 self.assertEqual(new_conn.recv(100), msg.upper())
1598 else:
1599 # XXX On Windows with Py2.6 need to backport fromfd()
1600 discard = lconn.recv_bytes()
1601
1602 lconn.send(None)
1603
1604 rconn.close()
1605 lconn.close()
1606
1607 lp.join()
1608 rp.join()
Benjamin Petersonb29cbbc2008-06-16 20:57:14 +00001609"""
Benjamin Petersone711caf2008-06-11 16:44:04 +00001610#
1611#
1612#
1613
class _TestHeap(BaseTestCase):
    """Consistency check for the allocator behind ``BufferWrapper``."""

    ALLOWED_TYPES = ('processes',)

    def test_heap(self):
        """Allocate and free many blocks, then check that the free and the
        allocated blocks tile every arena without gaps or overlaps."""
        iterations = 5000
        maxblocks = 50
        blocks = []

        # create and destroy lots of blocks of different sizes
        for _ in range(iterations):
            size = int(random.lognormvariate(0, 1) * 1000)
            blocks.append(multiprocessing.heap.BufferWrapper(size))
            if len(blocks) > maxblocks:
                del blocks[random.randrange(maxblocks)]

        # get the heap object
        heap = multiprocessing.heap.BufferWrapper._heap

        # verify the state of the heap
        segments = []
        for free_blocks in list(heap._len_to_seq.values()):
            for arena, start, stop in free_blocks:
                segments.append((heap._arenas.index(arena), start, stop,
                                 stop-start, 'free'))
        for arena, start, stop in heap._allocated_blocks:
            segments.append((heap._arenas.index(arena), start, stop,
                             stop-start, 'occupied'))

        segments.sort()

        # Each block must either end exactly where the next one starts,
        # or the next one must be the first block of another arena.
        for current, following in zip(segments, segments[1:]):
            (arena, start, stop) = current[:3]
            (narena, nstart, _) = following[:3]
            self.assertTrue((arena != narena and nstart == 0) or
                            (stop == nstart))
1654
1655#
1656#
1657#
1658
class _Foo(Structure):
    """Two-field structure (``x``: c_int, ``y``: c_double) for the shared
    ctypes tests."""

    _fields_ = [('x', c_int), ('y', c_double)]
1664
@unittest.skipUnless(HAS_SHAREDCTYPES,
                     "requires multiprocessing.sharedctypes")
class _TestSharedCTypes(BaseTestCase):
    """Checks for shared ctypes values, structures and arrays."""

    ALLOWED_TYPES = ('processes',)

    @classmethod
    def _double(cls, x, y, foo, arr, string):
        """Child side: double every value held by the given shared objects."""
        for scalar in (x, y):
            scalar.value *= 2
        foo.x *= 2
        foo.y *= 2
        string.value *= 2
        for index in range(len(arr)):
            arr[index] *= 2

    def test_sharedctypes(self, lock=False):
        """Let a child double the shared objects, then check the results
        from the parent."""
        x = Value('i', 7, lock=lock)
        y = Value(c_double, 1.0/3.0, lock=lock)
        foo = Value(_Foo, 3, 2, lock=lock)
        arr = self.Array('d', list(range(10)), lock=lock)
        string = self.Array('c', 20, lock=lock)
        string.value = latin('hello')

        child = self.Process(target=self._double,
                             args=(x, y, foo, arr, string))
        child.start()
        child.join()

        self.assertEqual(x.value, 14)
        self.assertAlmostEqual(y.value, 2.0/3.0)
        self.assertEqual(foo.x, 6)
        self.assertAlmostEqual(foo.y, 4.0)
        for index in range(10):
            self.assertAlmostEqual(arr[index], index*2)
        self.assertEqual(string.value, latin('hellohello'))

    def test_synchronize(self):
        """Run ``test_sharedctypes`` with ``lock=True``."""
        self.test_sharedctypes(lock=True)

    def test_copy(self):
        """A ``copy`` must keep its values when the original is changed."""
        original = _Foo(2, 5.0)
        duplicate = copy(original)
        original.x = 0
        original.y = 0
        self.assertEqual(duplicate.x, 2)
        self.assertAlmostEqual(duplicate.y, 5.0)
1711
1712#
1713#
1714#
1715
class _TestFinalize(BaseTestCase):
    """Check when, and in which order, ``util.Finalize`` callbacks run."""

    ALLOWED_TYPES = ('processes',)

    @classmethod
    def _test_finalize(cls, conn):
        """Child side: register finalizers which report a tag over *conn*,
        run multiprocessing's exit handling and leave the process."""
        class Foo(object):
            pass

        a = Foo()
        util.Finalize(a, conn.send, args=('a',))
        del a           # triggers callback for a

        b = Foo()
        close_b = util.Finalize(b, conn.send, args=('b',))
        close_b()       # triggers callback for b
        close_b()       # does nothing because callback has already been called
        del b           # does nothing because callback has already been called

        # No exit priority; ``c`` stays referenced until the process exits.
        c = Foo()
        util.Finalize(c, conn.send, args=('c',))

        d10 = Foo()
        util.Finalize(d10, conn.send, args=('d10',), exitpriority=1)

        # Three objects sharing exit priority 0, created in this order.
        # The list keeps all of them referenced until the process exits.
        same_priority = []
        for tag in ('d01', 'd02', 'd03'):
            obj = Foo()
            util.Finalize(obj, conn.send, args=(tag,), exitpriority=0)
            same_priority.append(obj)

        util.Finalize(None, conn.send, args=('e',), exitpriority=-10)

        util.Finalize(None, conn.send, args=('STOP',), exitpriority=-100)

        # call multiprocessing's cleanup function then exit process without
        # garbage collecting locals
        util._exit_function()
        conn.close()
        os._exit(0)

    def test_finalize(self):
        """Collect the tags sent by the child up to ``'STOP'`` and check
        their order."""
        conn, child_conn = self.Pipe()

        child = self.Process(target=self._test_finalize, args=(child_conn,))
        child.start()
        child.join()

        received = list(iter(conn.recv, 'STOP'))
        self.assertEqual(received,
                         ['a', 'b', 'd10', 'd03', 'd02', 'd01', 'e'])
1767
1768#
1769# Test that from ... import * works for each module
1770#
1771
class _TestImportStar(BaseTestCase):
    """Check that every name in a module's ``__all__`` really exists."""

    ALLOWED_TYPES = ('processes',)

    def test_import(self):
        """Import each multiprocessing module and verify its ``__all__``."""
        module_names = [
            'multiprocessing', 'multiprocessing.connection',
            'multiprocessing.heap', 'multiprocessing.managers',
            'multiprocessing.pool', 'multiprocessing.process',
            'multiprocessing.reduction',
            'multiprocessing.synchronize', 'multiprocessing.util'
            ]

        if c_int is not None:
            # This module requires _ctypes
            module_names.append('multiprocessing.sharedctypes')

        for module_name in module_names:
            __import__(module_name)
            mod = sys.modules[module_name]
            exported = getattr(mod, '__all__', ())

            for attr in exported:
                message = '%r does not have attribute %r' % (mod, attr)
                self.assertTrue(hasattr(mod, attr), message)
1798
1799#
1800# Quick test that logging works -- does not test logging output
1801#
1802
class _TestLogging(BaseTestCase):
    """Quick checks that multiprocessing's logger can be used."""

    ALLOWED_TYPES = ('processes',)

    def test_enable_logging(self):
        """The logger exists and accepts messages below its level."""
        logger = multiprocessing.get_logger()
        logger.setLevel(util.SUBWARNING)
        try:
            self.assertIsNotNone(logger)
            logger.debug('this will not be printed')
            logger.info('nor will this')
        finally:
            # Restore the level even if something above fails.
            logger.setLevel(LOG_LEVEL)

    @classmethod
    def _test_level(cls, conn):
        """Child side: send the logger's effective level over *conn*."""
        logger = multiprocessing.get_logger()
        conn.send(logger.getEffectiveLevel())

    def _child_effective_level(self, reader, writer):
        """Run ``_test_level`` in a child and return the level it reports.

        The child is joined before returning so that it does not outlive
        the test.
        """
        child = self.Process(target=self._test_level, args=(writer,))
        child.start()
        try:
            return reader.recv()
        finally:
            child.join()

    def test_level(self):
        """A child sees the level set on the logger, or on the root logger
        when the logger's own level is NOTSET."""
        LEVEL1 = 32
        LEVEL2 = 37

        logger = multiprocessing.get_logger()
        root_logger = logging.getLogger()
        root_level = root_logger.level

        reader, writer = multiprocessing.Pipe(duplex=False)
        try:
            logger.setLevel(LEVEL1)
            self.assertEqual(LEVEL1,
                             self._child_effective_level(reader, writer))

            logger.setLevel(logging.NOTSET)
            root_logger.setLevel(LEVEL2)
            self.assertEqual(LEVEL2,
                             self._child_effective_level(reader, writer))
        finally:
            # Undo the level changes and release the pipe even when an
            # assertion fails, so later tests see the original settings.
            root_logger.setLevel(root_level)
            logger.setLevel(level=LOG_LEVEL)
            reader.close()
            writer.close()
1841
Jesse Nollerb9a49b72009-11-21 18:09:38 +00001842
Jesse Nollerf4ae35f2009-11-24 14:22:24 +00001843# class _TestLoggingProcessName(BaseTestCase):
1844#
1845# def handle(self, record):
1846# assert record.processName == multiprocessing.current_process().name
1847# self.__handled = True
1848#
1849# def test_logging(self):
1850# handler = logging.Handler()
1851# handler.handle = self.handle
1852# self.__handled = False
1853# # Bypass getLogger() and side-effects
1854# logger = logging.getLoggerClass()(
1855# 'multiprocessing.test.TestLoggingProcessName')
1856# logger.addHandler(handler)
1857# logger.propagate = False
1858#
1859# logger.warn('foo')
1860# assert self.__handled
Jesse Nollerb9a49b72009-11-21 18:09:38 +00001861
Benjamin Petersone711caf2008-06-11 16:44:04 +00001862#
Jesse Noller6214edd2009-01-19 16:23:53 +00001863# Test to verify handle verification, see issue 3321
1864#
1865
class TestInvalidHandle(unittest.TestCase):
    """Handle verification of ``_multiprocessing.Connection`` (issue 3321)."""

    @unittest.skipIf(WIN32, "skipped on Windows")
    def test_invalid_handles(self):
        """Polling a bogus handle, and creating a connection from -1,
        must both raise IOError."""
        bogus_handle = 44977608
        conn = _multiprocessing.Connection(bogus_handle)
        with self.assertRaises(IOError):
            conn.poll()
        with self.assertRaises(IOError):
            _multiprocessing.Connection(-1)
Florent Xiclunafd1b0932010-03-28 00:25:02 +00001873
Jesse Noller6214edd2009-01-19 16:23:53 +00001874#
Benjamin Petersone711caf2008-06-11 16:44:04 +00001875# Functions used to create test cases from the base ones in this module
1876#
1877
def get_attributes(Source, names):
    """Collect the named attributes of *Source* into a new dict.

    Plain Python functions are wrapped in ``staticmethod`` so that, once the
    returned dict is merged into a class namespace, they are not turned into
    bound methods.  Every other object is stored unchanged.

    Source -- any object supporting ``getattr`` (module, class or instance).
    names  -- iterable of attribute names to copy.

    Returns a dict mapping each name to the (possibly wrapped) attribute.
    Raises AttributeError if *Source* lacks one of the names.
    """
    # The type of any ordinary ``def`` function; taken from this function
    # itself so no extra import is needed.
    function_type = type(get_attributes)
    attributes = {}
    for name in names:
        obj = getattr(Source, name)
        if isinstance(obj, function_type):
            obj = staticmethod(obj)
        attributes[name] = obj
    return attributes
1886
def create_test_cases(Mixin, type):
    """Build concrete TestCase classes from the ``_Test*`` bases of this module.

    Every module global whose name starts with ``_Test`` and whose
    ``ALLOWED_TYPES`` contains *type* is combined with ``unittest.TestCase``
    and *Mixin* into a new class named ``With<Type><name without the leading
    underscore>``.

    Mixin -- class supplying the backend-specific attributes.
    type  -- backend name, e.g. 'processes'; also looked up in ALLOWED_TYPES.

    Returns a dict mapping the generated class names to the classes.
    """
    type_label = type.capitalize()
    generated = {}

    # Iterate over a snapshot of the module namespace.
    for name, base in list(globals().items()):
        if not name.startswith('_Test'):
            continue
        if type not in base.ALLOWED_TYPES:
            continue

        newname = 'With' + type_label + name[1:]

        class Temp(base, unittest.TestCase, Mixin):
            pass

        # Give the generated class its public identity.
        Temp.__name__ = newname
        Temp.__module__ = Mixin.__module__
        generated[newname] = Temp

    return generated
1903
1904#
1905# Create test cases
1906#
1907
class ProcessesMixin(object):
    # Mixin for the 'processes' flavour of the generated test cases: the
    # names listed below are copied from the multiprocessing package into
    # this class namespace.
    TYPE = 'processes'
    Process = multiprocessing.Process
    # Inside a class body locals() is the class namespace, so update() adds
    # class attributes.  get_attributes() wraps plain functions in
    # staticmethod so they are not bound to the test instance.
    locals().update(get_attributes(multiprocessing, (
        'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
        'Condition', 'Event', 'Value', 'Array', 'RawValue',
        'RawArray', 'current_process', 'active_children', 'Pipe',
        'connection', 'JoinableQueue'
        )))

# Generate the WithProcesses* test case classes and publish them as module
# globals.
testcases_processes = create_test_cases(ProcessesMixin, type='processes')
globals().update(testcases_processes)
1920
1921
class ManagerMixin(object):
    # Mixin for the 'manager' flavour of the generated test cases: the names
    # listed below are looked up on a SyncManager instance and copied into
    # this class namespace.
    TYPE = 'manager'
    Process = multiprocessing.Process
    # Allocated without running __init__; test_main() calls __init__() and
    # start() on this object before the tests run.
    manager = object.__new__(multiprocessing.managers.SyncManager)
    # Inside a class body locals() is the class namespace, so update() adds
    # class attributes.
    locals().update(get_attributes(manager, (
        'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
        'Condition', 'Event', 'Value', 'Array', 'list', 'dict',
        'Namespace', 'JoinableQueue'
        )))

# Generate the WithManager* test case classes and publish them as module
# globals.
testcases_manager = create_test_cases(ManagerMixin, type='manager')
globals().update(testcases_manager)
1934
1935
class ThreadsMixin(object):
    # Mixin for the 'threads' flavour of the generated test cases: the names
    # listed below are copied from multiprocessing.dummy into this class
    # namespace.
    TYPE = 'threads'
    Process = multiprocessing.dummy.Process
    # Inside a class body locals() is the class namespace, so update() adds
    # class attributes.  get_attributes() wraps plain functions in
    # staticmethod so they are not bound to the test instance.
    locals().update(get_attributes(multiprocessing.dummy, (
        'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
        'Condition', 'Event', 'Value', 'Array', 'current_process',
        'active_children', 'Pipe', 'connection', 'dict', 'list',
        'Namespace', 'JoinableQueue'
        )))

# Generate the WithThreads* test case classes and publish them as module
# globals.
testcases_threads = create_test_cases(ThreadsMixin, type='threads')
globals().update(testcases_threads)
1948
class OtherTest(unittest.TestCase):
    """Authentication-failure tests for the connection challenge helpers."""
    # TODO: add more tests for deliver/answer challenge.

    def test_deliver_challenge_auth_failure(self):
        # The fake peer answers the challenge with garbage, so delivering
        # the challenge has to end in an AuthenticationError.
        class _FakeConnection(object):
            def recv_bytes(self, size):
                return b'something bogus'

            def send_bytes(self, data):
                pass

        fake = _FakeConnection()
        with self.assertRaises(multiprocessing.AuthenticationError):
            multiprocessing.connection.deliver_challenge(fake, b'abc')

    def test_answer_challenge_auth_failure(self):
        # The fake peer first sends the challenge marker, then a bogus reply
        # to our answer, then empty data for any further read.
        class _FakeConnection(object):
            def __init__(self):
                self.count = 0

            def recv_bytes(self, size):
                self.count += 1
                if self.count == 1:
                    return multiprocessing.connection.CHALLENGE
                if self.count == 2:
                    return b'something bogus'
                return b''

            def send_bytes(self, data):
                pass

        fake = _FakeConnection()
        with self.assertRaises(multiprocessing.AuthenticationError):
            multiprocessing.connection.answer_challenge(fake, b'abc')
1977
Benjamin Petersonf47ed4a2009-04-11 20:45:40 +00001978#
1979# Test Manager.start()/Pool.__init__() initializer feature - see issue 5585
1980#
1981
def initializer(ns):
    """Increment ``ns.test``.

    TestInitializers passes this function as the initializer to
    SyncManager.start() and to Pool() and then checks ``ns.test`` to see
    that it ran.
    """
    ns.test += 1
1984
class TestInitializers(unittest.TestCase):
    """Tests for the initializer argument of Manager.start() and Pool().

    See issue 5585.  Each test hands ``initializer`` a shared Namespace and
    checks that its ``test`` counter was incremented exactly once.
    """

    def setUp(self):
        # The Namespace lives in a separate manager so the initializer can
        # report back to the test through it.
        self.mgr = multiprocessing.Manager()
        self.ns = self.mgr.Namespace()
        self.ns.test = 0

    def tearDown(self):
        self.mgr.shutdown()

    def test_manager_initializer(self):
        m = multiprocessing.managers.SyncManager()
        # A non-callable initializer is expected to be rejected.
        self.assertRaises(TypeError, m.start, 1)
        m.start(initializer, (self.ns,))
        try:
            self.assertEqual(self.ns.test, 1)
        finally:
            # Shut the manager down even when the assertion fails, so the
            # started manager is not left running.
            m.shutdown()

    def test_pool_initializer(self):
        # A non-callable initializer is expected to be rejected.
        self.assertRaises(TypeError, multiprocessing.Pool, initializer=1)
        p = multiprocessing.Pool(1, initializer, (self.ns,))
        p.close()
        p.join()
        self.assertEqual(self.ns.test, 1)
2007
Alexandre Vassalottic57a84f2009-07-17 12:07:01 +00002008#
2009# Issue 5155, 5313, 5331: Test process in processes
2010# Verifies os.close(sys.stdin.fileno) vs. sys.stdin.close() behavior
2011#
2012
def _ThisSubProcess(q):
    """Attempt one non-blocking ``get`` on *q* and ignore an empty queue.

    The retrieved item, if any, is discarded.
    """
    try:
        q.get(block=False)
    except pyqueue.Empty:
        pass
2018
def _TestProcess(q):
    """Run _ThisSubProcess in a child process and wait for it to finish.

    *q* is accepted but not used; the child is given a freshly created
    queue instead.
    """
    child_queue = multiprocessing.Queue()
    child = multiprocessing.Process(
        target=_ThisSubProcess,
        args=(child_queue,),
    )
    child.start()
    child.join()
2024
def _afunc(x):
    """Return *x* multiplied by itself; used as the mapped function in pool_in_process()."""
    return x*x
2027
def pool_in_process():
    """Map _afunc over a few integers with a four-worker pool.

    The result of the map is discarded; the function only exercises pool
    creation and use.  The pool is closed and joined before returning, even
    if the map raises.
    """
    pool = multiprocessing.Pool(processes=4)
    try:
        pool.map(_afunc, [1, 2, 3, 4, 5, 6, 7])
    finally:
        pool.close()
        pool.join()
2031
class _file_like(object):
    """Write buffer in front of *delegate* that keeps a separate cache per pid.

    ``write`` only appends to the cache; ``flush`` joins the cached pieces,
    passes them to ``delegate.write`` in one call and empties the cache.
    """

    def __init__(self, delegate):
        self._delegate = delegate
        self._pid = None

    @property
    def cache(self):
        # Start a fresh cache whenever the current pid differs from the one
        # the cache was created under.
        # There are no race conditions since fork keeps only the running thread
        current_pid = os.getpid()
        if self._pid != current_pid:
            self._pid, self._cache = current_pid, []
        return self._cache

    def write(self, data):
        self.cache.append(data)

    def flush(self):
        pending = ''.join(self.cache)
        self._delegate.write(pending)
        self._cache = []
2052
class TestStdinBadfiledescriptor(unittest.TestCase):
    """Process-inside-process tests for issues 5155, 5313 and 5331."""

    def test_queue_in_process(self):
        # The child process itself starts another process that uses a queue.
        queue = multiprocessing.Queue()
        proc = multiprocessing.Process(target=_TestProcess, args=(queue,))
        proc.start()
        proc.join()

    def test_pool_in_process(self):
        # The child process creates and uses a Pool of its own.
        p = multiprocessing.Process(target=pool_in_process)
        p.start()
        p.join()

    def test_flushing(self):
        sio = io.StringIO()
        flike = _file_like(sio)
        flike.write('foo')
        # NOTE(review): this Process is constructed but never started, so
        # only the flush() in the current process is exercised -- confirm
        # whether a start()/join() was intended.
        proc = multiprocessing.Process(target=lambda: flike.flush())
        flike.flush()
        # assertEqual instead of a bare assert: it is not stripped under -O
        # and reports both values on failure.
        self.assertEqual(sio.getvalue(), 'foo')
2073
# Test cases defined directly in this module (not generated by
# create_test_cases); test_main() appends them after the generated ones.
testcases_other = [OtherTest, TestInvalidHandle, TestInitializers,
                   TestStdinBadfiledescriptor]
Neal Norwitz5d6415e2008-08-25 01:53:32 +00002076
Benjamin Petersone711caf2008-06-11 16:44:04 +00002077#
2078#
2079#
2080
def test_main(run=None):
    """Set up the shared pools and manager, run every test case, clean up.

    run -- callable taking a ``unittest.TestSuite``; when omitted,
           ``test.support.run_unittest`` is used.

    Raises unittest.SkipTest on Linux when an RLock cannot be created
    (issue 3111).  The pools and the manager are torn down even when *run*
    raises.
    """
    if sys.platform.startswith("linux"):
        try:
            # Only probes whether creation works; the lock is not kept.
            multiprocessing.RLock()
        except OSError:
            raise unittest.SkipTest("OSError raises on RLock creation, see issue 3111!")

    if run is None:
        from test.support import run_unittest as run

    util.get_temp_dir()     # creates temp directory for use by all processes

    multiprocessing.get_logger().setLevel(LOG_LEVEL)

    # Shared resources used by the generated test cases through the mixins.
    ProcessesMixin.pool = multiprocessing.Pool(4)
    ThreadsMixin.pool = multiprocessing.dummy.Pool(4)
    # The manager object was allocated without __init__ in ManagerMixin.
    ManagerMixin.manager.__init__()
    ManagerMixin.manager.start()
    ManagerMixin.pool = ManagerMixin.manager.Pool(4)

    try:
        testcases = (
            sorted(testcases_processes.values(), key=lambda tc:tc.__name__) +
            sorted(testcases_threads.values(), key=lambda tc:tc.__name__) +
            sorted(testcases_manager.values(), key=lambda tc:tc.__name__) +
            testcases_other
            )

        loadTestsFromTestCase = unittest.defaultTestLoader.loadTestsFromTestCase
        suite = unittest.TestSuite(loadTestsFromTestCase(tc) for tc in testcases)
        run(suite)
    finally:
        # Runs whether or not the test run raised, so no pool or manager
        # outlives the test run.
        ThreadsMixin.pool.terminate()
        ProcessesMixin.pool.terminate()
        ManagerMixin.pool.terminate()
        ManagerMixin.manager.shutdown()

        del ProcessesMixin.pool, ThreadsMixin.pool, ManagerMixin.pool
Benjamin Petersone711caf2008-06-11 16:44:04 +00002118
def main():
    """Run the whole suite through a verbose TextTestRunner."""
    runner = unittest.TextTestRunner(verbosity=2)
    test_main(runner.run)
2121
# Run the suite only when this file is executed as a script.
if __name__ == '__main__':
    main()