blob: 465a83102e31f5aaed8997678b967d115756e856 [file] [log] [blame]
Benjamin Peterson90f5ba52010-03-11 22:53:45 +00001#!/usr/bin/env python3
Georg Brandl86b2fb92008-07-16 03:43:04 +00002
Benjamin Petersone711caf2008-06-11 16:44:04 +00003#
4# Unit tests for the multiprocessing package
5#
6
7import unittest
Benjamin Petersone711caf2008-06-11 16:44:04 +00008import queue as pyqueue
9import time
Alexandre Vassalottic57a84f2009-07-17 12:07:01 +000010import io
Benjamin Petersone711caf2008-06-11 16:44:04 +000011import sys
12import os
13import gc
14import signal
15import array
Benjamin Petersone711caf2008-06-11 16:44:04 +000016import socket
17import random
18import logging
R. David Murraya21e4ca2009-03-31 23:16:50 +000019import test.support
Benjamin Petersone711caf2008-06-11 16:44:04 +000020
Benjamin Petersone5384b02008-10-04 22:00:42 +000021
R. David Murraya21e4ca2009-03-31 23:16:50 +000022# Skip tests if _multiprocessing wasn't built.
23_multiprocessing = test.support.import_module('_multiprocessing')
24# Skip tests if sem_open implementation is broken.
25test.support.import_module('multiprocessing.synchronize')
# import threading after _multiprocessing to raise a more relevant error
# message: "No module named _multiprocessing". _multiprocessing is not compiled
# without thread support.
29import threading
Benjamin Petersone5384b02008-10-04 22:00:42 +000030
Benjamin Petersone711caf2008-06-11 16:44:04 +000031import multiprocessing.dummy
32import multiprocessing.connection
33import multiprocessing.managers
34import multiprocessing.heap
Benjamin Petersone711caf2008-06-11 16:44:04 +000035import multiprocessing.pool
Benjamin Petersone711caf2008-06-11 16:44:04 +000036
37from multiprocessing import util
38
Brian Curtinafa88b52010-10-07 01:12:19 +000039try:
40 from multiprocessing.sharedctypes import Value, copy
41 HAS_SHAREDCTYPES = True
42except ImportError:
43 HAS_SHAREDCTYPES = False
44
Benjamin Petersone711caf2008-06-11 16:44:04 +000045#
46#
47#
48
def latin(s):
    """Return the text *s* encoded to bytes with the 'latin' codec."""
    encoded = s.encode('latin')
    return encoded
Benjamin Petersone711caf2008-06-11 16:44:04 +000051
Benjamin Petersone711caf2008-06-11 16:44:04 +000052#
53# Constants
54#
55
LOG_LEVEL = util.SUBWARNING
#LOG_LEVEL = logging.DEBUG

# Short pause used by the tests to give other processes/threads time to act.
DELTA = 0.1

# Making CHECK_TIMINGS true makes the tests take a lot longer and can
# sometimes cause some non-serious failures because some calls block a bit
# longer than expected.
CHECK_TIMINGS = False
TIMEOUT1, TIMEOUT2, TIMEOUT3 = (
    (0.82, 0.35, 1.4) if CHECK_TIMINGS else (0.1, 0.1, 0.1)
)

# True unless the extension module reports a broken semaphore get-value.
HAVE_GETVALUE = not getattr(
    _multiprocessing, 'HAVE_BROKEN_SEM_GETVALUE', False
)

WIN32 = sys.platform == "win32"
73
Benjamin Petersone711caf2008-06-11 16:44:04 +000074#
Florent Xiclunafd1b0932010-03-28 00:25:02 +000075# Some tests require ctypes
76#
77
78try:
Florent Xiclunaaa171062010-08-14 15:56:42 +000079 from ctypes import Structure, c_int, c_double
Florent Xiclunafd1b0932010-03-28 00:25:02 +000080except ImportError:
81 Structure = object
82 c_int = c_double = None
83
84#
Benjamin Petersone711caf2008-06-11 16:44:04 +000085# Creates a wrapper for a function which records the time it takes to finish
86#
87
class TimingWrapper:
    """Callable proxy that records how long each call to *func* takes.

    After every call (whether it returns or raises) ``elapsed`` holds the
    wall-clock duration of that call in seconds; before the first call it
    is ``None``.
    """

    def __init__(self, func):
        self.func = func
        self.elapsed = None

    def __call__(self, *args, **kwds):
        started = time.time()
        try:
            result = self.func(*args, **kwds)
        finally:
            # Runs on the exception path too, so failed calls are timed.
            self.elapsed = time.time() - started
        return result
100
101#
102# Base class for test cases
103#
104
class BaseTestCase(object):
    """Mixin carrying assertion helpers shared by the test case classes.

    It calls ``assertEqual``/``assertAlmostEqual`` on ``self``, so it has to
    be combined with a class that provides them.
    """

    # Kinds of backend a test class may be instantiated for.
    ALLOWED_TYPES = ('processes', 'manager', 'threads')

    def assertTimingAlmostEqual(self, a, b):
        # Timings are only compared when CHECK_TIMINGS is switched on.
        if not CHECK_TIMINGS:
            return
        self.assertAlmostEqual(a, b, 1)

    def assertReturnsIfImplemented(self, value, func, *args):
        # Compare func(*args) with value, but tolerate NotImplementedError.
        try:
            outcome = func(*args)
        except NotImplementedError:
            return None
        return self.assertEqual(value, outcome)

    # For the sanity of Windows users, rather than crashing or freezing in
    # multiple ways.
    def __reduce__(self, *args):
        raise NotImplementedError("shouldn't try to pickle a test case")

    __reduce_ex__ = __reduce__
127
Benjamin Petersone711caf2008-06-11 16:44:04 +0000128#
129# Return the value of a semaphore
130#
131
def get_value(self):
    """Return the current counter of the semaphore-like object *self*.

    The object's own ``get_value()`` method is tried first, then the
    ``_Semaphore__value`` and ``_value`` attributes.  NotImplementedError
    is raised when none of them is available.
    """
    try:
        return self.get_value()
    except AttributeError:
        pass
    for attr in ('_Semaphore__value', '_value'):
        try:
            return getattr(self, attr)
        except AttributeError:
            continue
    raise NotImplementedError
143
144#
145# Testcases
146#
147
class _TestProcess(BaseTestCase):
    """Tests of the basic Process API: attributes, start/join, terminate."""

    ALLOWED_TYPES = ('processes', 'threads')

    def test_current(self):
        # Report a skip instead of passing silently when the checks below
        # do not apply.
        if self.TYPE == 'threads':
            self.skipTest('test not appropriate for {}'.format(self.TYPE))

        current = self.current_process()
        authkey = current.authkey

        self.assertTrue(current.is_alive())
        self.assertTrue(not current.daemon)
        self.assertIsInstance(authkey, bytes)
        self.assertTrue(len(authkey) > 0)
        self.assertEqual(current.ident, os.getpid())
        self.assertEqual(current.exitcode, None)

    @classmethod
    def _test(cls, q, *args, **kwds):
        # Child side of test_process: send back what the child observed.
        current = cls.current_process()
        q.put(args)
        q.put(kwds)
        q.put(current.name)
        if cls.TYPE != 'threads':
            q.put(bytes(current.authkey))
            q.put(current.pid)

    def test_process(self):
        q = self.Queue(1)
        args = (q, 1, 2)
        kwargs = {'hello':23, 'bye':2.54}
        name = 'SomeProcess'
        p = self.Process(
            target=self._test, args=args, kwargs=kwargs, name=name
            )
        p.daemon = True
        current = self.current_process()

        # State before the process is started.
        if self.TYPE != 'threads':
            self.assertEqual(p.authkey, current.authkey)
        self.assertEqual(p.is_alive(), False)
        self.assertEqual(p.daemon, True)
        self.assertNotIn(p, self.active_children())
        self.assertTrue(type(self.active_children()) is list)
        self.assertEqual(p.exitcode, None)

        p.start()

        # State while the process is running.
        self.assertEqual(p.exitcode, None)
        self.assertEqual(p.is_alive(), True)
        self.assertIn(p, self.active_children())

        # The child echoes its arguments (minus the queue) and identity.
        self.assertEqual(q.get(), args[1:])
        self.assertEqual(q.get(), kwargs)
        self.assertEqual(q.get(), p.name)
        if self.TYPE != 'threads':
            self.assertEqual(q.get(), current.authkey)
            self.assertEqual(q.get(), p.pid)

        p.join()

        # State after the process has finished.
        self.assertEqual(p.exitcode, 0)
        self.assertEqual(p.is_alive(), False)
        self.assertNotIn(p, self.active_children())

    @classmethod
    def _test_terminate(cls):
        # Sleep long enough that the parent has to terminate us.
        time.sleep(1000)

    def test_terminate(self):
        if self.TYPE == 'threads':
            self.skipTest('test not appropriate for {}'.format(self.TYPE))

        p = self.Process(target=self._test_terminate)
        p.daemon = True
        p.start()

        self.assertEqual(p.is_alive(), True)
        self.assertIn(p, self.active_children())
        self.assertEqual(p.exitcode, None)

        p.terminate()

        join = TimingWrapper(p.join)
        self.assertEqual(join(), None)
        self.assertTimingAlmostEqual(join.elapsed, 0.0)

        self.assertEqual(p.is_alive(), False)
        self.assertNotIn(p, self.active_children())

        p.join()

        # XXX sometimes get p.exitcode == 0 on Windows ...
        #self.assertEqual(p.exitcode, -signal.SIGTERM)

    def test_cpu_count(self):
        try:
            cpus = multiprocessing.cpu_count()
        except NotImplementedError:
            cpus = 1
        self.assertIsInstance(cpus, int)
        self.assertTrue(cpus >= 1)

    def test_active_children(self):
        self.assertEqual(type(self.active_children()), list)

        p = self.Process(target=time.sleep, args=(DELTA,))
        self.assertNotIn(p, self.active_children())

        p.start()
        self.assertIn(p, self.active_children())

        p.join()
        self.assertNotIn(p, self.active_children())

    @classmethod
    def _test_recursion(cls, wconn, id):
        # Send our id, then spawn two children per level until ids reach
        # length 2.  Each child is joined before the next one starts.
        wconn.send(id)
        if len(id) < 2:
            for i in range(2):
                p = cls.Process(
                    target=cls._test_recursion, args=(wconn, id+[i])
                    )
                p.start()
                p.join()

    def test_recursion(self):
        rconn, wconn = self.Pipe(duplex=False)
        self._test_recursion(wconn, [])

        time.sleep(DELTA)
        result = []
        while rconn.poll():
            result.append(rconn.recv())

        expected = [
            [],
              [0],
                [0, 0],
                [0, 1],
              [1],
                [1, 0],
                [1, 1]
            ]
        self.assertEqual(result, expected)
296
297#
298#
299#
300
class _UpperCaser(multiprocessing.Process):
    """Process that upper-cases strings sent to it over a pipe.

    Usage visible in this class: call start(), then submit() once per
    string, then stop() to end the loop in run().
    """

    def __init__(self):
        super().__init__()
        child_end, parent_end = multiprocessing.Pipe()
        self.child_conn = child_end
        self.parent_conn = parent_end

    def run(self):
        # run() only uses the child end of the pipe.
        self.parent_conn.close()
        # Serve requests until the None sentinel sent by stop() arrives.
        for text in iter(self.child_conn.recv, None):
            self.child_conn.send(text.upper())
        self.child_conn.close()

    def submit(self, s):
        # Send *s* through the pipe and return the reply read back.
        assert type(s) is str
        self.parent_conn.send(s)
        reply = self.parent_conn.recv()
        return reply

    def stop(self):
        # Send the sentinel that ends run(), then close both pipe ends.
        self.parent_conn.send(None)
        self.parent_conn.close()
        self.child_conn.close()
322
class _TestSubclassingProcess(BaseTestCase):
    """Check that a Process subclass with its own run() works."""

    ALLOWED_TYPES = ('processes',)

    def test_subclassing(self):
        worker = _UpperCaser()
        worker.start()
        for text, expected in (('hello', 'HELLO'), ('world', 'WORLD')):
            self.assertEqual(worker.submit(text), expected)
        worker.stop()
        worker.join()
334
335#
336#
337#
338
def queue_empty(q):
    """Return whether *q* is empty.

    Uses ``q.empty()`` when the queue has that method and falls back to
    comparing ``q.qsize()`` with zero otherwise.
    """
    if not hasattr(q, 'empty'):
        return q.qsize() == 0
    return q.empty()
344
def queue_full(q, maxsize):
    """Return whether *q* is full.

    Uses ``q.full()`` when the queue has that method; otherwise the queue
    counts as full when ``q.qsize()`` equals *maxsize*.
    """
    if not hasattr(q, 'full'):
        return q.qsize() == maxsize
    return q.full()
350
351
class _TestQueue(BaseTestCase):
    """Tests of Queue/JoinableQueue put, get, qsize and task_done."""

    @classmethod
    def _test_put(cls, queue, child_can_start, parent_can_continue):
        # Child side of test_put: drain the six items once allowed to.
        child_can_start.wait()
        for i in range(6):
            queue.get()
        parent_can_continue.set()

    def test_put(self):
        MAXSIZE = 6
        queue = self.Queue(maxsize=MAXSIZE)
        child_can_start = self.Event()
        parent_can_continue = self.Event()

        proc = self.Process(
            target=self._test_put,
            args=(queue, child_can_start, parent_can_continue)
            )
        proc.daemon = True
        proc.start()

        self.assertEqual(queue_empty(queue), True)
        self.assertEqual(queue_full(queue, MAXSIZE), False)

        # Fill the queue using every calling convention of put().
        queue.put(1)
        queue.put(2, True)
        queue.put(3, True, None)
        queue.put(4, False)
        queue.put(5, False, None)
        queue.put_nowait(6)

        # the values may be in buffer but not yet in pipe so sleep a bit
        time.sleep(DELTA)

        self.assertEqual(queue_empty(queue), False)
        self.assertEqual(queue_full(queue, MAXSIZE), True)

        put = TimingWrapper(queue.put)
        put_nowait = TimingWrapper(queue.put_nowait)

        # Every further put must fail with Full.
        self.assertRaises(pyqueue.Full, put, 7, False)
        self.assertTimingAlmostEqual(put.elapsed, 0)

        self.assertRaises(pyqueue.Full, put, 7, False, None)
        self.assertTimingAlmostEqual(put.elapsed, 0)

        self.assertRaises(pyqueue.Full, put_nowait, 7)
        self.assertTimingAlmostEqual(put_nowait.elapsed, 0)

        self.assertRaises(pyqueue.Full, put, 7, True, TIMEOUT1)
        self.assertTimingAlmostEqual(put.elapsed, TIMEOUT1)

        self.assertRaises(pyqueue.Full, put, 7, False, TIMEOUT2)
        self.assertTimingAlmostEqual(put.elapsed, 0)

        self.assertRaises(pyqueue.Full, put, 7, True, timeout=TIMEOUT3)
        self.assertTimingAlmostEqual(put.elapsed, TIMEOUT3)

        child_can_start.set()
        parent_can_continue.wait()

        self.assertEqual(queue_empty(queue), True)
        self.assertEqual(queue_full(queue, MAXSIZE), False)

        proc.join()

    @classmethod
    def _test_get(cls, queue, child_can_start, parent_can_continue):
        # Child side of test_get: enqueue the items the parent reads back.
        child_can_start.wait()
        #queue.put(1)
        queue.put(2)
        queue.put(3)
        queue.put(4)
        queue.put(5)
        parent_can_continue.set()

    def test_get(self):
        queue = self.Queue()
        child_can_start = self.Event()
        parent_can_continue = self.Event()

        proc = self.Process(
            target=self._test_get,
            args=(queue, child_can_start, parent_can_continue)
            )
        proc.daemon = True
        proc.start()

        self.assertEqual(queue_empty(queue), True)

        child_can_start.set()
        parent_can_continue.wait()

        time.sleep(DELTA)
        self.assertEqual(queue_empty(queue), False)

        # Hangs unexpectedly, remove for now
        #self.assertEqual(queue.get(), 1)
        self.assertEqual(queue.get(True, None), 2)
        self.assertEqual(queue.get(True), 3)
        self.assertEqual(queue.get(timeout=1), 4)
        self.assertEqual(queue.get_nowait(), 5)

        self.assertEqual(queue_empty(queue), True)

        get = TimingWrapper(queue.get)
        get_nowait = TimingWrapper(queue.get_nowait)

        # Every further get must fail with Empty.
        self.assertRaises(pyqueue.Empty, get, False)
        self.assertTimingAlmostEqual(get.elapsed, 0)

        self.assertRaises(pyqueue.Empty, get, False, None)
        self.assertTimingAlmostEqual(get.elapsed, 0)

        self.assertRaises(pyqueue.Empty, get_nowait)
        self.assertTimingAlmostEqual(get_nowait.elapsed, 0)

        self.assertRaises(pyqueue.Empty, get, True, TIMEOUT1)
        self.assertTimingAlmostEqual(get.elapsed, TIMEOUT1)

        self.assertRaises(pyqueue.Empty, get, False, TIMEOUT2)
        self.assertTimingAlmostEqual(get.elapsed, 0)

        self.assertRaises(pyqueue.Empty, get, timeout=TIMEOUT3)
        self.assertTimingAlmostEqual(get.elapsed, TIMEOUT3)

        proc.join()

    @classmethod
    def _test_fork(cls, queue):
        for i in range(10, 20):
            queue.put(i)
        # note that at this point the items may only be buffered, so the
        # process cannot shutdown until the feeder thread has finished
        # pushing items onto the pipe.

    def test_fork(self):
        # Old versions of Queue would fail to create a new feeder
        # thread for a forked process if the original process had its
        # own feeder thread.  This test checks that this no longer
        # happens.

        queue = self.Queue()

        # put items on queue so that main process starts a feeder thread
        for i in range(10):
            queue.put(i)

        # wait to make sure thread starts before we fork a new process
        time.sleep(DELTA)

        # fork process
        p = self.Process(target=self._test_fork, args=(queue,))
        p.start()

        # check that all expected items are in the queue
        for i in range(20):
            self.assertEqual(queue.get(), i)
        self.assertRaises(pyqueue.Empty, queue.get, False)

        p.join()

    def test_qsize(self):
        q = self.Queue()
        try:
            self.assertEqual(q.qsize(), 0)
        except NotImplementedError:
            # qsize() is not implemented here; nothing to check.
            return
        q.put(1)
        self.assertEqual(q.qsize(), 1)
        q.put(5)
        self.assertEqual(q.qsize(), 2)
        q.get()
        self.assertEqual(q.qsize(), 1)
        q.get()
        self.assertEqual(q.qsize(), 0)

    @classmethod
    def _test_task_done(cls, q):
        # Worker: acknowledge every item until the None sentinel arrives.
        for obj in iter(q.get, None):
            time.sleep(DELTA)
            q.task_done()

    def test_task_done(self):
        queue = self.JoinableQueue()

        workers = [self.Process(target=self._test_task_done, args=(queue,))
                   for _ in range(4)]

        for p in workers:
            p.start()

        for i in range(10):
            queue.put(i)

        # Blocks until task_done() has been called for every item put.
        queue.join()

        # One sentinel per worker so that each of them exits its loop.
        for p in workers:
            queue.put(None)

        for p in workers:
            p.join()
559
560#
561#
562#
563
class _TestLock(BaseTestCase):
    """Checks of Lock and RLock acquire/release behaviour."""

    def test_lock(self):
        # A held lock refuses a non-blocking acquire, and releasing a lock
        # that is not held raises.
        mutex = self.Lock()
        self.assertEqual(mutex.acquire(), True)
        self.assertEqual(mutex.acquire(False), False)
        self.assertEqual(mutex.release(), None)
        self.assertRaises((ValueError, threading.ThreadError), mutex.release)

    def test_rlock(self):
        # An RLock can be taken repeatedly and must be released as often;
        # one release too many raises.
        depth = 3
        rlock = self.RLock()
        for _ in range(depth):
            self.assertEqual(rlock.acquire(), True)
        for _ in range(depth):
            self.assertEqual(rlock.release(), None)
        self.assertRaises((AssertionError, RuntimeError), rlock.release)

    def test_lock_context(self):
        # A lock must be usable as a context manager.
        with self.Lock():
            pass
586
Benjamin Petersone711caf2008-06-11 16:44:04 +0000587
class _TestSemaphore(BaseTestCase):
    """Tests of Semaphore and BoundedSemaphore counting and timeouts."""

    def _test_semaphore(self, sem):
        # Shared scenario for a semaphore created with an initial value
        # of 2: drain it, fail a non-blocking acquire, then refill it.
        self.assertReturnsIfImplemented(2, get_value, sem)
        self.assertEqual(sem.acquire(), True)
        self.assertReturnsIfImplemented(1, get_value, sem)
        self.assertEqual(sem.acquire(), True)
        self.assertReturnsIfImplemented(0, get_value, sem)
        self.assertEqual(sem.acquire(False), False)
        self.assertReturnsIfImplemented(0, get_value, sem)
        self.assertEqual(sem.release(), None)
        self.assertReturnsIfImplemented(1, get_value, sem)
        self.assertEqual(sem.release(), None)
        self.assertReturnsIfImplemented(2, get_value, sem)

    def test_semaphore(self):
        sem = self.Semaphore(2)
        self._test_semaphore(sem)
        # An unbounded semaphore may be released past its initial value.
        self.assertEqual(sem.release(), None)
        self.assertReturnsIfImplemented(3, get_value, sem)
        self.assertEqual(sem.release(), None)
        self.assertReturnsIfImplemented(4, get_value, sem)

    def test_bounded_semaphore(self):
        sem = self.BoundedSemaphore(2)
        self._test_semaphore(sem)
        # Currently fails on OS/X
        #if HAVE_GETVALUE:
        #    self.assertRaises(ValueError, sem.release)
        #    self.assertReturnsIfImplemented(2, get_value, sem)

    def test_timeout(self):
        # Report a skip instead of passing silently when the checks below
        # do not apply.
        if self.TYPE != 'processes':
            self.skipTest('test not appropriate for {}'.format(self.TYPE))

        sem = self.Semaphore(0)
        acquire = TimingWrapper(sem.acquire)

        # Non-blocking acquires fail at once, whatever timeout is given.
        self.assertEqual(acquire(False), False)
        self.assertTimingAlmostEqual(acquire.elapsed, 0.0)

        self.assertEqual(acquire(False, None), False)
        self.assertTimingAlmostEqual(acquire.elapsed, 0.0)

        self.assertEqual(acquire(False, TIMEOUT1), False)
        self.assertTimingAlmostEqual(acquire.elapsed, 0)

        # Blocking acquires give up once their timeout has elapsed.
        self.assertEqual(acquire(True, TIMEOUT2), False)
        self.assertTimingAlmostEqual(acquire.elapsed, TIMEOUT2)

        self.assertEqual(acquire(timeout=TIMEOUT3), False)
        self.assertTimingAlmostEqual(acquire.elapsed, TIMEOUT3)
640
641
class _TestCondition(BaseTestCase):
    """Tests of Condition.wait(), notify() and notify_all()."""

    @classmethod
    def f(cls, cond, sleeping, woken, timeout=None):
        # Worker: announce that we are about to wait, wait on the
        # condition, then announce that we woke up.
        cond.acquire()
        sleeping.release()
        cond.wait(timeout)
        woken.release()
        cond.release()

    def check_invariant(self, cond):
        # this is only supposed to succeed when there are no sleepers
        if self.TYPE == 'processes':
            try:
                sleepers = (cond._sleeping_count.get_value() -
                            cond._woken_count.get_value())
                self.assertEqual(sleepers, 0)
                self.assertEqual(cond._wait_semaphore.get_value(), 0)
            except NotImplementedError:
                pass

    def test_notify(self):
        cond = self.Condition()
        sleeping = self.Semaphore(0)
        woken = self.Semaphore(0)

        # Keep separate references to both workers so both can be joined.
        proc = self.Process(target=self.f, args=(cond, sleeping, woken))
        proc.daemon = True
        proc.start()

        thread = threading.Thread(target=self.f, args=(cond, sleeping, woken))
        thread.daemon = True
        thread.start()

        # wait for both children to start sleeping
        sleeping.acquire()
        sleeping.acquire()

        # check no process/thread has woken up
        time.sleep(DELTA)
        self.assertReturnsIfImplemented(0, get_value, woken)

        # wake up one process/thread
        cond.acquire()
        cond.notify()
        cond.release()

        # check one process/thread has woken up
        time.sleep(DELTA)
        self.assertReturnsIfImplemented(1, get_value, woken)

        # wake up another
        cond.acquire()
        cond.notify()
        cond.release()

        # check other has woken up
        time.sleep(DELTA)
        self.assertReturnsIfImplemented(2, get_value, woken)

        # check state is not mucked up
        self.check_invariant(cond)

        # both workers have been notified, so neither join should block
        proc.join()
        thread.join()

    def test_notify_all(self):
        cond = self.Condition()
        sleeping = self.Semaphore(0)
        woken = self.Semaphore(0)

        # start some threads/processes which will timeout
        for i in range(3):
            p = self.Process(target=self.f,
                             args=(cond, sleeping, woken, TIMEOUT1))
            p.daemon = True
            p.start()

            t = threading.Thread(target=self.f,
                                 args=(cond, sleeping, woken, TIMEOUT1))
            t.daemon = True
            t.start()

        # wait for them all to sleep
        for i in range(6):
            sleeping.acquire()

        # check they have all timed out
        for i in range(6):
            woken.acquire()
        self.assertReturnsIfImplemented(0, get_value, woken)

        # check state is not mucked up
        self.check_invariant(cond)

        # start some more threads/processes
        for i in range(3):
            p = self.Process(target=self.f, args=(cond, sleeping, woken))
            p.daemon = True
            p.start()

            t = threading.Thread(target=self.f, args=(cond, sleeping, woken))
            t.daemon = True
            t.start()

        # wait for them to all sleep
        for i in range(6):
            sleeping.acquire()

        # check no process/thread has woken up
        time.sleep(DELTA)
        self.assertReturnsIfImplemented(0, get_value, woken)

        # wake them all up
        cond.acquire()
        cond.notify_all()
        cond.release()

        # check they have all woken
        time.sleep(DELTA)
        self.assertReturnsIfImplemented(6, get_value, woken)

        # check state is not mucked up
        self.check_invariant(cond)

    def test_timeout(self):
        # With nobody notifying, wait() must give up and return False.
        cond = self.Condition()
        wait = TimingWrapper(cond.wait)
        cond.acquire()
        res = wait(TIMEOUT1)
        cond.release()
        self.assertEqual(res, False)
        self.assertTimingAlmostEqual(wait.elapsed, TIMEOUT1)
773
774
class _TestEvent(BaseTestCase):
    """Tests of Event.set(), clear(), is_set() and wait()."""

    @classmethod
    def _test_event(cls, event):
        # Worker: set the event after a short delay.
        time.sleep(TIMEOUT2)
        event.set()

    def test_event(self):
        event = self.Event()
        wait = TimingWrapper(event.wait)

        # NOTE: this check was once removed temporarily because of API
        # shear with threading._Event objects (is_set == isSet).
        self.assertEqual(event.is_set(), False)

        # NOTE: the wait() checks were once removed as well, because
        # threading.Event.wait() returns the value of the flag instead of
        # None (API shear with the semaphore backed mp.Event).
        self.assertEqual(wait(0.0), False)
        self.assertTimingAlmostEqual(wait.elapsed, 0.0)
        self.assertEqual(wait(TIMEOUT1), False)
        self.assertTimingAlmostEqual(wait.elapsed, TIMEOUT1)

        event.set()

        # See note above on the API differences
        self.assertEqual(event.is_set(), True)
        self.assertEqual(wait(), True)
        self.assertTimingAlmostEqual(wait.elapsed, 0.0)
        self.assertEqual(wait(TIMEOUT1), True)
        self.assertTimingAlmostEqual(wait.elapsed, 0.0)
        # self.assertEqual(event.is_set(), True)

        event.clear()

        #self.assertEqual(event.is_set(), False)

        # Keep a reference to the worker so it can be joined once it has
        # set the event.
        p = self.Process(target=self._test_event, args=(event,))
        p.daemon = True
        p.start()
        self.assertEqual(wait(), True)
        p.join()
Benjamin Petersone711caf2008-06-11 16:44:04 +0000813
814#
815#
816#
817
class _TestValue(BaseTestCase):
    """Tests of shared Value and RawValue objects."""

    ALLOWED_TYPES = ('processes',)

    # (typecode, initial value, value assigned by _test)
    codes_values = [
        ('i', 4343, 24234),
        ('d', 3.625, -4.25),
        ('h', -232, 234),
        ('c', latin('x'), latin('y'))
        ]

    def setUp(self):
        if HAS_SHAREDCTYPES:
            return
        self.skipTest("requires multiprocessing.sharedctypes")

    @classmethod
    def _test(cls, values):
        # Worker: overwrite every shared value with its replacement.
        for shared, spec in zip(values, cls.codes_values):
            shared.value = spec[2]

    def test_value(self, raw=False):
        factory = self.RawValue if raw else self.Value
        values = [factory(code, initial)
                  for code, initial, _ in self.codes_values]

        # Before the worker runs, the initial values are in place.
        for shared, spec in zip(values, self.codes_values):
            self.assertEqual(shared.value, spec[1])

        worker = self.Process(target=self._test, args=(values,))
        worker.start()
        worker.join()

        # After the worker has run, its assignments must be visible here.
        for shared, spec in zip(values, self.codes_values):
            self.assertEqual(shared.value, spec[2])

    def test_rawvalue(self):
        self.test_value(raw=True)

    def test_getobj_getlock(self):
        # With the default lock and with lock=None, get_lock() and
        # get_obj() must be callable.
        for options in ({}, {'lock': None}):
            val = self.Value('i', 5, **options)
            val.get_lock()
            val.get_obj()

        # An explicitly supplied lock is the one handed back.
        lock = self.Lock()
        val3 = self.Value('i', 5, lock=lock)
        lock3 = val3.get_lock()
        val3.get_obj()
        self.assertEqual(lock, lock3)

        # lock=False yields an object without the accessors.
        unlocked = self.Value('i', 5, lock=False)
        self.assertFalse(hasattr(unlocked, 'get_lock'))
        self.assertFalse(hasattr(unlocked, 'get_obj'))

        self.assertRaises(AttributeError, self.Value, 'i', 5, lock='navalue')

        # RawValue objects have no accessors either.
        raw_val = self.RawValue('i', 5)
        self.assertFalse(hasattr(raw_val, 'get_lock'))
        self.assertFalse(hasattr(raw_val, 'get_obj'))
884
Benjamin Petersone711caf2008-06-11 16:44:04 +0000885
class _TestArray(BaseTestCase):
    """Tests of shared Array and RawArray objects."""

    ALLOWED_TYPES = ('processes',)

    @classmethod
    def f(cls, seq):
        # Replace seq in place by its running (cumulative) sums.
        for pos in range(1, len(seq)):
            seq[pos] += seq[pos - 1]

    @unittest.skipIf(c_int is None, "requires _ctypes")
    def test_array(self, raw=False):
        seq = [680, 626, 934, 821, 150, 233, 548, 982, 714, 831]
        make_array = self.RawArray if raw else self.Array
        arr = make_array('i', seq)

        self.assertEqual(len(arr), len(seq))
        self.assertEqual(arr[3], seq[3])
        self.assertEqual(list(arr[2:7]), list(seq[2:7]))

        # Apply the same slice assignment to the shared array and the list.
        patch = array.array('i', [1, 2, 3, 4])
        arr[4:8] = patch
        seq[4:8] = patch

        self.assertEqual(list(arr[:]), seq)

        # Run f on the list here and on the shared array in a process;
        # both must end up with the same contents.
        self.f(seq)

        worker = self.Process(target=self.f, args=(arr,))
        worker.start()
        worker.join()

        self.assertEqual(list(arr[:]), seq)

    @unittest.skipIf(c_int is None, "requires _ctypes")
    def test_rawarray(self):
        self.test_array(raw=True)

    @unittest.skipIf(c_int is None, "requires _ctypes")
    def test_getobj_getlock_obj(self):
        # With the default lock and with lock=None, get_lock() and
        # get_obj() must be callable.
        for options in ({}, {'lock': None}):
            arr = self.Array('i', list(range(10)), **options)
            arr.get_lock()
            arr.get_obj()

        # An explicitly supplied lock is the one handed back.
        lock = self.Lock()
        arr3 = self.Array('i', list(range(10)), lock=lock)
        lock3 = arr3.get_lock()
        arr3.get_obj()
        self.assertEqual(lock, lock3)

        # lock=False yields an object without the accessors.
        unlocked = self.Array('i', range(10), lock=False)
        self.assertFalse(hasattr(unlocked, 'get_lock'))
        self.assertFalse(hasattr(unlocked, 'get_obj'))
        self.assertRaises(AttributeError,
                          self.Array, 'i', range(10), lock='notalock')

        # RawArray objects have no accessors either.
        raw_arr = self.RawArray('i', range(10))
        self.assertFalse(hasattr(raw_arr, 'get_lock'))
        self.assertFalse(hasattr(raw_arr, 'get_obj'))
Benjamin Petersone711caf2008-06-11 16:44:04 +0000948
949#
950#
951#
952
class _TestContainers(BaseTestCase):
    # Checks for the list, dict and Namespace factories found on ``self``.
    # Only instantiated for the 'manager' flavour of the test cases.

    ALLOWED_TYPES = ('manager',)

    def test_list(self):
        ten = list(range(10))
        five = list(range(5))

        digits = self.list(list(range(10)))
        self.assertEqual(digits[:], ten)

        grown = self.list()
        self.assertEqual(grown[:], [])

        grown.extend(list(range(5)))
        self.assertEqual(grown[:], five)

        self.assertEqual(grown[2], 2)
        # A slice running past the end only yields the existing items.
        self.assertEqual(grown[2:10], [2, 3, 4])

        grown *= 2
        doubled = five * 2
        self.assertEqual(grown[:], doubled)

        self.assertEqual(grown + [5, 6], doubled + [5, 6])

        # None of the operations on ``grown`` may have touched ``digits``.
        self.assertEqual(digits[:], ten)

        nested = self.list([digits, grown])
        self.assertEqual(nested[:], [ten, doubled])

        # A later change to ``digits`` must be visible through a container
        # that was created from it.
        holder = self.list([digits])
        digits.append('hello')
        self.assertEqual(holder[:], [ten + ['hello']])

    def test_dict(self):
        shared = self.dict()
        codes = list(range(65, 70))
        for code in codes:
            shared[code] = chr(code)
        self.assertEqual(shared.copy(), {code: chr(code) for code in codes})
        self.assertEqual(sorted(shared.keys()), codes)
        self.assertEqual(sorted(shared.values()),
                         [chr(code) for code in codes])
        self.assertEqual(sorted(shared.items()),
                         [(code, chr(code)) for code in codes])

    def test_namespace(self):
        ns = self.Namespace()
        ns.name = 'Bob'
        ns.job = 'Builder'
        ns._hidden = 'hidden'
        self.assertEqual((ns.name, ns.job), ('Bob', 'Builder'))
        del ns.job
        # The expected string lists neither the deleted attribute nor the
        # underscore-prefixed one.
        self.assertEqual(str(ns), "Namespace(name='Bob')")
        self.assertTrue(hasattr(ns, 'name'))
        self.assertFalse(hasattr(ns, 'job'))
1008
1009#
1010#
1011#
1012
def sqr(x, wait=0.0):
    """Sleep for *wait* seconds, then return *x* multiplied by itself."""
    time.sleep(wait)
    squared = x * x
    return squared
Ask Solem2afcbf22010-11-09 20:55:52 +00001016
class _TestPool(BaseTestCase):
    # Tests of the pool API.  Most tests use ``self.pool``, which is not
    # created in this class -- presumably set up by the code that runs the
    # suite; confirm against the rest of the file.

    def test_apply(self):
        papply = self.pool.apply
        self.assertEqual(papply(sqr, (5,)), sqr(5))
        self.assertEqual(papply(sqr, (), {'x': 3}), sqr(x=3))

    def test_map(self):
        pmap = self.pool.map
        self.assertEqual(pmap(sqr, list(range(10))),
                         list(map(sqr, list(range(10)))))
        self.assertEqual(pmap(sqr, list(range(100)), chunksize=20),
                         list(map(sqr, list(range(100)))))

    def test_map_chunksize(self):
        # An empty iterable combined with an explicit chunksize must still
        # complete rather than hang until the timeout.
        try:
            self.pool.map_async(sqr, [], chunksize=1).get(timeout=TIMEOUT1)
        except multiprocessing.TimeoutError:
            self.fail("pool.map_async with chunksize stalled on null list")

    def test_async(self):
        res = self.pool.apply_async(sqr, (7, TIMEOUT1,))
        get = TimingWrapper(res.get)
        self.assertEqual(get(), 49)
        self.assertTimingAlmostEqual(get.elapsed, TIMEOUT1)

    def test_async_timeout(self):
        # The task sleeps slightly longer than the timeout given to get().
        res = self.pool.apply_async(sqr, (6, TIMEOUT2 + 0.2))
        get = TimingWrapper(res.get)
        self.assertRaises(multiprocessing.TimeoutError, get, timeout=TIMEOUT2)
        self.assertTimingAlmostEqual(get.elapsed, TIMEOUT2)

    def test_imap(self):
        it = self.pool.imap(sqr, list(range(10)))
        self.assertEqual(list(it), list(map(sqr, list(range(10)))))

        it = self.pool.imap(sqr, list(range(10)))
        for i in range(10):
            self.assertEqual(next(it), i*i)
        self.assertRaises(StopIteration, it.__next__)

        it = self.pool.imap(sqr, list(range(1000)), chunksize=100)
        for i in range(1000):
            self.assertEqual(next(it), i*i)
        self.assertRaises(StopIteration, it.__next__)

    def test_imap_unordered(self):
        # Results may arrive in any order, so compare them sorted.
        it = self.pool.imap_unordered(sqr, list(range(1000)))
        self.assertEqual(sorted(it), list(map(sqr, list(range(1000)))))

        it = self.pool.imap_unordered(sqr, list(range(1000)), chunksize=53)
        self.assertEqual(sorted(it), list(map(sqr, list(range(1000)))))

    def test_make_pool(self):
        p = multiprocessing.Pool(3)
        try:
            self.assertEqual(3, len(p._pool))
        finally:
            # Shut the private pool down even when the assertion fails so
            # that its workers are not left behind.
            p.close()
            p.join()

    def test_terminate(self):
        if self.TYPE == 'manager':
            # On Unix a forked process increfs each shared object to
            # which its parent process held a reference.  If the
            # forked process gets terminated then there is likely to
            # be a reference leak.  So to prevent
            # _TestZZZNumberOfObjects from failing we skip this test
            # when using a manager.  Report it as a skip instead of
            # silently passing.
            self.skipTest("terminating workers would leak manager references")

        # Queue far more work than can finish, then terminate: join() must
        # return promptly instead of waiting for the queued tasks.
        self.pool.map_async(
            time.sleep, [0.1 for i in range(10000)], chunksize=1
            )
        self.pool.terminate()
        join = TimingWrapper(self.pool.join)
        join()
        self.assertLess(join.elapsed, 0.2)
Jesse Noller1f0b6582010-01-27 03:36:01 +00001092
def raising():
    """Always raise ``KeyError("key")``; used as a failing pool task."""
    error = KeyError("key")
    raise error
Jesse Noller1f0b6582010-01-27 03:36:01 +00001095
def unpickleable_result():
    """Return a lambda that yields 42.

    Used as a pool task by the test that expects a ``MaybeEncodingError``
    when the task's return value is sent back.
    """
    answer = lambda: 42
    return answer
1098
class _TestPoolWorkerErrors(BaseTestCase):
    # Error reporting from pool workers via ``error_callback``.
    ALLOWED_TYPES = ('processes', )

    def test_async_error_callback(self):
        pool = multiprocessing.Pool(2)

        reported = None

        def remember(exc):
            nonlocal reported
            reported = exc

        outcome = pool.apply_async(raising, error_callback=remember)
        # The exception must reach both get() and the error callback.
        with self.assertRaises(KeyError):
            outcome.get()
        self.assertTrue(reported)
        self.assertIsInstance(reported, KeyError)

        pool.close()
        pool.join()

    def test_unpickleable_result(self):
        from multiprocessing.pool import MaybeEncodingError
        pool = multiprocessing.Pool(2)

        # Make sure we don't lose pool processes because of encoding errors.
        for _ in range(20):

            reported = None

            def remember(exc):
                nonlocal reported
                reported = exc

            outcome = pool.apply_async(unpickleable_result,
                                       error_callback=remember)
            with self.assertRaises(MaybeEncodingError):
                outcome.get()
            wrapped = reported
            self.assertTrue(wrapped)
            self.assertIsInstance(reported, MaybeEncodingError)
            self.assertIsNotNone(wrapped.exc)
            self.assertIsNotNone(wrapped.value)

        pool.close()
        pool.join()
1138
class _TestPoolWorkerLifetime(BaseTestCase):
    # Workers created with ``maxtasksperchild`` must get replaced.
    ALLOWED_TYPES = ('processes', )

    def test_pool_worker_lifetime(self):
        pool = multiprocessing.Pool(3, maxtasksperchild=10)
        self.assertEqual(3, len(pool._pool))
        pids_before = [worker.pid for worker in pool._pool]
        # Run many tasks so each worker gets replaced (hopefully)
        pending = [pool.apply_async(sqr, (n, )) for n in range(100)]
        # Fetch the results and verify we got the right answers,
        # also ensuring all the tasks have completed.
        for n, handle in enumerate(pending):
            self.assertEqual(handle.get(), sqr(n))
        # Refill the pool
        pool._repopulate_pool()
        # Wait until all workers are alive: at most five checks, sleeping
        # DELTA after each unsuccessful one.
        for _ in range(5):
            if all(worker.is_alive() for worker in pool._pool):
                break
            time.sleep(DELTA)
        pids_after = [worker.pid for worker in pool._pool]
        # All pids should be assigned.  See issue #7805.
        self.assertNotIn(None, pids_before)
        self.assertNotIn(None, pids_after)
        # Finally, check that the worker pids have changed
        self.assertNotEqual(sorted(pids_before), sorted(pids_after))
        pool.close()
        pool.join()
1169
Benjamin Petersone711caf2008-06-11 16:44:04 +00001170#
1171# Test that manager has expected number of shared objects left
1172#
1173
class _TestZZZNumberOfObjects(BaseTestCase):
    # Because test cases are sorted alphabetically, this one will get
    # run after all the other tests for the manager.  It tests that
    # there have been no "reference leaks" for the manager's shared
    # objects.  Note the comment in _TestPool.test_terminate().
    ALLOWED_TYPES = ('manager',)

    def test_number_of_objects(self):
        EXPECTED_NUMBER = 1                # the pool object is still alive
        multiprocessing.active_children()  # discard dead process objs
        gc.collect()                       # do garbage collection
        refs = self.manager._number_of_objects()
        # Fetch the debug info together with the count, and print it exactly
        # once when the count is off (it used to be printed twice, the
        # second time from an extra call made after the count was taken).
        debug_info = self.manager._debug_info()
        if refs != EXPECTED_NUMBER:
            print(debug_info)

        self.assertEqual(refs, EXPECTED_NUMBER)
1192
1193#
1194# Test of creating a customized manager class
1195#
1196
1197from multiprocessing.managers import BaseManager, BaseProxy, RemoteError
1198
class FooBar:
    """Small class registered with ``MyManager`` as 'Foo' and 'Bar'."""

    def f(self):
        """Return the string 'f()'."""
        return 'f()'

    def g(self):
        """Always raise ``ValueError``."""
        raise ValueError

    def _h(self):
        """Return the string '_h()'."""
        return '_h()'
1206
def baz():
    """Generate the squares of 0 through 9, in order."""
    for number in range(10):
        square = number * number
        yield square
1210
class IteratorProxy(BaseProxy):
    """Proxy that iterates by forwarding ``__next__`` via ``_callmethod``."""

    # The only method name listed as exposed for this proxy type.
    _exposed_ = ('__next__',)

    def __iter__(self):
        # The proxy is its own iterator.
        return self

    def __next__(self):
        method_name = '__next__'
        return self._callmethod(method_name)
1217
class MyManager(BaseManager):
    # Customised manager: nothing is overridden, the type ids below are
    # simply registered on this subclass.
    pass


# 'Foo' is registered without an explicit ``exposed`` list.
MyManager.register('Foo', callable=FooBar)
# 'Bar' is the same class but with an explicit list that includes '_h' and
# leaves out 'g'.
MyManager.register(
    'Bar',
    callable=FooBar,
    exposed=('f', '_h'),
)
# 'baz' returns a generator, wrapped in the custom iterator proxy.
MyManager.register(
    'baz',
    callable=baz,
    proxytype=IteratorProxy,
)
1224
1225
class _TestMyManager(BaseTestCase):
    # End-to-end check of the type ids registered on MyManager.

    ALLOWED_TYPES = ('manager',)

    def test_mymanager(self):
        manager = MyManager()
        manager.start()

        foo = manager.Foo()
        bar = manager.Bar()
        squares = manager.baz()

        # Which of the three FooBar methods does each proxy offer?
        candidates = ('f', 'g', '_h')
        foo_methods = list(filter(lambda name: hasattr(foo, name), candidates))
        bar_methods = list(filter(lambda name: hasattr(bar, name), candidates))

        self.assertEqual(foo_methods, ['f', 'g'])
        self.assertEqual(bar_methods, ['f', '_h'])

        self.assertEqual(foo.f(), 'f()')
        with self.assertRaises(ValueError):
            foo.g()
        self.assertEqual(foo._callmethod('f'), 'f()')
        # '_h' is not available on 'Foo', so calling it by name must fail.
        with self.assertRaises(RemoteError):
            foo._callmethod('_h')

        self.assertEqual(bar.f(), 'f()')
        self.assertEqual(bar._h(), '_h()')
        self.assertEqual(bar._callmethod('f'), 'f()')
        self.assertEqual(bar._callmethod('_h'), '_h()')

        self.assertEqual(list(squares), [n*n for n in range(10)])

        manager.shutdown()
1257
1258#
1259# Test of connecting to a remote server and using xmlrpclib for serialization
1260#
1261
# One module-level queue object; get_queue() always returns this instance.
_queue = pyqueue.Queue()


def get_queue():
    return _queue


class QueueManager(BaseManager):
    '''manager class used by server process'''


# This manager supplies the callable behind 'get_queue'.
QueueManager.register('get_queue', callable=get_queue)


class QueueManager2(BaseManager):
    '''manager class which specifies the same interface as QueueManager'''


# Same type id, but registered without a callable.
QueueManager2.register('get_queue')


# Serializer name passed to the managers created in the tests below.
SERIALIZER = 'xmlrpclib'
1276
class _TestRemoteManager(BaseTestCase):
    # A manager server is started here; a child process and a second
    # manager object then connect to it by address, all using SERIALIZER.

    ALLOWED_TYPES = ('manager',)

    @classmethod
    def _putter(cls, address, authkey):
        # Runs in the child: connect to the server and put one tuple on the
        # shared queue.
        manager = QueueManager2(
            address=address, authkey=authkey, serializer=SERIALIZER
            )
        manager.connect()
        queue = manager.get_queue()
        queue.put(('hello world', None, True, 2.25))

    def test_remote(self):
        authkey = os.urandom(32)

        manager = QueueManager(
            address=('localhost', 0), authkey=authkey, serializer=SERIALIZER
            )
        manager.start()

        p = self.Process(target=self._putter, args=(manager.address, authkey))
        p.start()

        manager2 = QueueManager2(
            address=manager.address, authkey=authkey, serializer=SERIALIZER
            )
        manager2.connect()
        queue = manager2.get_queue()

        # Note that xmlrpclib will deserialize object as a list not a tuple
        self.assertEqual(queue.get(), ['hello world', None, True, 2.25])

        # Because we are using xmlrpclib for serialization instead of
        # pickle this will cause a serialization error.
        self.assertRaises(Exception, queue.put, time.sleep)

        # The child has delivered its item by now; wait for it to exit
        # while the server is still running so it is not left unreaped.
        p.join()

        # Make queue finalizer run before the server is stopped
        del queue
        manager.shutdown()
1317
class _TestManagerRestart(BaseTestCase):
    # A manager must be able to start again on an address that another
    # manager has just shut down on.

    @classmethod
    def _putter(cls, address, authkey):
        # Runs in the child: connect to the server and put one string on
        # the shared queue.
        manager = QueueManager(
            address=address, authkey=authkey, serializer=SERIALIZER)
        manager.connect()
        queue = manager.get_queue()
        queue.put('hello world')

    def test_rapid_restart(self):
        authkey = os.urandom(32)
        manager = QueueManager(
            address=('localhost', 0), authkey=authkey, serializer=SERIALIZER)
        srvr = manager.get_server()
        addr = srvr.address
        # Close the connection.Listener socket which gets opened as a part
        # of manager.get_server(). It's not needed for the test.
        srvr.listener.close()
        manager.start()

        p = self.Process(target=self._putter, args=(manager.address, authkey))
        p.start()
        # Wait for the child to finish its put and exit, so that it is
        # reaped before the server goes away.
        p.join()
        queue = manager.get_queue()
        self.assertEqual(queue.get(), 'hello world')
        del queue
        manager.shutdown()

        # Start a second manager on the address recorded above.
        manager = QueueManager(
            address=addr, authkey=authkey, serializer=SERIALIZER)
        manager.start()
        manager.shutdown()
Jesse Nollerc5d28a02009-03-30 16:37:36 +00001349
Benjamin Petersone711caf2008-06-11 16:44:04 +00001350#
1351#
1352#
1353
# Empty message that tells the echo loop in _TestConnection._echo to stop.
SENTINEL = latin('')


class _TestConnection(BaseTestCase):
    # Tests of the connection objects returned by self.Pipe().

    ALLOWED_TYPES = ('processes', 'threads')

    @classmethod
    def _echo(cls, conn):
        # Send every received byte message straight back until SENTINEL
        # arrives, then close the connection.
        for msg in iter(conn.recv_bytes, SENTINEL):
            conn.send_bytes(msg)
        conn.close()

    def test_connection(self):
        conn, child_conn = self.Pipe()

        p = self.Process(target=self._echo, args=(child_conn,))
        p.daemon = True
        p.start()

        seq = [1, 2.25, None]
        msg = latin('hello world')
        longmsg = msg * 10
        arr = array.array('i', list(range(4)))

        if self.TYPE == 'processes':
            self.assertEqual(type(conn.fileno()), int)

        # Objects and raw bytes both come back unchanged from the echo.
        self.assertIsNone(conn.send(seq))
        self.assertEqual(conn.recv(), seq)

        self.assertIsNone(conn.send_bytes(msg))
        self.assertEqual(conn.recv_bytes(), msg)

        if self.TYPE == 'processes':
            # Receive into a larger buffer, starting at offset 0 ...
            buffer = array.array('i', [0]*10)
            expected = list(arr) + [0] * (10 - len(arr))
            self.assertIsNone(conn.send_bytes(arr))
            self.assertEqual(conn.recv_bytes_into(buffer),
                             len(arr) * buffer.itemsize)
            self.assertEqual(list(buffer), expected)

            # ... and starting at an offset of three items.
            buffer = array.array('i', [0]*10)
            expected = [0] * 3 + list(arr) + [0] * (10 - 3 - len(arr))
            self.assertIsNone(conn.send_bytes(arr))
            self.assertEqual(conn.recv_bytes_into(buffer, 3 * buffer.itemsize),
                             len(arr) * buffer.itemsize)
            self.assertEqual(list(buffer), expected)

            # A buffer that is too small must raise BufferTooShort, with
            # the complete message available in the exception's args.
            buffer = bytearray(latin(' ' * 40))
            self.assertIsNone(conn.send_bytes(longmsg))
            try:
                res = conn.recv_bytes_into(buffer)
            except multiprocessing.BufferTooShort as e:
                self.assertEqual(e.args, (longmsg,))
            else:
                self.fail('expected BufferTooShort, got %s' % res)

        poll = TimingWrapper(conn.poll)

        # Nothing is pending: poll() returns at once, poll(timeout) waits.
        self.assertEqual(poll(), False)
        self.assertTimingAlmostEqual(poll.elapsed, 0)

        self.assertEqual(poll(TIMEOUT1), False)
        self.assertTimingAlmostEqual(poll.elapsed, TIMEOUT1)

        conn.send(None)

        # With a reply on its way, poll(timeout) returns without waiting.
        self.assertEqual(poll(TIMEOUT1), True)
        self.assertTimingAlmostEqual(poll.elapsed, 0)

        self.assertEqual(conn.recv(), None)

        really_big_msg = latin('X') * (1024 * 1024 * 16)   # 16Mb
        conn.send_bytes(really_big_msg)
        self.assertEqual(conn.recv_bytes(), really_big_msg)

        conn.send_bytes(SENTINEL)                          # tell child to quit
        child_conn.close()

        if self.TYPE == 'processes':
            self.assertEqual(conn.readable, True)
            self.assertEqual(conn.writable, True)
            self.assertRaises(EOFError, conn.recv)
            self.assertRaises(EOFError, conn.recv_bytes)

        p.join()

    def test_duplex_false(self):
        reader, writer = self.Pipe(duplex=False)
        self.assertIsNone(writer.send(1))
        self.assertEqual(reader.recv(), 1)
        if self.TYPE == 'processes':
            # Each end only supports its own direction.
            self.assertEqual(reader.readable, True)
            self.assertEqual(reader.writable, False)
            self.assertEqual(writer.readable, False)
            self.assertEqual(writer.writable, True)
            self.assertRaises(IOError, reader.send, 2)
            self.assertRaises(IOError, writer.recv)
            self.assertRaises(IOError, writer.poll)

    def test_spawn_close(self):
        # We test that a pipe connection can be closed by parent
        # process immediately after child is spawned.  On Windows this
        # would have sometimes failed on old versions because
        # child_conn would be closed before the child got a chance to
        # duplicate it.
        conn, child_conn = self.Pipe()

        p = self.Process(target=self._echo, args=(child_conn,))
        p.start()
        child_conn.close()    # this might complete before child initializes

        msg = latin('hello')
        conn.send_bytes(msg)
        self.assertEqual(conn.recv_bytes(), msg)

        conn.send_bytes(SENTINEL)
        conn.close()
        p.join()

    def test_sendbytes(self):
        if self.TYPE != 'processes':
            # Report a skip rather than a silent pass for other flavours.
            self.skipTest('test not appropriate for {}'.format(self.TYPE))

        msg = latin('abcdefghijklmnopqrstuvwxyz')
        a, b = self.Pipe()

        a.send_bytes(msg)
        self.assertEqual(b.recv_bytes(), msg)

        # Offset only, then offset plus size.
        a.send_bytes(msg, 5)
        self.assertEqual(b.recv_bytes(), msg[5:])

        a.send_bytes(msg, 7, 8)
        self.assertEqual(b.recv_bytes(), msg[7:7+8])

        # An offset equal to the length sends an empty message.
        a.send_bytes(msg, 26)
        self.assertEqual(b.recv_bytes(), latin(''))

        a.send_bytes(msg, 26, 0)
        self.assertEqual(b.recv_bytes(), latin(''))

        # Out-of-range or negative offset/size combinations are rejected.
        self.assertRaises(ValueError, a.send_bytes, msg, 27)

        self.assertRaises(ValueError, a.send_bytes, msg, 22, 5)

        self.assertRaises(ValueError, a.send_bytes, msg, 26, 1)

        self.assertRaises(ValueError, a.send_bytes, msg, -1)

        self.assertRaises(ValueError, a.send_bytes, msg, 4, -1)
1505
class _TestListenerClient(BaseTestCase):
    # Listener/Client round trip for each family in
    # ``self.connection.families``.

    ALLOWED_TYPES = ('processes', 'threads')

    @classmethod
    def _test(cls, address):
        # Runs in the child: connect, send one greeting, disconnect.
        client = cls.connection.Client(address)
        client.send('hello')
        client.close()

    def test_listener_client(self):
        for family in self.connection.families:
            listener = self.connection.Listener(family=family)
            child = self.Process(target=self._test, args=(listener.address,))
            child.daemon = True
            child.start()
            accepted = listener.accept()
            self.assertEqual(accepted.recv(), 'hello')
            child.join()
            listener.close()
Benjamin Petersone711caf2008-06-11 16:44:04 +00001526#
1527# Test of sending connection and socket objects between processes
1528#
Benjamin Petersonb29cbbc2008-06-16 20:57:14 +00001529"""
Benjamin Petersone711caf2008-06-11 16:44:04 +00001530class _TestPicklingConnections(BaseTestCase):
1531
1532 ALLOWED_TYPES = ('processes',)
1533
1534 def _listener(self, conn, families):
1535 for fam in families:
1536 l = self.connection.Listener(family=fam)
1537 conn.send(l.address)
1538 new_conn = l.accept()
1539 conn.send(new_conn)
1540
1541 if self.TYPE == 'processes':
1542 l = socket.socket()
1543 l.bind(('localhost', 0))
1544 conn.send(l.getsockname())
1545 l.listen(1)
1546 new_conn, addr = l.accept()
1547 conn.send(new_conn)
1548
1549 conn.recv()
1550
1551 def _remote(self, conn):
1552 for (address, msg) in iter(conn.recv, None):
1553 client = self.connection.Client(address)
1554 client.send(msg.upper())
1555 client.close()
1556
1557 if self.TYPE == 'processes':
1558 address, msg = conn.recv()
1559 client = socket.socket()
1560 client.connect(address)
1561 client.sendall(msg.upper())
1562 client.close()
1563
1564 conn.close()
1565
1566 def test_pickling(self):
1567 try:
1568 multiprocessing.allow_connection_pickling()
1569 except ImportError:
1570 return
1571
1572 families = self.connection.families
1573
1574 lconn, lconn0 = self.Pipe()
1575 lp = self.Process(target=self._listener, args=(lconn0, families))
1576 lp.start()
1577 lconn0.close()
1578
1579 rconn, rconn0 = self.Pipe()
1580 rp = self.Process(target=self._remote, args=(rconn0,))
1581 rp.start()
1582 rconn0.close()
1583
1584 for fam in families:
1585 msg = ('This connection uses family %s' % fam).encode('ascii')
1586 address = lconn.recv()
1587 rconn.send((address, msg))
1588 new_conn = lconn.recv()
1589 self.assertEqual(new_conn.recv(), msg.upper())
1590
1591 rconn.send(None)
1592
1593 if self.TYPE == 'processes':
1594 msg = latin('This connection uses a normal socket')
1595 address = lconn.recv()
1596 rconn.send((address, msg))
1597 if hasattr(socket, 'fromfd'):
1598 new_conn = lconn.recv()
1599 self.assertEqual(new_conn.recv(100), msg.upper())
1600 else:
1601 # XXX On Windows with Py2.6 need to backport fromfd()
1602 discard = lconn.recv_bytes()
1603
1604 lconn.send(None)
1605
1606 rconn.close()
1607 lconn.close()
1608
1609 lp.join()
1610 rp.join()
Benjamin Petersonb29cbbc2008-06-16 20:57:14 +00001611"""
Benjamin Petersone711caf2008-06-11 16:44:04 +00001612#
1613#
1614#
1615
class _TestHeap(BaseTestCase):
    # Consistency check of multiprocessing.heap after heavy churn.

    ALLOWED_TYPES = ('processes',)

    def test_heap(self):
        iterations = 5000
        maxblocks = 50
        blocks = []

        # create and destroy lots of blocks of different sizes
        for _ in range(iterations):
            size = int(random.lognormvariate(0, 1) * 1000)
            blocks.append(multiprocessing.heap.BufferWrapper(size))
            if len(blocks) > maxblocks:
                # Drop a random one of the older blocks.
                del blocks[random.randrange(maxblocks)]

        # get the heap object
        heap = multiprocessing.heap.BufferWrapper._heap

        # verify the state of the heap: collect every free and every
        # allocated segment as (arena index, start, stop, length, state)
        segments = []
        for free_list in list(heap._len_to_seq.values()):
            for arena, start, stop in free_list:
                segments.append((heap._arenas.index(arena), start, stop,
                                 stop-start, 'free'))
        for arena, start, stop in heap._allocated_blocks:
            segments.append((heap._arenas.index(arena), start, stop,
                             stop-start, 'occupied'))

        segments.sort()

        # Each segment must either end exactly where the next one starts,
        # or the next one must start at offset 0 of a different arena.
        for current, following in zip(segments, segments[1:]):
            (arena, start, stop) = current[:3]
            (narena, nstart, nstop) = following[:3]
            self.assertTrue((arena != narena and nstart == 0) or
                            (stop == nstart))
1656
1657#
1658#
1659#
1660
class _Foo(Structure):
    # Two-field structure used by the shared ctypes tests:
    # an int ``x`` followed by a double ``y``.
    _fields_ = [('x', c_int), ('y', c_double)]
1666
class _TestSharedCTypes(BaseTestCase):
    # Shared ctypes values, structures and arrays modified through
    # self.Process.

    ALLOWED_TYPES = ('processes',)

    def setUp(self):
        if HAS_SHAREDCTYPES:
            return
        self.skipTest("requires multiprocessing.sharedctypes")

    @classmethod
    def _double(cls, x, y, foo, arr, string):
        # Double every argument in place.
        x.value *= 2
        y.value *= 2
        foo.x *= 2
        foo.y *= 2
        string.value *= 2
        for pos in range(len(arr)):
            arr[pos] *= 2

    def test_sharedctypes(self, lock=False):
        number = Value('i', 7, lock=lock)
        fraction = Value(c_double, 1.0/3.0, lock=lock)
        record = Value(_Foo, 3, 2, lock=lock)
        doubles = self.Array('d', list(range(10)), lock=lock)
        text = self.Array('c', 20, lock=lock)
        text.value = latin('hello')

        worker = self.Process(
            target=self._double,
            args=(number, fraction, record, doubles, text),
        )
        worker.start()
        worker.join()

        # Every object must show the doubling done by _double.
        self.assertEqual(number.value, 14)
        self.assertAlmostEqual(fraction.value, 2.0/3.0)
        self.assertEqual(record.x, 6)
        self.assertAlmostEqual(record.y, 4.0)
        for pos in range(10):
            self.assertAlmostEqual(doubles[pos], pos*2)
        self.assertEqual(text.value, latin('hellohello'))

    def test_synchronize(self):
        # Same scenario, with lock=True for every shared object.
        self.test_sharedctypes(lock=True)

    def test_copy(self):
        original = _Foo(2, 5.0)
        duplicate = copy(original)
        # Changing the original afterwards must not affect the copy.
        original.x = 0
        original.y = 0
        self.assertEqual(duplicate.x, 2)
        self.assertAlmostEqual(duplicate.y, 5.0)
1715
1716#
1717#
1718#
1719
class _TestFinalize(BaseTestCase):
    # Order in which util.Finalize callbacks run.

    ALLOWED_TYPES = ('processes',)

    @classmethod
    def _test_finalize(cls, conn):
        # Runs in the child.  Every callback sends its label over ``conn``
        # so the parent can check the order.  The statement order below is
        # significant.
        class Foo(object):
            pass

        a = Foo()
        util.Finalize(a, conn.send, args=('a',))
        del a           # triggers callback for a

        b = Foo()
        close_b = util.Finalize(b, conn.send, args=('b',))
        close_b()       # triggers callback for b
        close_b()       # does nothing because callback has already been called
        del b           # does nothing because callback has already been called

        # No exit priority is given for this one.
        c = Foo()
        util.Finalize(c, conn.send, args=('c',))

        d10 = Foo()
        util.Finalize(d10, conn.send, args=('d10',), exitpriority=1)

        # Three finalizers sharing exit priority 0.
        d01 = Foo()
        util.Finalize(d01, conn.send, args=('d01',), exitpriority=0)
        d02 = Foo()
        util.Finalize(d02, conn.send, args=('d02',), exitpriority=0)
        d03 = Foo()
        util.Finalize(d03, conn.send, args=('d03',), exitpriority=0)

        util.Finalize(None, conn.send, args=('e',), exitpriority=-10)

        util.Finalize(None, conn.send, args=('STOP',), exitpriority=-100)

        # call multiprocessing's cleanup function then exit process without
        # garbage collecting locals
        util._exit_function()
        conn.close()
        os._exit(0)

    def test_finalize(self):
        conn, child_conn = self.Pipe()

        child = self.Process(target=self._test_finalize, args=(child_conn,))
        child.start()
        child.join()

        # Read labels until the 'STOP' marker; 'c' is expected to be absent.
        labels = list(iter(conn.recv, 'STOP'))
        self.assertEqual(labels, ['a', 'b', 'd10', 'd03', 'd02', 'd01', 'e'])
1771
1772#
1773# Test that from ... import * works for each module
1774#
1775
class _TestImportStar(BaseTestCase):
    # Every name listed in a module's __all__ must exist on the module.

    ALLOWED_TYPES = ('processes',)

    def test_import(self):
        module_names = [
            'multiprocessing',
            'multiprocessing.connection',
            'multiprocessing.heap',
            'multiprocessing.managers',
            'multiprocessing.pool',
            'multiprocessing.process',
            'multiprocessing.reduction',
            'multiprocessing.synchronize',
            'multiprocessing.util',
        ]

        if c_int is not None:
            # This module requires _ctypes
            module_names.append('multiprocessing.sharedctypes')

        for module_name in module_names:
            __import__(module_name)
            module = sys.modules[module_name]

            # Modules without __all__ contribute nothing to check.
            exported = getattr(module, '__all__', ())
            for attr in exported:
                present = hasattr(module, attr)
                self.assertTrue(
                    present,
                    '%r does not have attribute %r' % (module, attr)
                    )
1802
1803#
1804# Quick test that logging works -- does not test logging output
1805#
1806
class _TestLogging(BaseTestCase):
    # Quick checks of the multiprocessing logger; output is not inspected.

    ALLOWED_TYPES = ('processes',)

    def test_enable_logging(self):
        logger = multiprocessing.get_logger()
        logger.setLevel(util.SUBWARNING)
        self.assertIsNotNone(logger)
        logger.debug('this will not be printed')
        logger.info('nor will this')
        logger.setLevel(LOG_LEVEL)

    @classmethod
    def _test_level(cls, conn):
        # Runs in the child: report the logger's effective level.
        logger = multiprocessing.get_logger()
        conn.send(logger.getEffectiveLevel())

    def _level_seen_by_child(self, reader, writer):
        # Start a child that reports its effective level through ``writer``,
        # return what arrives on ``reader`` and always wait for the child so
        # that no process is left running.
        child = self.Process(target=self._test_level, args=(writer,))
        child.start()
        try:
            return reader.recv()
        finally:
            child.join()

    def test_level(self):
        LEVEL1 = 32
        LEVEL2 = 37

        logger = multiprocessing.get_logger()
        root_logger = logging.getLogger()
        root_level = root_logger.level

        reader, writer = multiprocessing.Pipe(duplex=False)
        try:
            # A level set on the multiprocessing logger is seen by the child.
            logger.setLevel(LEVEL1)
            self.assertEqual(LEVEL1, self._level_seen_by_child(reader, writer))

            # With NOTSET on that logger, the root logger's level is the
            # effective one.
            logger.setLevel(logging.NOTSET)
            root_logger.setLevel(LEVEL2)
            self.assertEqual(LEVEL2, self._level_seen_by_child(reader, writer))
        finally:
            # Restore the levels and release the pipe even if an assertion
            # above failed.
            root_logger.setLevel(root_level)
            logger.setLevel(level=LOG_LEVEL)
            reader.close()
            writer.close()
1845
Jesse Nollerb9a49b72009-11-21 18:09:38 +00001846
Jesse Nollerf4ae35f2009-11-24 14:22:24 +00001847# class _TestLoggingProcessName(BaseTestCase):
1848#
1849# def handle(self, record):
1850# assert record.processName == multiprocessing.current_process().name
1851# self.__handled = True
1852#
1853# def test_logging(self):
1854# handler = logging.Handler()
1855# handler.handle = self.handle
1856# self.__handled = False
1857# # Bypass getLogger() and side-effects
1858# logger = logging.getLoggerClass()(
1859# 'multiprocessing.test.TestLoggingProcessName')
1860# logger.addHandler(handler)
1861# logger.propagate = False
1862#
1863# logger.warn('foo')
1864# assert self.__handled
Jesse Nollerb9a49b72009-11-21 18:09:38 +00001865
Benjamin Petersone711caf2008-06-11 16:44:04 +00001866#
Jesse Noller6214edd2009-01-19 16:23:53 +00001867# Test to verify handle verification, see issue 3321
1868#
1869
class TestInvalidHandle(unittest.TestCase):
    # Handle verification of connection objects, see issue 3321.

    @unittest.skipIf(WIN32, "skipped on Windows")
    def test_invalid_handles(self):
        # An arbitrary handle number is accepted by the constructor but
        # must fail on first use; -1 must be refused outright.
        bogus = _multiprocessing.Connection(44977608)
        with self.assertRaises(IOError):
            bogus.poll()
        with self.assertRaises(IOError):
            _multiprocessing.Connection(-1)
Florent Xiclunafd1b0932010-03-28 00:25:02 +00001877
Jesse Noller6214edd2009-01-19 16:23:53 +00001878#
Benjamin Petersone711caf2008-06-11 16:44:04 +00001879# Functions used to create test cases from the base ones in this module
1880#
1881
def get_attributes(Source, names):
    """Return a dict mapping each name in *names* to ``getattr(Source, name)``.

    Values that are plain Python functions are wrapped in ``staticmethod``;
    everything else is stored unchanged.
    """
    function_type = type(get_attributes)
    collected = {}
    for attr_name in names:
        value = getattr(Source, attr_name)
        # Function objects cannot be subclassed, so isinstance() selects
        # exactly the objects whose type is the function type.
        if isinstance(value, function_type):
            value = staticmethod(value)
        collected[attr_name] = value
    return collected
1890
def create_test_cases(Mixin, type):
    """Build concrete test case classes for one flavour.

    Every module global whose name starts with ``_Test`` and whose
    ``ALLOWED_TYPES`` contains *type* yields a class deriving from that
    base, ``unittest.TestCase`` and *Mixin*.  The class is named
    ``'With' + type.capitalize() + <base name without the leading
    underscore>`` and takes its ``__module__`` from *Mixin*.

    Returns a dict mapping the new class names to the classes.
    """
    cases = {}
    namespace = globals()
    type_label = type.capitalize()

    # Iterate over a snapshot of the names.
    for base_name in list(namespace):
        if not base_name.startswith('_Test'):
            continue
        base = namespace[base_name]
        if type not in base.ALLOWED_TYPES:
            continue

        case_name = 'With' + type_label + base_name[1:]

        class Temp(base, unittest.TestCase, Mixin):
            pass

        cases[case_name] = Temp
        Temp.__name__ = case_name
        Temp.__module__ = Mixin.__module__
    return cases
1907
1908#
1909# Create test cases
1910#
1911
class ProcessesMixin:
    """Mixin that binds the generic tests to the process-based primitives."""

    TYPE = 'processes'
    Process = multiprocessing.Process
    # Copy the selected multiprocessing attributes straight into the class
    # namespace; get_attributes() wraps plain functions in staticmethod.
    locals().update(get_attributes(multiprocessing, (
        'Queue',
        'Lock', 'RLock',
        'Semaphore', 'BoundedSemaphore',
        'Condition', 'Event',
        'Value', 'Array', 'RawValue', 'RawArray',
        'current_process', 'active_children',
        'Pipe', 'connection',
        'JoinableQueue'
        )))

# Generate the "WithProcesses..." test classes and publish them as globals.
testcases_processes = create_test_cases(ProcessesMixin, type='processes')
globals().update(testcases_processes)
1924
1925
class ManagerMixin:
    """Mixin that binds the generic tests to manager-provided objects."""

    TYPE = 'manager'
    Process = multiprocessing.Process
    # Allocate the SyncManager without running __init__; test_main() calls
    # manager.__init__() and manager.start() before the tests are run.
    manager = object.__new__(multiprocessing.managers.SyncManager)
    # Copy the selected manager attributes into the class namespace.
    locals().update(get_attributes(manager, (
        'Queue',
        'Lock', 'RLock',
        'Semaphore', 'BoundedSemaphore',
        'Condition', 'Event',
        'Value', 'Array',
        'list', 'dict', 'Namespace',
        'JoinableQueue'
        )))

# Generate the "WithManager..." test classes and publish them as globals.
testcases_manager = create_test_cases(ManagerMixin, type='manager')
globals().update(testcases_manager)
1938
1939
class ThreadsMixin:
    """Mixin that binds the generic tests to multiprocessing.dummy (threads)."""

    TYPE = 'threads'
    Process = multiprocessing.dummy.Process
    # Copy the selected multiprocessing.dummy attributes into the class
    # namespace; get_attributes() wraps plain functions in staticmethod.
    locals().update(get_attributes(multiprocessing.dummy, (
        'Queue',
        'Lock', 'RLock',
        'Semaphore', 'BoundedSemaphore',
        'Condition', 'Event',
        'Value', 'Array',
        'current_process', 'active_children',
        'Pipe', 'connection',
        'dict', 'list', 'Namespace',
        'JoinableQueue'
        )))

# Generate the "WithThreads..." test classes and publish them as globals.
testcases_threads = create_test_cases(ThreadsMixin, type='threads')
globals().update(testcases_threads)
1952
class OtherTest(unittest.TestCase):
    # TODO: add more tests for deliver/answer challenge.

    def test_deliver_challenge_auth_failure(self):
        # The fake peer answers with bytes that cannot be a valid response,
        # so deliver_challenge() has to raise AuthenticationError.
        class _FakeConnection(object):
            def send_bytes(self, data):
                pass
            def recv_bytes(self, size):
                return b'something bogus'

        fake = _FakeConnection()
        with self.assertRaises(multiprocessing.AuthenticationError):
            multiprocessing.connection.deliver_challenge(fake, b'abc')

    def test_answer_challenge_auth_failure(self):
        # First read: the challenge marker; second read: a bogus reply where
        # the verdict is expected; any later read yields empty bytes.
        class _FakeConnection(object):
            def __init__(self):
                self.count = 0
            def send_bytes(self, data):
                pass
            def recv_bytes(self, size):
                self.count += 1
                if self.count == 1:
                    return multiprocessing.connection.CHALLENGE
                if self.count == 2:
                    return b'something bogus'
                return b''

        fake = _FakeConnection()
        with self.assertRaises(multiprocessing.AuthenticationError):
            multiprocessing.connection.answer_challenge(fake, b'abc')
1981
Benjamin Petersonf47ed4a2009-04-11 20:45:40 +00001982#
1983# Test Manager.start()/Pool.__init__() initializer feature - see issue 5585
1984#
1985
def initializer(ns):
    """Increment ``ns.test`` by one.

    Passed as the ``initializer`` callable to ``SyncManager.start()`` and
    ``multiprocessing.Pool`` by TestInitializers, which then checks the
    counter to see that the callable was run.
    """
    ns.test += 1
1988
class TestInitializers(unittest.TestCase):
    """Tests for the initializer argument of Manager.start() and Pool()
    (see issue 5585)."""

    def setUp(self):
        # A shared namespace whose ``test`` counter is bumped by initializer().
        self.mgr = multiprocessing.Manager()
        self.ns = self.mgr.Namespace()
        self.ns.test = 0

    def tearDown(self):
        self.mgr.shutdown()

    def test_manager_initializer(self):
        m = multiprocessing.managers.SyncManager()
        # A non-callable initializer must be rejected.
        self.assertRaises(TypeError, m.start, 1)
        m.start(initializer, (self.ns,))
        try:
            self.assertEqual(self.ns.test, 1)
        finally:
            # Shut the second manager down even when the assertion fails, so
            # a failing test does not leave its server process behind.
            m.shutdown()

    def test_pool_initializer(self):
        # A non-callable initializer must be rejected.
        self.assertRaises(TypeError, multiprocessing.Pool, initializer=1)
        p = multiprocessing.Pool(1, initializer, (self.ns,))
        p.close()
        p.join()
        # One worker was requested, so the initializer ran exactly once.
        self.assertEqual(self.ns.test, 1)
2011
Alexandre Vassalottic57a84f2009-07-17 12:07:01 +00002012#
2013# Issue 5155, 5313, 5331: Test process in processes
2014# Verifies os.close(sys.stdin.fileno) vs. sys.stdin.close() behavior
2015#
2016
def _ThisSubProcess(q):
    """Attempt one non-blocking ``get`` on *q*; an empty queue is ignored.

    Whatever item is fetched is discarded.
    """
    try:
        q.get(block=False)
    except pyqueue.Empty:
        # Nothing queued: that is fine, only the attempt matters here.
        pass
2022
def _TestProcess(q):
    """Start a child process running ``_ThisSubProcess`` and wait for it.

    The child receives a freshly created ``multiprocessing.Queue``; the *q*
    argument is accepted but not used.
    """
    inner_queue = multiprocessing.Queue()
    child = multiprocessing.Process(target=_ThisSubProcess,
                                    args=(inner_queue,))
    child.start()
    child.join()
2028
def _afunc(x):
    """Return *x* multiplied by itself."""
    squared = x * x
    return squared
2031
def pool_in_process():
    """Create a four-worker pool, map ``_afunc`` over a short list, clean up.

    Used as the target of a ``multiprocessing.Process`` by
    TestStdinBadfiledescriptor.test_pool_in_process.  The map result is not
    needed and is discarded.
    """
    pool = multiprocessing.Pool(processes=4)
    try:
        pool.map(_afunc, [1, 2, 3, 4, 5, 6, 7])
    finally:
        # Release the workers explicitly instead of leaving them to be
        # reclaimed when the calling process exits.
        pool.close()
        pool.join()
2035
class _file_like(object):
    """Write buffer in front of *delegate* whose contents are per-process.

    ``write()`` only appends to an in-memory list; ``flush()`` joins that list
    and hands the resulting string to ``delegate.write()``.
    """

    def __init__(self, delegate):
        self._delegate = delegate
        # pid that owns the current buffer; None until ``cache`` is first read.
        self._pid = None

    @property
    def cache(self):
        # Start from an empty buffer whenever the current pid differs from the
        # recorded one (first access, or access from another process).
        current = os.getpid()
        # There are no race conditions since fork keeps only the running thread
        if self._pid != current:
            self._pid = current
            self._cache = []
        return self._cache

    def write(self, data):
        self.cache.append(data)

    def flush(self):
        pending = ''.join(self.cache)
        self._delegate.write(pending)
        self._cache = []
2056
class TestStdinBadfiledescriptor(unittest.TestCase):
    """Process-inside-process tests for issues 5155, 5313 and 5331."""

    def test_queue_in_process(self):
        # The child (_TestProcess) itself starts a grandchild process.
        queue = multiprocessing.Queue()
        proc = multiprocessing.Process(target=_TestProcess, args=(queue,))
        proc.start()
        proc.join()

    def test_pool_in_process(self):
        # The child creates and uses a Pool of its own.
        p = multiprocessing.Process(target=pool_in_process)
        p.start()
        p.join()

    def test_flushing(self):
        sio = io.StringIO()
        flike = _file_like(sio)
        flike.write('foo')
        # NOTE(review): this Process is created but never started -- confirm
        # whether it was meant to be run.
        proc = multiprocessing.Process(target=lambda: flike.flush())
        flike.flush()
        # Use a unittest assertion: a bare ``assert`` is removed under -O and
        # gives no diagnostic on failure.
        self.assertEqual(sio.getvalue(), 'foo')
2077
# Stand-alone test cases; test_main() appends them after the generated ones.
testcases_other = [
    OtherTest,
    TestInvalidHandle,
    TestInitializers,
    TestStdinBadfiledescriptor,
]
Neal Norwitz5d6415e2008-08-25 01:53:32 +00002080
Benjamin Petersone711caf2008-06-11 16:44:04 +00002081#
2082#
2083#
2084
def test_main(run=None):
    """Set up the shared pools and manager, run every test case, tear down.

    *run* is a callable that receives the assembled ``unittest.TestSuite``;
    when omitted, ``test.support.run_unittest`` is used.

    Raises ``unittest.SkipTest`` on Linux when an ``RLock`` cannot be created
    (see issue 3111).
    """
    if sys.platform.startswith("linux"):
        try:
            # Only probing whether creation works; the lock is not kept.
            multiprocessing.RLock()
        except OSError:
            raise unittest.SkipTest("OSError raises on RLock creation, see issue 3111!")

    if run is None:
        from test.support import run_unittest as run

    util.get_temp_dir()     # creates temp directory for use by all processes

    multiprocessing.get_logger().setLevel(LOG_LEVEL)

    # Shared resources that the generated test classes reach via their mixin.
    ProcessesMixin.pool = multiprocessing.Pool(4)
    ThreadsMixin.pool = multiprocessing.dummy.Pool(4)
    # ManagerMixin.manager was allocated without __init__; initialise it now.
    ManagerMixin.manager.__init__()
    ManagerMixin.manager.start()
    ManagerMixin.pool = ManagerMixin.manager.Pool(4)

    def by_name(tc):
        return tc.__name__

    try:
        testcases = (
            sorted(testcases_processes.values(), key=by_name) +
            sorted(testcases_threads.values(), key=by_name) +
            sorted(testcases_manager.values(), key=by_name) +
            testcases_other
            )

        loadTestsFromTestCase = unittest.defaultTestLoader.loadTestsFromTestCase
        suite = unittest.TestSuite(loadTestsFromTestCase(tc) for tc in testcases)
        run(suite)
    finally:
        # Tear down even if building or running the suite raised, so worker
        # and manager processes are not left behind.
        ThreadsMixin.pool.terminate()
        ProcessesMixin.pool.terminate()
        ManagerMixin.pool.terminate()
        ManagerMixin.manager.shutdown()

        del ProcessesMixin.pool, ThreadsMixin.pool, ManagerMixin.pool
Benjamin Petersone711caf2008-06-11 16:44:04 +00002122
def main():
    """Run the complete suite through a verbose ``TextTestRunner``."""
    runner = unittest.TextTestRunner(verbosity=2)
    test_main(runner.run)

if __name__ == '__main__':
    main()