blob: 405fbd525693b234b6fc268b58b5bf4e41c0dc23 [file] [log] [blame]
Benjamin Peterson90f5ba52010-03-11 22:53:45 +00001#!/usr/bin/env python3
Georg Brandl86b2fb92008-07-16 03:43:04 +00002
Benjamin Petersone711caf2008-06-11 16:44:04 +00003#
4# Unit tests for the multiprocessing package
5#
6
7import unittest
Benjamin Petersone711caf2008-06-11 16:44:04 +00008import queue as pyqueue
9import time
Alexandre Vassalottic57a84f2009-07-17 12:07:01 +000010import io
Benjamin Petersone711caf2008-06-11 16:44:04 +000011import sys
12import os
13import gc
Antoine Pitrouc824e9a2011-04-05 18:11:33 +020014import errno
Benjamin Petersone711caf2008-06-11 16:44:04 +000015import signal
16import array
Benjamin Petersone711caf2008-06-11 16:44:04 +000017import socket
18import random
19import logging
R. David Murraya21e4ca2009-03-31 23:16:50 +000020import test.support
Benjamin Petersone711caf2008-06-11 16:44:04 +000021
Benjamin Petersone5384b02008-10-04 22:00:42 +000022
R. David Murraya21e4ca2009-03-31 23:16:50 +000023# Skip tests if _multiprocessing wasn't built.
24_multiprocessing = test.support.import_module('_multiprocessing')
25# Skip tests if sem_open implementation is broken.
26test.support.import_module('multiprocessing.synchronize')
Victor Stinner45df8202010-04-28 22:31:17 +000027# import threading after _multiprocessing to raise a more revelant error
28# message: "No module named _multiprocessing". _multiprocessing is not compiled
29# without thread support.
30import threading
Benjamin Petersone5384b02008-10-04 22:00:42 +000031
Benjamin Petersone711caf2008-06-11 16:44:04 +000032import multiprocessing.dummy
33import multiprocessing.connection
34import multiprocessing.managers
35import multiprocessing.heap
Benjamin Petersone711caf2008-06-11 16:44:04 +000036import multiprocessing.pool
Benjamin Petersone711caf2008-06-11 16:44:04 +000037
38from multiprocessing import util
39
Brian Curtinafa88b52010-10-07 01:12:19 +000040try:
41 from multiprocessing.sharedctypes import Value, copy
42 HAS_SHAREDCTYPES = True
43except ImportError:
44 HAS_SHAREDCTYPES = False
45
Benjamin Petersone711caf2008-06-11 16:44:04 +000046#
47#
48#
49
Benjamin Peterson2bc91df2008-07-13 18:45:30 +000050def latin(s):
51 return s.encode('latin')
Benjamin Petersone711caf2008-06-11 16:44:04 +000052
Benjamin Petersone711caf2008-06-11 16:44:04 +000053#
54# Constants
55#
56
57LOG_LEVEL = util.SUBWARNING
Jesse Noller1f0b6582010-01-27 03:36:01 +000058#LOG_LEVEL = logging.DEBUG
Benjamin Petersone711caf2008-06-11 16:44:04 +000059
60DELTA = 0.1
61CHECK_TIMINGS = False # making true makes tests take a lot longer
62 # and can sometimes cause some non-serious
63 # failures because some calls block a bit
64 # longer than expected
65if CHECK_TIMINGS:
66 TIMEOUT1, TIMEOUT2, TIMEOUT3 = 0.82, 0.35, 1.4
67else:
68 TIMEOUT1, TIMEOUT2, TIMEOUT3 = 0.1, 0.1, 0.1
69
70HAVE_GETVALUE = not getattr(_multiprocessing,
71 'HAVE_BROKEN_SEM_GETVALUE', False)
72
Jesse Noller6214edd2009-01-19 16:23:53 +000073WIN32 = (sys.platform == "win32")
Antoine Pitrou176f07d2011-06-06 19:35:31 +020074if WIN32:
75 from _subprocess import WaitForSingleObject, INFINITE, WAIT_OBJECT_0
76
77 def wait_for_handle(handle, timeout):
78 if timeout is None or timeout < 0.0:
79 timeout = INFINITE
80 else:
81 timeout = int(1000 * timeout)
82 return WaitForSingleObject(handle, timeout) == WAIT_OBJECT_0
83else:
84 from select import select
85 _select = util._eintr_retry(select)
86
87 def wait_for_handle(handle, timeout):
88 if timeout is not None and timeout < 0.0:
89 timeout = None
90 return handle in _select([handle], [], [], timeout)[0]
Jesse Noller6214edd2009-01-19 16:23:53 +000091
Benjamin Petersone711caf2008-06-11 16:44:04 +000092#
Florent Xiclunafd1b0932010-03-28 00:25:02 +000093# Some tests require ctypes
94#
95
96try:
Florent Xiclunaaa171062010-08-14 15:56:42 +000097 from ctypes import Structure, c_int, c_double
Florent Xiclunafd1b0932010-03-28 00:25:02 +000098except ImportError:
99 Structure = object
100 c_int = c_double = None
101
102#
Benjamin Petersone711caf2008-06-11 16:44:04 +0000103# Creates a wrapper for a function which records the time it takes to finish
104#
105
106class TimingWrapper(object):
107
108 def __init__(self, func):
109 self.func = func
110 self.elapsed = None
111
112 def __call__(self, *args, **kwds):
113 t = time.time()
114 try:
115 return self.func(*args, **kwds)
116 finally:
117 self.elapsed = time.time() - t
118
119#
120# Base class for test cases
121#
122
123class BaseTestCase(object):
124
125 ALLOWED_TYPES = ('processes', 'manager', 'threads')
126
127 def assertTimingAlmostEqual(self, a, b):
128 if CHECK_TIMINGS:
129 self.assertAlmostEqual(a, b, 1)
130
131 def assertReturnsIfImplemented(self, value, func, *args):
132 try:
133 res = func(*args)
134 except NotImplementedError:
135 pass
136 else:
137 return self.assertEqual(value, res)
138
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +0000139 # For the sanity of Windows users, rather than crashing or freezing in
140 # multiple ways.
141 def __reduce__(self, *args):
142 raise NotImplementedError("shouldn't try to pickle a test case")
143
144 __reduce_ex__ = __reduce__
145
Benjamin Petersone711caf2008-06-11 16:44:04 +0000146#
147# Return the value of a semaphore
148#
149
150def get_value(self):
151 try:
152 return self.get_value()
153 except AttributeError:
154 try:
155 return self._Semaphore__value
156 except AttributeError:
157 try:
158 return self._value
159 except AttributeError:
160 raise NotImplementedError
161
162#
163# Testcases
164#
165
166class _TestProcess(BaseTestCase):
167
168 ALLOWED_TYPES = ('processes', 'threads')
169
170 def test_current(self):
171 if self.TYPE == 'threads':
172 return
173
174 current = self.current_process()
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000175 authkey = current.authkey
Benjamin Petersone711caf2008-06-11 16:44:04 +0000176
177 self.assertTrue(current.is_alive())
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000178 self.assertTrue(not current.daemon)
Ezio Melottie9615932010-01-24 19:26:24 +0000179 self.assertIsInstance(authkey, bytes)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000180 self.assertTrue(len(authkey) > 0)
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000181 self.assertEqual(current.ident, os.getpid())
182 self.assertEqual(current.exitcode, None)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000183
Antoine Pitrou0bd4deb2011-02-25 22:07:43 +0000184 def test_daemon_argument(self):
185 if self.TYPE == "threads":
186 return
187
188 # By default uses the current process's daemon flag.
189 proc0 = self.Process(target=self._test)
Antoine Pitrouec785222011-03-02 00:15:44 +0000190 self.assertEqual(proc0.daemon, self.current_process().daemon)
Antoine Pitrou0bd4deb2011-02-25 22:07:43 +0000191 proc1 = self.Process(target=self._test, daemon=True)
192 self.assertTrue(proc1.daemon)
193 proc2 = self.Process(target=self._test, daemon=False)
194 self.assertFalse(proc2.daemon)
195
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +0000196 @classmethod
197 def _test(cls, q, *args, **kwds):
198 current = cls.current_process()
Benjamin Petersone711caf2008-06-11 16:44:04 +0000199 q.put(args)
200 q.put(kwds)
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000201 q.put(current.name)
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +0000202 if cls.TYPE != 'threads':
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000203 q.put(bytes(current.authkey))
Benjamin Petersone711caf2008-06-11 16:44:04 +0000204 q.put(current.pid)
205
206 def test_process(self):
207 q = self.Queue(1)
208 e = self.Event()
209 args = (q, 1, 2)
210 kwargs = {'hello':23, 'bye':2.54}
211 name = 'SomeProcess'
212 p = self.Process(
213 target=self._test, args=args, kwargs=kwargs, name=name
214 )
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000215 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000216 current = self.current_process()
217
218 if self.TYPE != 'threads':
Ezio Melottib3aedd42010-11-20 19:04:17 +0000219 self.assertEqual(p.authkey, current.authkey)
220 self.assertEqual(p.is_alive(), False)
221 self.assertEqual(p.daemon, True)
Benjamin Peterson577473f2010-01-19 00:09:57 +0000222 self.assertNotIn(p, self.active_children())
Benjamin Petersone711caf2008-06-11 16:44:04 +0000223 self.assertTrue(type(self.active_children()) is list)
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000224 self.assertEqual(p.exitcode, None)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000225
226 p.start()
227
Ezio Melottib3aedd42010-11-20 19:04:17 +0000228 self.assertEqual(p.exitcode, None)
229 self.assertEqual(p.is_alive(), True)
Benjamin Peterson577473f2010-01-19 00:09:57 +0000230 self.assertIn(p, self.active_children())
Benjamin Petersone711caf2008-06-11 16:44:04 +0000231
Ezio Melottib3aedd42010-11-20 19:04:17 +0000232 self.assertEqual(q.get(), args[1:])
233 self.assertEqual(q.get(), kwargs)
234 self.assertEqual(q.get(), p.name)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000235 if self.TYPE != 'threads':
Ezio Melottib3aedd42010-11-20 19:04:17 +0000236 self.assertEqual(q.get(), current.authkey)
237 self.assertEqual(q.get(), p.pid)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000238
239 p.join()
240
Ezio Melottib3aedd42010-11-20 19:04:17 +0000241 self.assertEqual(p.exitcode, 0)
242 self.assertEqual(p.is_alive(), False)
Benjamin Peterson577473f2010-01-19 00:09:57 +0000243 self.assertNotIn(p, self.active_children())
Benjamin Petersone711caf2008-06-11 16:44:04 +0000244
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +0000245 @classmethod
246 def _test_terminate(cls):
Benjamin Petersone711caf2008-06-11 16:44:04 +0000247 time.sleep(1000)
248
249 def test_terminate(self):
250 if self.TYPE == 'threads':
251 return
252
253 p = self.Process(target=self._test_terminate)
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000254 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000255 p.start()
256
257 self.assertEqual(p.is_alive(), True)
Benjamin Peterson577473f2010-01-19 00:09:57 +0000258 self.assertIn(p, self.active_children())
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000259 self.assertEqual(p.exitcode, None)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000260
261 p.terminate()
262
263 join = TimingWrapper(p.join)
264 self.assertEqual(join(), None)
265 self.assertTimingAlmostEqual(join.elapsed, 0.0)
266
267 self.assertEqual(p.is_alive(), False)
Benjamin Peterson577473f2010-01-19 00:09:57 +0000268 self.assertNotIn(p, self.active_children())
Benjamin Petersone711caf2008-06-11 16:44:04 +0000269
270 p.join()
271
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000272 # XXX sometimes get p.exitcode == 0 on Windows ...
273 #self.assertEqual(p.exitcode, -signal.SIGTERM)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000274
275 def test_cpu_count(self):
276 try:
277 cpus = multiprocessing.cpu_count()
278 except NotImplementedError:
279 cpus = 1
280 self.assertTrue(type(cpus) is int)
281 self.assertTrue(cpus >= 1)
282
283 def test_active_children(self):
284 self.assertEqual(type(self.active_children()), list)
285
286 p = self.Process(target=time.sleep, args=(DELTA,))
Benjamin Peterson577473f2010-01-19 00:09:57 +0000287 self.assertNotIn(p, self.active_children())
Benjamin Petersone711caf2008-06-11 16:44:04 +0000288
289 p.start()
Benjamin Peterson577473f2010-01-19 00:09:57 +0000290 self.assertIn(p, self.active_children())
Benjamin Petersone711caf2008-06-11 16:44:04 +0000291
292 p.join()
Benjamin Peterson577473f2010-01-19 00:09:57 +0000293 self.assertNotIn(p, self.active_children())
Benjamin Petersone711caf2008-06-11 16:44:04 +0000294
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +0000295 @classmethod
296 def _test_recursion(cls, wconn, id):
Benjamin Petersone711caf2008-06-11 16:44:04 +0000297 from multiprocessing import forking
298 wconn.send(id)
299 if len(id) < 2:
300 for i in range(2):
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +0000301 p = cls.Process(
302 target=cls._test_recursion, args=(wconn, id+[i])
Benjamin Petersone711caf2008-06-11 16:44:04 +0000303 )
304 p.start()
305 p.join()
306
307 def test_recursion(self):
308 rconn, wconn = self.Pipe(duplex=False)
309 self._test_recursion(wconn, [])
310
311 time.sleep(DELTA)
312 result = []
313 while rconn.poll():
314 result.append(rconn.recv())
315
316 expected = [
317 [],
318 [0],
319 [0, 0],
320 [0, 1],
321 [1],
322 [1, 0],
323 [1, 1]
324 ]
325 self.assertEqual(result, expected)
326
Antoine Pitrou176f07d2011-06-06 19:35:31 +0200327 @classmethod
328 def _test_sentinel(cls, event):
329 event.wait(10.0)
330
331 def test_sentinel(self):
332 if self.TYPE == "threads":
333 return
334 event = self.Event()
335 p = self.Process(target=self._test_sentinel, args=(event,))
336 with self.assertRaises(ValueError):
337 p.sentinel
338 p.start()
339 self.addCleanup(p.join)
340 sentinel = p.sentinel
341 self.assertIsInstance(sentinel, int)
342 self.assertFalse(wait_for_handle(sentinel, timeout=0.0))
343 event.set()
344 p.join()
345 self.assertTrue(wait_for_handle(sentinel, timeout=DELTA))
346
Benjamin Petersone711caf2008-06-11 16:44:04 +0000347#
348#
349#
350
351class _UpperCaser(multiprocessing.Process):
352
353 def __init__(self):
354 multiprocessing.Process.__init__(self)
355 self.child_conn, self.parent_conn = multiprocessing.Pipe()
356
357 def run(self):
358 self.parent_conn.close()
359 for s in iter(self.child_conn.recv, None):
360 self.child_conn.send(s.upper())
361 self.child_conn.close()
362
363 def submit(self, s):
364 assert type(s) is str
365 self.parent_conn.send(s)
366 return self.parent_conn.recv()
367
368 def stop(self):
369 self.parent_conn.send(None)
370 self.parent_conn.close()
371 self.child_conn.close()
372
373class _TestSubclassingProcess(BaseTestCase):
374
375 ALLOWED_TYPES = ('processes',)
376
377 def test_subclassing(self):
378 uppercaser = _UpperCaser()
379 uppercaser.start()
380 self.assertEqual(uppercaser.submit('hello'), 'HELLO')
381 self.assertEqual(uppercaser.submit('world'), 'WORLD')
382 uppercaser.stop()
383 uppercaser.join()
384
385#
386#
387#
388
389def queue_empty(q):
390 if hasattr(q, 'empty'):
391 return q.empty()
392 else:
393 return q.qsize() == 0
394
395def queue_full(q, maxsize):
396 if hasattr(q, 'full'):
397 return q.full()
398 else:
399 return q.qsize() == maxsize
400
401
402class _TestQueue(BaseTestCase):
403
404
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +0000405 @classmethod
406 def _test_put(cls, queue, child_can_start, parent_can_continue):
Benjamin Petersone711caf2008-06-11 16:44:04 +0000407 child_can_start.wait()
408 for i in range(6):
409 queue.get()
410 parent_can_continue.set()
411
412 def test_put(self):
413 MAXSIZE = 6
414 queue = self.Queue(maxsize=MAXSIZE)
415 child_can_start = self.Event()
416 parent_can_continue = self.Event()
417
418 proc = self.Process(
419 target=self._test_put,
420 args=(queue, child_can_start, parent_can_continue)
421 )
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000422 proc.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000423 proc.start()
424
425 self.assertEqual(queue_empty(queue), True)
426 self.assertEqual(queue_full(queue, MAXSIZE), False)
427
428 queue.put(1)
429 queue.put(2, True)
430 queue.put(3, True, None)
431 queue.put(4, False)
432 queue.put(5, False, None)
433 queue.put_nowait(6)
434
435 # the values may be in buffer but not yet in pipe so sleep a bit
436 time.sleep(DELTA)
437
438 self.assertEqual(queue_empty(queue), False)
439 self.assertEqual(queue_full(queue, MAXSIZE), True)
440
441 put = TimingWrapper(queue.put)
442 put_nowait = TimingWrapper(queue.put_nowait)
443
444 self.assertRaises(pyqueue.Full, put, 7, False)
445 self.assertTimingAlmostEqual(put.elapsed, 0)
446
447 self.assertRaises(pyqueue.Full, put, 7, False, None)
448 self.assertTimingAlmostEqual(put.elapsed, 0)
449
450 self.assertRaises(pyqueue.Full, put_nowait, 7)
451 self.assertTimingAlmostEqual(put_nowait.elapsed, 0)
452
453 self.assertRaises(pyqueue.Full, put, 7, True, TIMEOUT1)
454 self.assertTimingAlmostEqual(put.elapsed, TIMEOUT1)
455
456 self.assertRaises(pyqueue.Full, put, 7, False, TIMEOUT2)
457 self.assertTimingAlmostEqual(put.elapsed, 0)
458
459 self.assertRaises(pyqueue.Full, put, 7, True, timeout=TIMEOUT3)
460 self.assertTimingAlmostEqual(put.elapsed, TIMEOUT3)
461
462 child_can_start.set()
463 parent_can_continue.wait()
464
465 self.assertEqual(queue_empty(queue), True)
466 self.assertEqual(queue_full(queue, MAXSIZE), False)
467
468 proc.join()
469
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +0000470 @classmethod
471 def _test_get(cls, queue, child_can_start, parent_can_continue):
Benjamin Petersone711caf2008-06-11 16:44:04 +0000472 child_can_start.wait()
Benjamin Petersonb29cbbc2008-06-16 20:57:14 +0000473 #queue.put(1)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000474 queue.put(2)
475 queue.put(3)
476 queue.put(4)
477 queue.put(5)
478 parent_can_continue.set()
479
480 def test_get(self):
481 queue = self.Queue()
482 child_can_start = self.Event()
483 parent_can_continue = self.Event()
484
485 proc = self.Process(
486 target=self._test_get,
487 args=(queue, child_can_start, parent_can_continue)
488 )
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000489 proc.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000490 proc.start()
491
492 self.assertEqual(queue_empty(queue), True)
493
494 child_can_start.set()
495 parent_can_continue.wait()
496
497 time.sleep(DELTA)
498 self.assertEqual(queue_empty(queue), False)
499
Benjamin Petersonb29cbbc2008-06-16 20:57:14 +0000500 # Hangs unexpectedly, remove for now
501 #self.assertEqual(queue.get(), 1)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000502 self.assertEqual(queue.get(True, None), 2)
503 self.assertEqual(queue.get(True), 3)
504 self.assertEqual(queue.get(timeout=1), 4)
505 self.assertEqual(queue.get_nowait(), 5)
506
507 self.assertEqual(queue_empty(queue), True)
508
509 get = TimingWrapper(queue.get)
510 get_nowait = TimingWrapper(queue.get_nowait)
511
512 self.assertRaises(pyqueue.Empty, get, False)
513 self.assertTimingAlmostEqual(get.elapsed, 0)
514
515 self.assertRaises(pyqueue.Empty, get, False, None)
516 self.assertTimingAlmostEqual(get.elapsed, 0)
517
518 self.assertRaises(pyqueue.Empty, get_nowait)
519 self.assertTimingAlmostEqual(get_nowait.elapsed, 0)
520
521 self.assertRaises(pyqueue.Empty, get, True, TIMEOUT1)
522 self.assertTimingAlmostEqual(get.elapsed, TIMEOUT1)
523
524 self.assertRaises(pyqueue.Empty, get, False, TIMEOUT2)
525 self.assertTimingAlmostEqual(get.elapsed, 0)
526
527 self.assertRaises(pyqueue.Empty, get, timeout=TIMEOUT3)
528 self.assertTimingAlmostEqual(get.elapsed, TIMEOUT3)
529
530 proc.join()
531
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +0000532 @classmethod
533 def _test_fork(cls, queue):
Benjamin Petersone711caf2008-06-11 16:44:04 +0000534 for i in range(10, 20):
535 queue.put(i)
536 # note that at this point the items may only be buffered, so the
537 # process cannot shutdown until the feeder thread has finished
538 # pushing items onto the pipe.
539
540 def test_fork(self):
541 # Old versions of Queue would fail to create a new feeder
542 # thread for a forked process if the original process had its
543 # own feeder thread. This test checks that this no longer
544 # happens.
545
546 queue = self.Queue()
547
548 # put items on queue so that main process starts a feeder thread
549 for i in range(10):
550 queue.put(i)
551
552 # wait to make sure thread starts before we fork a new process
553 time.sleep(DELTA)
554
555 # fork process
556 p = self.Process(target=self._test_fork, args=(queue,))
557 p.start()
558
559 # check that all expected items are in the queue
560 for i in range(20):
561 self.assertEqual(queue.get(), i)
562 self.assertRaises(pyqueue.Empty, queue.get, False)
563
564 p.join()
565
566 def test_qsize(self):
567 q = self.Queue()
568 try:
569 self.assertEqual(q.qsize(), 0)
570 except NotImplementedError:
571 return
572 q.put(1)
573 self.assertEqual(q.qsize(), 1)
574 q.put(5)
575 self.assertEqual(q.qsize(), 2)
576 q.get()
577 self.assertEqual(q.qsize(), 1)
578 q.get()
579 self.assertEqual(q.qsize(), 0)
580
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +0000581 @classmethod
582 def _test_task_done(cls, q):
Benjamin Petersone711caf2008-06-11 16:44:04 +0000583 for obj in iter(q.get, None):
584 time.sleep(DELTA)
585 q.task_done()
586
587 def test_task_done(self):
588 queue = self.JoinableQueue()
589
590 if sys.version_info < (2, 5) and not hasattr(queue, 'task_done'):
Florent Xiclunafd1b0932010-03-28 00:25:02 +0000591 self.skipTest("requires 'queue.task_done()' method")
Benjamin Petersone711caf2008-06-11 16:44:04 +0000592
593 workers = [self.Process(target=self._test_task_done, args=(queue,))
594 for i in range(4)]
595
596 for p in workers:
597 p.start()
598
599 for i in range(10):
600 queue.put(i)
601
602 queue.join()
603
604 for p in workers:
605 queue.put(None)
606
607 for p in workers:
608 p.join()
609
610#
611#
612#
613
614class _TestLock(BaseTestCase):
615
616 def test_lock(self):
617 lock = self.Lock()
618 self.assertEqual(lock.acquire(), True)
619 self.assertEqual(lock.acquire(False), False)
620 self.assertEqual(lock.release(), None)
621 self.assertRaises((ValueError, threading.ThreadError), lock.release)
622
623 def test_rlock(self):
624 lock = self.RLock()
625 self.assertEqual(lock.acquire(), True)
626 self.assertEqual(lock.acquire(), True)
627 self.assertEqual(lock.acquire(), True)
628 self.assertEqual(lock.release(), None)
629 self.assertEqual(lock.release(), None)
630 self.assertEqual(lock.release(), None)
631 self.assertRaises((AssertionError, RuntimeError), lock.release)
632
Jesse Nollerf8d00852009-03-31 03:25:07 +0000633 def test_lock_context(self):
634 with self.Lock():
635 pass
636
Benjamin Petersone711caf2008-06-11 16:44:04 +0000637
638class _TestSemaphore(BaseTestCase):
639
640 def _test_semaphore(self, sem):
641 self.assertReturnsIfImplemented(2, get_value, sem)
642 self.assertEqual(sem.acquire(), True)
643 self.assertReturnsIfImplemented(1, get_value, sem)
644 self.assertEqual(sem.acquire(), True)
645 self.assertReturnsIfImplemented(0, get_value, sem)
646 self.assertEqual(sem.acquire(False), False)
647 self.assertReturnsIfImplemented(0, get_value, sem)
648 self.assertEqual(sem.release(), None)
649 self.assertReturnsIfImplemented(1, get_value, sem)
650 self.assertEqual(sem.release(), None)
651 self.assertReturnsIfImplemented(2, get_value, sem)
652
653 def test_semaphore(self):
654 sem = self.Semaphore(2)
655 self._test_semaphore(sem)
656 self.assertEqual(sem.release(), None)
657 self.assertReturnsIfImplemented(3, get_value, sem)
658 self.assertEqual(sem.release(), None)
659 self.assertReturnsIfImplemented(4, get_value, sem)
660
661 def test_bounded_semaphore(self):
662 sem = self.BoundedSemaphore(2)
663 self._test_semaphore(sem)
664 # Currently fails on OS/X
665 #if HAVE_GETVALUE:
666 # self.assertRaises(ValueError, sem.release)
667 # self.assertReturnsIfImplemented(2, get_value, sem)
668
669 def test_timeout(self):
670 if self.TYPE != 'processes':
671 return
672
673 sem = self.Semaphore(0)
674 acquire = TimingWrapper(sem.acquire)
675
676 self.assertEqual(acquire(False), False)
677 self.assertTimingAlmostEqual(acquire.elapsed, 0.0)
678
679 self.assertEqual(acquire(False, None), False)
680 self.assertTimingAlmostEqual(acquire.elapsed, 0.0)
681
682 self.assertEqual(acquire(False, TIMEOUT1), False)
683 self.assertTimingAlmostEqual(acquire.elapsed, 0)
684
685 self.assertEqual(acquire(True, TIMEOUT2), False)
686 self.assertTimingAlmostEqual(acquire.elapsed, TIMEOUT2)
687
688 self.assertEqual(acquire(timeout=TIMEOUT3), False)
689 self.assertTimingAlmostEqual(acquire.elapsed, TIMEOUT3)
690
691
692class _TestCondition(BaseTestCase):
693
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +0000694 @classmethod
695 def f(cls, cond, sleeping, woken, timeout=None):
Benjamin Petersone711caf2008-06-11 16:44:04 +0000696 cond.acquire()
697 sleeping.release()
698 cond.wait(timeout)
699 woken.release()
700 cond.release()
701
702 def check_invariant(self, cond):
703 # this is only supposed to succeed when there are no sleepers
704 if self.TYPE == 'processes':
705 try:
706 sleepers = (cond._sleeping_count.get_value() -
707 cond._woken_count.get_value())
708 self.assertEqual(sleepers, 0)
709 self.assertEqual(cond._wait_semaphore.get_value(), 0)
710 except NotImplementedError:
711 pass
712
713 def test_notify(self):
714 cond = self.Condition()
715 sleeping = self.Semaphore(0)
716 woken = self.Semaphore(0)
717
718 p = self.Process(target=self.f, args=(cond, sleeping, woken))
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000719 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000720 p.start()
721
722 p = threading.Thread(target=self.f, args=(cond, sleeping, woken))
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000723 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000724 p.start()
725
726 # wait for both children to start sleeping
727 sleeping.acquire()
728 sleeping.acquire()
729
730 # check no process/thread has woken up
731 time.sleep(DELTA)
732 self.assertReturnsIfImplemented(0, get_value, woken)
733
734 # wake up one process/thread
735 cond.acquire()
736 cond.notify()
737 cond.release()
738
739 # check one process/thread has woken up
740 time.sleep(DELTA)
741 self.assertReturnsIfImplemented(1, get_value, woken)
742
743 # wake up another
744 cond.acquire()
745 cond.notify()
746 cond.release()
747
748 # check other has woken up
749 time.sleep(DELTA)
750 self.assertReturnsIfImplemented(2, get_value, woken)
751
752 # check state is not mucked up
753 self.check_invariant(cond)
754 p.join()
755
756 def test_notify_all(self):
757 cond = self.Condition()
758 sleeping = self.Semaphore(0)
759 woken = self.Semaphore(0)
760
761 # start some threads/processes which will timeout
762 for i in range(3):
763 p = self.Process(target=self.f,
764 args=(cond, sleeping, woken, TIMEOUT1))
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000765 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000766 p.start()
767
768 t = threading.Thread(target=self.f,
769 args=(cond, sleeping, woken, TIMEOUT1))
Benjamin Peterson72753702008-08-18 18:09:21 +0000770 t.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000771 t.start()
772
773 # wait for them all to sleep
774 for i in range(6):
775 sleeping.acquire()
776
777 # check they have all timed out
778 for i in range(6):
779 woken.acquire()
780 self.assertReturnsIfImplemented(0, get_value, woken)
781
782 # check state is not mucked up
783 self.check_invariant(cond)
784
785 # start some more threads/processes
786 for i in range(3):
787 p = self.Process(target=self.f, args=(cond, sleeping, woken))
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000788 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000789 p.start()
790
791 t = threading.Thread(target=self.f, args=(cond, sleeping, woken))
Benjamin Peterson72753702008-08-18 18:09:21 +0000792 t.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000793 t.start()
794
795 # wait for them to all sleep
796 for i in range(6):
797 sleeping.acquire()
798
799 # check no process/thread has woken up
800 time.sleep(DELTA)
801 self.assertReturnsIfImplemented(0, get_value, woken)
802
803 # wake them all up
804 cond.acquire()
805 cond.notify_all()
806 cond.release()
807
808 # check they have all woken
Antoine Pitrouf25a8de2011-04-16 21:02:01 +0200809 for i in range(10):
810 try:
811 if get_value(woken) == 6:
812 break
813 except NotImplementedError:
814 break
815 time.sleep(DELTA)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000816 self.assertReturnsIfImplemented(6, get_value, woken)
817
818 # check state is not mucked up
819 self.check_invariant(cond)
820
821 def test_timeout(self):
822 cond = self.Condition()
823 wait = TimingWrapper(cond.wait)
824 cond.acquire()
825 res = wait(TIMEOUT1)
826 cond.release()
Georg Brandl65ffae02010-10-28 09:24:56 +0000827 self.assertEqual(res, False)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000828 self.assertTimingAlmostEqual(wait.elapsed, TIMEOUT1)
829
830
831class _TestEvent(BaseTestCase):
832
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +0000833 @classmethod
834 def _test_event(cls, event):
Benjamin Petersone711caf2008-06-11 16:44:04 +0000835 time.sleep(TIMEOUT2)
836 event.set()
837
838 def test_event(self):
839 event = self.Event()
840 wait = TimingWrapper(event.wait)
841
Ezio Melotti13925002011-03-16 11:05:33 +0200842 # Removed temporarily, due to API shear, this does not
Benjamin Petersone711caf2008-06-11 16:44:04 +0000843 # work with threading._Event objects. is_set == isSet
Benjamin Peterson965ce872009-04-05 21:24:58 +0000844 self.assertEqual(event.is_set(), False)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000845
Benjamin Peterson965ce872009-04-05 21:24:58 +0000846 # Removed, threading.Event.wait() will return the value of the __flag
847 # instead of None. API Shear with the semaphore backed mp.Event
848 self.assertEqual(wait(0.0), False)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000849 self.assertTimingAlmostEqual(wait.elapsed, 0.0)
Benjamin Peterson965ce872009-04-05 21:24:58 +0000850 self.assertEqual(wait(TIMEOUT1), False)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000851 self.assertTimingAlmostEqual(wait.elapsed, TIMEOUT1)
852
853 event.set()
854
855 # See note above on the API differences
Benjamin Peterson965ce872009-04-05 21:24:58 +0000856 self.assertEqual(event.is_set(), True)
857 self.assertEqual(wait(), True)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000858 self.assertTimingAlmostEqual(wait.elapsed, 0.0)
Benjamin Peterson965ce872009-04-05 21:24:58 +0000859 self.assertEqual(wait(TIMEOUT1), True)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000860 self.assertTimingAlmostEqual(wait.elapsed, 0.0)
861 # self.assertEqual(event.is_set(), True)
862
863 event.clear()
864
865 #self.assertEqual(event.is_set(), False)
866
867 self.Process(target=self._test_event, args=(event,)).start()
Benjamin Peterson965ce872009-04-05 21:24:58 +0000868 self.assertEqual(wait(), True)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000869
870#
871#
872#
873
874class _TestValue(BaseTestCase):
875
Florent Xiclunafd1b0932010-03-28 00:25:02 +0000876 ALLOWED_TYPES = ('processes',)
877
Benjamin Petersone711caf2008-06-11 16:44:04 +0000878 codes_values = [
879 ('i', 4343, 24234),
880 ('d', 3.625, -4.25),
881 ('h', -232, 234),
882 ('c', latin('x'), latin('y'))
883 ]
884
Antoine Pitrou7744e2a2010-11-22 16:26:21 +0000885 def setUp(self):
886 if not HAS_SHAREDCTYPES:
887 self.skipTest("requires multiprocessing.sharedctypes")
888
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +0000889 @classmethod
890 def _test(cls, values):
891 for sv, cv in zip(values, cls.codes_values):
Benjamin Petersone711caf2008-06-11 16:44:04 +0000892 sv.value = cv[2]
893
894
895 def test_value(self, raw=False):
Benjamin Petersone711caf2008-06-11 16:44:04 +0000896 if raw:
897 values = [self.RawValue(code, value)
898 for code, value, _ in self.codes_values]
899 else:
900 values = [self.Value(code, value)
901 for code, value, _ in self.codes_values]
902
903 for sv, cv in zip(values, self.codes_values):
904 self.assertEqual(sv.value, cv[1])
905
906 proc = self.Process(target=self._test, args=(values,))
907 proc.start()
908 proc.join()
909
910 for sv, cv in zip(values, self.codes_values):
911 self.assertEqual(sv.value, cv[2])
912
913 def test_rawvalue(self):
914 self.test_value(raw=True)
915
916 def test_getobj_getlock(self):
Benjamin Petersone711caf2008-06-11 16:44:04 +0000917 val1 = self.Value('i', 5)
918 lock1 = val1.get_lock()
919 obj1 = val1.get_obj()
920
921 val2 = self.Value('i', 5, lock=None)
922 lock2 = val2.get_lock()
923 obj2 = val2.get_obj()
924
925 lock = self.Lock()
926 val3 = self.Value('i', 5, lock=lock)
927 lock3 = val3.get_lock()
928 obj3 = val3.get_obj()
929 self.assertEqual(lock, lock3)
930
Jesse Nollerb0516a62009-01-18 03:11:38 +0000931 arr4 = self.Value('i', 5, lock=False)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000932 self.assertFalse(hasattr(arr4, 'get_lock'))
933 self.assertFalse(hasattr(arr4, 'get_obj'))
934
Jesse Nollerb0516a62009-01-18 03:11:38 +0000935 self.assertRaises(AttributeError, self.Value, 'i', 5, lock='navalue')
936
937 arr5 = self.RawValue('i', 5)
938 self.assertFalse(hasattr(arr5, 'get_lock'))
939 self.assertFalse(hasattr(arr5, 'get_obj'))
940
Benjamin Petersone711caf2008-06-11 16:44:04 +0000941
942class _TestArray(BaseTestCase):
943
Florent Xiclunafd1b0932010-03-28 00:25:02 +0000944 ALLOWED_TYPES = ('processes',)
945
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +0000946 @classmethod
947 def f(cls, seq):
Benjamin Petersone711caf2008-06-11 16:44:04 +0000948 for i in range(1, len(seq)):
949 seq[i] += seq[i-1]
950
Florent Xiclunafd1b0932010-03-28 00:25:02 +0000951 @unittest.skipIf(c_int is None, "requires _ctypes")
Benjamin Petersone711caf2008-06-11 16:44:04 +0000952 def test_array(self, raw=False):
Benjamin Petersone711caf2008-06-11 16:44:04 +0000953 seq = [680, 626, 934, 821, 150, 233, 548, 982, 714, 831]
954 if raw:
955 arr = self.RawArray('i', seq)
956 else:
957 arr = self.Array('i', seq)
958
959 self.assertEqual(len(arr), len(seq))
960 self.assertEqual(arr[3], seq[3])
961 self.assertEqual(list(arr[2:7]), list(seq[2:7]))
962
963 arr[4:8] = seq[4:8] = array.array('i', [1, 2, 3, 4])
964
965 self.assertEqual(list(arr[:]), seq)
966
967 self.f(seq)
968
969 p = self.Process(target=self.f, args=(arr,))
970 p.start()
971 p.join()
972
973 self.assertEqual(list(arr[:]), seq)
974
Florent Xiclunafd1b0932010-03-28 00:25:02 +0000975 @unittest.skipIf(c_int is None, "requires _ctypes")
Mark Dickinson89461ef2011-03-26 10:19:03 +0000976 def test_array_from_size(self):
977 size = 10
978 # Test for zeroing (see issue #11675).
979 # The repetition below strengthens the test by increasing the chances
980 # of previously allocated non-zero memory being used for the new array
981 # on the 2nd and 3rd loops.
982 for _ in range(3):
983 arr = self.Array('i', size)
984 self.assertEqual(len(arr), size)
985 self.assertEqual(list(arr), [0] * size)
986 arr[:] = range(10)
987 self.assertEqual(list(arr), list(range(10)))
988 del arr
989
990 @unittest.skipIf(c_int is None, "requires _ctypes")
Benjamin Petersone711caf2008-06-11 16:44:04 +0000991 def test_rawarray(self):
992 self.test_array(raw=True)
993
Florent Xiclunafd1b0932010-03-28 00:25:02 +0000994 @unittest.skipIf(c_int is None, "requires _ctypes")
Benjamin Petersone711caf2008-06-11 16:44:04 +0000995 def test_getobj_getlock_obj(self):
Benjamin Petersone711caf2008-06-11 16:44:04 +0000996 arr1 = self.Array('i', list(range(10)))
997 lock1 = arr1.get_lock()
998 obj1 = arr1.get_obj()
999
1000 arr2 = self.Array('i', list(range(10)), lock=None)
1001 lock2 = arr2.get_lock()
1002 obj2 = arr2.get_obj()
1003
1004 lock = self.Lock()
1005 arr3 = self.Array('i', list(range(10)), lock=lock)
1006 lock3 = arr3.get_lock()
1007 obj3 = arr3.get_obj()
1008 self.assertEqual(lock, lock3)
1009
Jesse Nollerb0516a62009-01-18 03:11:38 +00001010 arr4 = self.Array('i', range(10), lock=False)
Benjamin Petersone711caf2008-06-11 16:44:04 +00001011 self.assertFalse(hasattr(arr4, 'get_lock'))
1012 self.assertFalse(hasattr(arr4, 'get_obj'))
Jesse Nollerb0516a62009-01-18 03:11:38 +00001013 self.assertRaises(AttributeError,
1014 self.Array, 'i', range(10), lock='notalock')
1015
1016 arr5 = self.RawArray('i', range(10))
1017 self.assertFalse(hasattr(arr5, 'get_lock'))
1018 self.assertFalse(hasattr(arr5, 'get_obj'))
Benjamin Petersone711caf2008-06-11 16:44:04 +00001019
1020#
1021#
1022#
1023
1024class _TestContainers(BaseTestCase):
1025
1026 ALLOWED_TYPES = ('manager',)
1027
1028 def test_list(self):
1029 a = self.list(list(range(10)))
1030 self.assertEqual(a[:], list(range(10)))
1031
1032 b = self.list()
1033 self.assertEqual(b[:], [])
1034
1035 b.extend(list(range(5)))
1036 self.assertEqual(b[:], list(range(5)))
1037
1038 self.assertEqual(b[2], 2)
1039 self.assertEqual(b[2:10], [2,3,4])
1040
1041 b *= 2
1042 self.assertEqual(b[:], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4])
1043
1044 self.assertEqual(b + [5, 6], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4, 5, 6])
1045
1046 self.assertEqual(a[:], list(range(10)))
1047
1048 d = [a, b]
1049 e = self.list(d)
1050 self.assertEqual(
1051 e[:],
1052 [[0, 1, 2, 3, 4, 5, 6, 7, 8, 9], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4]]
1053 )
1054
1055 f = self.list([a])
1056 a.append('hello')
1057 self.assertEqual(f[:], [[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 'hello']])
1058
1059 def test_dict(self):
1060 d = self.dict()
1061 indices = list(range(65, 70))
1062 for i in indices:
1063 d[i] = chr(i)
1064 self.assertEqual(d.copy(), dict((i, chr(i)) for i in indices))
1065 self.assertEqual(sorted(d.keys()), indices)
1066 self.assertEqual(sorted(d.values()), [chr(i) for i in indices])
1067 self.assertEqual(sorted(d.items()), [(i, chr(i)) for i in indices])
1068
1069 def test_namespace(self):
1070 n = self.Namespace()
1071 n.name = 'Bob'
1072 n.job = 'Builder'
1073 n._hidden = 'hidden'
1074 self.assertEqual((n.name, n.job), ('Bob', 'Builder'))
1075 del n.job
1076 self.assertEqual(str(n), "Namespace(name='Bob')")
1077 self.assertTrue(hasattr(n, 'name'))
1078 self.assertTrue(not hasattr(n, 'job'))
1079
1080#
1081#
1082#
1083
1084def sqr(x, wait=0.0):
1085 time.sleep(wait)
1086 return x*x
Ask Solem2afcbf22010-11-09 20:55:52 +00001087
Benjamin Petersone711caf2008-06-11 16:44:04 +00001088class _TestPool(BaseTestCase):
1089
1090 def test_apply(self):
1091 papply = self.pool.apply
1092 self.assertEqual(papply(sqr, (5,)), sqr(5))
1093 self.assertEqual(papply(sqr, (), {'x':3}), sqr(x=3))
1094
1095 def test_map(self):
1096 pmap = self.pool.map
1097 self.assertEqual(pmap(sqr, list(range(10))), list(map(sqr, list(range(10)))))
1098 self.assertEqual(pmap(sqr, list(range(100)), chunksize=20),
1099 list(map(sqr, list(range(100)))))
1100
Alexandre Vassalottie52e3782009-07-17 09:18:18 +00001101 def test_map_chunksize(self):
1102 try:
1103 self.pool.map_async(sqr, [], chunksize=1).get(timeout=TIMEOUT1)
1104 except multiprocessing.TimeoutError:
1105 self.fail("pool.map_async with chunksize stalled on null list")
1106
Benjamin Petersone711caf2008-06-11 16:44:04 +00001107 def test_async(self):
1108 res = self.pool.apply_async(sqr, (7, TIMEOUT1,))
1109 get = TimingWrapper(res.get)
1110 self.assertEqual(get(), 49)
1111 self.assertTimingAlmostEqual(get.elapsed, TIMEOUT1)
1112
1113 def test_async_timeout(self):
1114 res = self.pool.apply_async(sqr, (6, TIMEOUT2 + 0.2))
1115 get = TimingWrapper(res.get)
1116 self.assertRaises(multiprocessing.TimeoutError, get, timeout=TIMEOUT2)
1117 self.assertTimingAlmostEqual(get.elapsed, TIMEOUT2)
1118
1119 def test_imap(self):
1120 it = self.pool.imap(sqr, list(range(10)))
1121 self.assertEqual(list(it), list(map(sqr, list(range(10)))))
1122
1123 it = self.pool.imap(sqr, list(range(10)))
1124 for i in range(10):
1125 self.assertEqual(next(it), i*i)
1126 self.assertRaises(StopIteration, it.__next__)
1127
1128 it = self.pool.imap(sqr, list(range(1000)), chunksize=100)
1129 for i in range(1000):
1130 self.assertEqual(next(it), i*i)
1131 self.assertRaises(StopIteration, it.__next__)
1132
1133 def test_imap_unordered(self):
1134 it = self.pool.imap_unordered(sqr, list(range(1000)))
1135 self.assertEqual(sorted(it), list(map(sqr, list(range(1000)))))
1136
1137 it = self.pool.imap_unordered(sqr, list(range(1000)), chunksize=53)
1138 self.assertEqual(sorted(it), list(map(sqr, list(range(1000)))))
1139
1140 def test_make_pool(self):
Victor Stinner2fae27b2011-06-20 17:53:35 +02001141 self.assertRaises(ValueError, multiprocessing.Pool, -1)
1142 self.assertRaises(ValueError, multiprocessing.Pool, 0)
1143
Benjamin Petersone711caf2008-06-11 16:44:04 +00001144 p = multiprocessing.Pool(3)
1145 self.assertEqual(3, len(p._pool))
1146 p.close()
1147 p.join()
1148
1149 def test_terminate(self):
1150 if self.TYPE == 'manager':
1151 # On Unix a forked process increfs each shared object to
1152 # which its parent process held a reference. If the
1153 # forked process gets terminated then there is likely to
1154 # be a reference leak. So to prevent
1155 # _TestZZZNumberOfObjects from failing we skip this test
1156 # when using a manager.
1157 return
1158
1159 result = self.pool.map_async(
1160 time.sleep, [0.1 for i in range(10000)], chunksize=1
1161 )
1162 self.pool.terminate()
1163 join = TimingWrapper(self.pool.join)
1164 join()
Victor Stinner900189b2011-03-24 16:39:07 +01001165 self.assertLess(join.elapsed, 0.5)
Jesse Noller1f0b6582010-01-27 03:36:01 +00001166
Ask Solem2afcbf22010-11-09 20:55:52 +00001167def raising():
1168 raise KeyError("key")
Jesse Noller1f0b6582010-01-27 03:36:01 +00001169
Ask Solem2afcbf22010-11-09 20:55:52 +00001170def unpickleable_result():
1171 return lambda: 42
1172
1173class _TestPoolWorkerErrors(BaseTestCase):
Jesse Noller1f0b6582010-01-27 03:36:01 +00001174 ALLOWED_TYPES = ('processes', )
Ask Solem2afcbf22010-11-09 20:55:52 +00001175
1176 def test_async_error_callback(self):
1177 p = multiprocessing.Pool(2)
1178
1179 scratchpad = [None]
1180 def errback(exc):
1181 scratchpad[0] = exc
1182
1183 res = p.apply_async(raising, error_callback=errback)
1184 self.assertRaises(KeyError, res.get)
1185 self.assertTrue(scratchpad[0])
1186 self.assertIsInstance(scratchpad[0], KeyError)
1187
1188 p.close()
1189 p.join()
1190
1191 def test_unpickleable_result(self):
1192 from multiprocessing.pool import MaybeEncodingError
1193 p = multiprocessing.Pool(2)
1194
1195 # Make sure we don't lose pool processes because of encoding errors.
1196 for iteration in range(20):
1197
1198 scratchpad = [None]
1199 def errback(exc):
1200 scratchpad[0] = exc
1201
1202 res = p.apply_async(unpickleable_result, error_callback=errback)
1203 self.assertRaises(MaybeEncodingError, res.get)
1204 wrapped = scratchpad[0]
1205 self.assertTrue(wrapped)
1206 self.assertIsInstance(scratchpad[0], MaybeEncodingError)
1207 self.assertIsNotNone(wrapped.exc)
1208 self.assertIsNotNone(wrapped.value)
1209
1210 p.close()
1211 p.join()
1212
1213class _TestPoolWorkerLifetime(BaseTestCase):
1214 ALLOWED_TYPES = ('processes', )
1215
Jesse Noller1f0b6582010-01-27 03:36:01 +00001216 def test_pool_worker_lifetime(self):
1217 p = multiprocessing.Pool(3, maxtasksperchild=10)
1218 self.assertEqual(3, len(p._pool))
1219 origworkerpids = [w.pid for w in p._pool]
1220 # Run many tasks so each worker gets replaced (hopefully)
1221 results = []
1222 for i in range(100):
1223 results.append(p.apply_async(sqr, (i, )))
1224 # Fetch the results and verify we got the right answers,
1225 # also ensuring all the tasks have completed.
1226 for (j, res) in enumerate(results):
1227 self.assertEqual(res.get(), sqr(j))
1228 # Refill the pool
1229 p._repopulate_pool()
Florent Xiclunafb190f62010-03-04 16:10:10 +00001230 # Wait until all workers are alive
Antoine Pitrou540ab062011-04-06 22:51:17 +02001231 # (countdown * DELTA = 5 seconds max startup process time)
1232 countdown = 50
Florent Xiclunafb190f62010-03-04 16:10:10 +00001233 while countdown and not all(w.is_alive() for w in p._pool):
1234 countdown -= 1
1235 time.sleep(DELTA)
Jesse Noller1f0b6582010-01-27 03:36:01 +00001236 finalworkerpids = [w.pid for w in p._pool]
Florent Xiclunafb190f62010-03-04 16:10:10 +00001237 # All pids should be assigned. See issue #7805.
1238 self.assertNotIn(None, origworkerpids)
1239 self.assertNotIn(None, finalworkerpids)
1240 # Finally, check that the worker pids have changed
Jesse Noller1f0b6582010-01-27 03:36:01 +00001241 self.assertNotEqual(sorted(origworkerpids), sorted(finalworkerpids))
1242 p.close()
1243 p.join()
1244
Benjamin Petersone711caf2008-06-11 16:44:04 +00001245#
1246# Test that manager has expected number of shared objects left
1247#
1248
1249class _TestZZZNumberOfObjects(BaseTestCase):
1250 # Because test cases are sorted alphabetically, this one will get
1251 # run after all the other tests for the manager. It tests that
1252 # there have been no "reference leaks" for the manager's shared
1253 # objects. Note the comment in _TestPool.test_terminate().
1254 ALLOWED_TYPES = ('manager',)
1255
1256 def test_number_of_objects(self):
1257 EXPECTED_NUMBER = 1 # the pool object is still alive
1258 multiprocessing.active_children() # discard dead process objs
1259 gc.collect() # do garbage collection
1260 refs = self.manager._number_of_objects()
Jesse Noller63b3a972009-01-21 02:15:48 +00001261 debug_info = self.manager._debug_info()
Benjamin Petersone711caf2008-06-11 16:44:04 +00001262 if refs != EXPECTED_NUMBER:
Georg Brandl3dbca812008-07-23 16:10:53 +00001263 print(self.manager._debug_info())
Jesse Noller63b3a972009-01-21 02:15:48 +00001264 print(debug_info)
Benjamin Petersone711caf2008-06-11 16:44:04 +00001265
1266 self.assertEqual(refs, EXPECTED_NUMBER)
1267
1268#
1269# Test of creating a customized manager class
1270#
1271
1272from multiprocessing.managers import BaseManager, BaseProxy, RemoteError
1273
1274class FooBar(object):
1275 def f(self):
1276 return 'f()'
1277 def g(self):
1278 raise ValueError
1279 def _h(self):
1280 return '_h()'
1281
1282def baz():
1283 for i in range(10):
1284 yield i*i
1285
1286class IteratorProxy(BaseProxy):
Florent Xiclunaaa171062010-08-14 15:56:42 +00001287 _exposed_ = ('__next__',)
Benjamin Petersone711caf2008-06-11 16:44:04 +00001288 def __iter__(self):
1289 return self
1290 def __next__(self):
Benjamin Petersone711caf2008-06-11 16:44:04 +00001291 return self._callmethod('__next__')
1292
1293class MyManager(BaseManager):
1294 pass
1295
1296MyManager.register('Foo', callable=FooBar)
1297MyManager.register('Bar', callable=FooBar, exposed=('f', '_h'))
1298MyManager.register('baz', callable=baz, proxytype=IteratorProxy)
1299
1300
1301class _TestMyManager(BaseTestCase):
1302
1303 ALLOWED_TYPES = ('manager',)
1304
1305 def test_mymanager(self):
1306 manager = MyManager()
1307 manager.start()
1308
1309 foo = manager.Foo()
1310 bar = manager.Bar()
1311 baz = manager.baz()
1312
1313 foo_methods = [name for name in ('f', 'g', '_h') if hasattr(foo, name)]
1314 bar_methods = [name for name in ('f', 'g', '_h') if hasattr(bar, name)]
1315
1316 self.assertEqual(foo_methods, ['f', 'g'])
1317 self.assertEqual(bar_methods, ['f', '_h'])
1318
1319 self.assertEqual(foo.f(), 'f()')
1320 self.assertRaises(ValueError, foo.g)
1321 self.assertEqual(foo._callmethod('f'), 'f()')
1322 self.assertRaises(RemoteError, foo._callmethod, '_h')
1323
1324 self.assertEqual(bar.f(), 'f()')
1325 self.assertEqual(bar._h(), '_h()')
1326 self.assertEqual(bar._callmethod('f'), 'f()')
1327 self.assertEqual(bar._callmethod('_h'), '_h()')
1328
1329 self.assertEqual(list(baz), [i*i for i in range(10)])
1330
1331 manager.shutdown()
1332
1333#
1334# Test of connecting to a remote server and using xmlrpclib for serialization
1335#
1336
1337_queue = pyqueue.Queue()
1338def get_queue():
1339 return _queue
1340
1341class QueueManager(BaseManager):
1342 '''manager class used by server process'''
1343QueueManager.register('get_queue', callable=get_queue)
1344
1345class QueueManager2(BaseManager):
1346 '''manager class which specifies the same interface as QueueManager'''
1347QueueManager2.register('get_queue')
1348
1349
1350SERIALIZER = 'xmlrpclib'
1351
1352class _TestRemoteManager(BaseTestCase):
1353
1354 ALLOWED_TYPES = ('manager',)
1355
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +00001356 @classmethod
1357 def _putter(cls, address, authkey):
Benjamin Petersone711caf2008-06-11 16:44:04 +00001358 manager = QueueManager2(
1359 address=address, authkey=authkey, serializer=SERIALIZER
1360 )
1361 manager.connect()
1362 queue = manager.get_queue()
1363 queue.put(('hello world', None, True, 2.25))
1364
1365 def test_remote(self):
1366 authkey = os.urandom(32)
1367
1368 manager = QueueManager(
1369 address=('localhost', 0), authkey=authkey, serializer=SERIALIZER
1370 )
1371 manager.start()
1372
1373 p = self.Process(target=self._putter, args=(manager.address, authkey))
1374 p.start()
1375
1376 manager2 = QueueManager2(
1377 address=manager.address, authkey=authkey, serializer=SERIALIZER
1378 )
1379 manager2.connect()
1380 queue = manager2.get_queue()
1381
1382 # Note that xmlrpclib will deserialize object as a list not a tuple
1383 self.assertEqual(queue.get(), ['hello world', None, True, 2.25])
1384
1385 # Because we are using xmlrpclib for serialization instead of
1386 # pickle this will cause a serialization error.
1387 self.assertRaises(Exception, queue.put, time.sleep)
1388
1389 # Make queue finalizer run before the server is stopped
1390 del queue
1391 manager.shutdown()
1392
Jesse Nollerc5d28a02009-03-30 16:37:36 +00001393class _TestManagerRestart(BaseTestCase):
1394
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +00001395 @classmethod
1396 def _putter(cls, address, authkey):
Jesse Nollerc5d28a02009-03-30 16:37:36 +00001397 manager = QueueManager(
1398 address=address, authkey=authkey, serializer=SERIALIZER)
1399 manager.connect()
1400 queue = manager.get_queue()
1401 queue.put('hello world')
1402
1403 def test_rapid_restart(self):
1404 authkey = os.urandom(32)
1405 manager = QueueManager(
Antoine Pitrou043bad02010-04-30 23:20:15 +00001406 address=('localhost', 0), authkey=authkey, serializer=SERIALIZER)
Brian Curtin50be1ca2010-11-01 05:10:44 +00001407 srvr = manager.get_server()
1408 addr = srvr.address
1409 # Close the connection.Listener socket which gets opened as a part
1410 # of manager.get_server(). It's not needed for the test.
1411 srvr.listener.close()
Jesse Nollerc5d28a02009-03-30 16:37:36 +00001412 manager.start()
1413
1414 p = self.Process(target=self._putter, args=(manager.address, authkey))
1415 p.start()
1416 queue = manager.get_queue()
1417 self.assertEqual(queue.get(), 'hello world')
Jesse Noller35d1f002009-03-30 22:59:27 +00001418 del queue
Jesse Nollerc5d28a02009-03-30 16:37:36 +00001419 manager.shutdown()
1420 manager = QueueManager(
Antoine Pitrou043bad02010-04-30 23:20:15 +00001421 address=addr, authkey=authkey, serializer=SERIALIZER)
Antoine Pitrouc824e9a2011-04-05 18:11:33 +02001422 try:
1423 manager.start()
1424 except IOError as e:
1425 if e.errno != errno.EADDRINUSE:
1426 raise
1427 # Retry after some time, in case the old socket was lingering
1428 # (sporadic failure on buildbots)
1429 time.sleep(1.0)
1430 manager = QueueManager(
1431 address=addr, authkey=authkey, serializer=SERIALIZER)
Jesse Noller35d1f002009-03-30 22:59:27 +00001432 manager.shutdown()
Jesse Nollerc5d28a02009-03-30 16:37:36 +00001433
Benjamin Petersone711caf2008-06-11 16:44:04 +00001434#
1435#
1436#
1437
1438SENTINEL = latin('')
1439
1440class _TestConnection(BaseTestCase):
1441
1442 ALLOWED_TYPES = ('processes', 'threads')
1443
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +00001444 @classmethod
1445 def _echo(cls, conn):
Benjamin Petersone711caf2008-06-11 16:44:04 +00001446 for msg in iter(conn.recv_bytes, SENTINEL):
1447 conn.send_bytes(msg)
1448 conn.close()
1449
1450 def test_connection(self):
1451 conn, child_conn = self.Pipe()
1452
1453 p = self.Process(target=self._echo, args=(child_conn,))
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +00001454 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +00001455 p.start()
1456
1457 seq = [1, 2.25, None]
1458 msg = latin('hello world')
1459 longmsg = msg * 10
1460 arr = array.array('i', list(range(4)))
1461
1462 if self.TYPE == 'processes':
1463 self.assertEqual(type(conn.fileno()), int)
1464
1465 self.assertEqual(conn.send(seq), None)
1466 self.assertEqual(conn.recv(), seq)
1467
1468 self.assertEqual(conn.send_bytes(msg), None)
1469 self.assertEqual(conn.recv_bytes(), msg)
1470
1471 if self.TYPE == 'processes':
1472 buffer = array.array('i', [0]*10)
1473 expected = list(arr) + [0] * (10 - len(arr))
1474 self.assertEqual(conn.send_bytes(arr), None)
1475 self.assertEqual(conn.recv_bytes_into(buffer),
1476 len(arr) * buffer.itemsize)
1477 self.assertEqual(list(buffer), expected)
1478
1479 buffer = array.array('i', [0]*10)
1480 expected = [0] * 3 + list(arr) + [0] * (10 - 3 - len(arr))
1481 self.assertEqual(conn.send_bytes(arr), None)
1482 self.assertEqual(conn.recv_bytes_into(buffer, 3 * buffer.itemsize),
1483 len(arr) * buffer.itemsize)
1484 self.assertEqual(list(buffer), expected)
1485
1486 buffer = bytearray(latin(' ' * 40))
1487 self.assertEqual(conn.send_bytes(longmsg), None)
1488 try:
1489 res = conn.recv_bytes_into(buffer)
1490 except multiprocessing.BufferTooShort as e:
1491 self.assertEqual(e.args, (longmsg,))
1492 else:
1493 self.fail('expected BufferTooShort, got %s' % res)
1494
1495 poll = TimingWrapper(conn.poll)
1496
1497 self.assertEqual(poll(), False)
1498 self.assertTimingAlmostEqual(poll.elapsed, 0)
1499
1500 self.assertEqual(poll(TIMEOUT1), False)
1501 self.assertTimingAlmostEqual(poll.elapsed, TIMEOUT1)
1502
1503 conn.send(None)
1504
1505 self.assertEqual(poll(TIMEOUT1), True)
1506 self.assertTimingAlmostEqual(poll.elapsed, 0)
1507
1508 self.assertEqual(conn.recv(), None)
1509
1510 really_big_msg = latin('X') * (1024 * 1024 * 16) # 16Mb
1511 conn.send_bytes(really_big_msg)
1512 self.assertEqual(conn.recv_bytes(), really_big_msg)
1513
1514 conn.send_bytes(SENTINEL) # tell child to quit
1515 child_conn.close()
1516
1517 if self.TYPE == 'processes':
1518 self.assertEqual(conn.readable, True)
1519 self.assertEqual(conn.writable, True)
1520 self.assertRaises(EOFError, conn.recv)
1521 self.assertRaises(EOFError, conn.recv_bytes)
1522
1523 p.join()
1524
1525 def test_duplex_false(self):
1526 reader, writer = self.Pipe(duplex=False)
1527 self.assertEqual(writer.send(1), None)
1528 self.assertEqual(reader.recv(), 1)
1529 if self.TYPE == 'processes':
1530 self.assertEqual(reader.readable, True)
1531 self.assertEqual(reader.writable, False)
1532 self.assertEqual(writer.readable, False)
1533 self.assertEqual(writer.writable, True)
1534 self.assertRaises(IOError, reader.send, 2)
1535 self.assertRaises(IOError, writer.recv)
1536 self.assertRaises(IOError, writer.poll)
1537
1538 def test_spawn_close(self):
1539 # We test that a pipe connection can be closed by parent
1540 # process immediately after child is spawned. On Windows this
1541 # would have sometimes failed on old versions because
1542 # child_conn would be closed before the child got a chance to
1543 # duplicate it.
1544 conn, child_conn = self.Pipe()
1545
1546 p = self.Process(target=self._echo, args=(child_conn,))
1547 p.start()
1548 child_conn.close() # this might complete before child initializes
1549
1550 msg = latin('hello')
1551 conn.send_bytes(msg)
1552 self.assertEqual(conn.recv_bytes(), msg)
1553
1554 conn.send_bytes(SENTINEL)
1555 conn.close()
1556 p.join()
1557
1558 def test_sendbytes(self):
1559 if self.TYPE != 'processes':
1560 return
1561
1562 msg = latin('abcdefghijklmnopqrstuvwxyz')
1563 a, b = self.Pipe()
1564
1565 a.send_bytes(msg)
1566 self.assertEqual(b.recv_bytes(), msg)
1567
1568 a.send_bytes(msg, 5)
1569 self.assertEqual(b.recv_bytes(), msg[5:])
1570
1571 a.send_bytes(msg, 7, 8)
1572 self.assertEqual(b.recv_bytes(), msg[7:7+8])
1573
1574 a.send_bytes(msg, 26)
1575 self.assertEqual(b.recv_bytes(), latin(''))
1576
1577 a.send_bytes(msg, 26, 0)
1578 self.assertEqual(b.recv_bytes(), latin(''))
1579
1580 self.assertRaises(ValueError, a.send_bytes, msg, 27)
1581
1582 self.assertRaises(ValueError, a.send_bytes, msg, 22, 5)
1583
1584 self.assertRaises(ValueError, a.send_bytes, msg, 26, 1)
1585
1586 self.assertRaises(ValueError, a.send_bytes, msg, -1)
1587
1588 self.assertRaises(ValueError, a.send_bytes, msg, 4, -1)
1589
Benjamin Petersone711caf2008-06-11 16:44:04 +00001590class _TestListenerClient(BaseTestCase):
1591
1592 ALLOWED_TYPES = ('processes', 'threads')
1593
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +00001594 @classmethod
1595 def _test(cls, address):
1596 conn = cls.connection.Client(address)
Benjamin Petersone711caf2008-06-11 16:44:04 +00001597 conn.send('hello')
1598 conn.close()
1599
1600 def test_listener_client(self):
1601 for family in self.connection.families:
1602 l = self.connection.Listener(family=family)
1603 p = self.Process(target=self._test, args=(l.address,))
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +00001604 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +00001605 p.start()
1606 conn = l.accept()
1607 self.assertEqual(conn.recv(), 'hello')
1608 p.join()
1609 l.close()
Benjamin Petersone711caf2008-06-11 16:44:04 +00001610#
1611# Test of sending connection and socket objects between processes
1612#
Benjamin Petersonb29cbbc2008-06-16 20:57:14 +00001613"""
Benjamin Petersone711caf2008-06-11 16:44:04 +00001614class _TestPicklingConnections(BaseTestCase):
1615
1616 ALLOWED_TYPES = ('processes',)
1617
1618 def _listener(self, conn, families):
1619 for fam in families:
1620 l = self.connection.Listener(family=fam)
1621 conn.send(l.address)
1622 new_conn = l.accept()
1623 conn.send(new_conn)
1624
1625 if self.TYPE == 'processes':
1626 l = socket.socket()
1627 l.bind(('localhost', 0))
1628 conn.send(l.getsockname())
1629 l.listen(1)
1630 new_conn, addr = l.accept()
1631 conn.send(new_conn)
1632
1633 conn.recv()
1634
1635 def _remote(self, conn):
1636 for (address, msg) in iter(conn.recv, None):
1637 client = self.connection.Client(address)
1638 client.send(msg.upper())
1639 client.close()
1640
1641 if self.TYPE == 'processes':
1642 address, msg = conn.recv()
1643 client = socket.socket()
1644 client.connect(address)
1645 client.sendall(msg.upper())
1646 client.close()
1647
1648 conn.close()
1649
1650 def test_pickling(self):
1651 try:
1652 multiprocessing.allow_connection_pickling()
1653 except ImportError:
1654 return
1655
1656 families = self.connection.families
1657
1658 lconn, lconn0 = self.Pipe()
1659 lp = self.Process(target=self._listener, args=(lconn0, families))
1660 lp.start()
1661 lconn0.close()
1662
1663 rconn, rconn0 = self.Pipe()
1664 rp = self.Process(target=self._remote, args=(rconn0,))
1665 rp.start()
1666 rconn0.close()
1667
1668 for fam in families:
1669 msg = ('This connection uses family %s' % fam).encode('ascii')
1670 address = lconn.recv()
1671 rconn.send((address, msg))
1672 new_conn = lconn.recv()
1673 self.assertEqual(new_conn.recv(), msg.upper())
1674
1675 rconn.send(None)
1676
1677 if self.TYPE == 'processes':
1678 msg = latin('This connection uses a normal socket')
1679 address = lconn.recv()
1680 rconn.send((address, msg))
1681 if hasattr(socket, 'fromfd'):
1682 new_conn = lconn.recv()
1683 self.assertEqual(new_conn.recv(100), msg.upper())
1684 else:
1685 # XXX On Windows with Py2.6 need to backport fromfd()
1686 discard = lconn.recv_bytes()
1687
1688 lconn.send(None)
1689
1690 rconn.close()
1691 lconn.close()
1692
1693 lp.join()
1694 rp.join()
Benjamin Petersonb29cbbc2008-06-16 20:57:14 +00001695"""
Benjamin Petersone711caf2008-06-11 16:44:04 +00001696#
1697#
1698#
1699
1700class _TestHeap(BaseTestCase):
1701
1702 ALLOWED_TYPES = ('processes',)
1703
1704 def test_heap(self):
1705 iterations = 5000
1706 maxblocks = 50
1707 blocks = []
1708
1709 # create and destroy lots of blocks of different sizes
1710 for i in range(iterations):
1711 size = int(random.lognormvariate(0, 1) * 1000)
1712 b = multiprocessing.heap.BufferWrapper(size)
1713 blocks.append(b)
1714 if len(blocks) > maxblocks:
1715 i = random.randrange(maxblocks)
1716 del blocks[i]
1717
1718 # get the heap object
1719 heap = multiprocessing.heap.BufferWrapper._heap
1720
1721 # verify the state of the heap
1722 all = []
1723 occupied = 0
Charles-François Natali778db492011-07-02 14:35:49 +02001724 heap._lock.acquire()
1725 self.addCleanup(heap._lock.release)
Benjamin Petersone711caf2008-06-11 16:44:04 +00001726 for L in list(heap._len_to_seq.values()):
1727 for arena, start, stop in L:
1728 all.append((heap._arenas.index(arena), start, stop,
1729 stop-start, 'free'))
1730 for arena, start, stop in heap._allocated_blocks:
1731 all.append((heap._arenas.index(arena), start, stop,
1732 stop-start, 'occupied'))
1733 occupied += (stop-start)
1734
1735 all.sort()
1736
1737 for i in range(len(all)-1):
1738 (arena, start, stop) = all[i][:3]
1739 (narena, nstart, nstop) = all[i+1][:3]
1740 self.assertTrue((arena != narena and nstart == 0) or
1741 (stop == nstart))
1742
Charles-François Natali778db492011-07-02 14:35:49 +02001743 def test_free_from_gc(self):
1744 # Check that freeing of blocks by the garbage collector doesn't deadlock
1745 # (issue #12352).
1746 # Make sure the GC is enabled, and set lower collection thresholds to
1747 # make collections more frequent (and increase the probability of
1748 # deadlock).
1749 if not gc.isenabled():
1750 gc.enable()
1751 self.addCleanup(gc.disable)
1752 thresholds = gc.get_threshold()
1753 self.addCleanup(gc.set_threshold, *thresholds)
1754 gc.set_threshold(10)
1755
1756 # perform numerous block allocations, with cyclic references to make
1757 # sure objects are collected asynchronously by the gc
1758 for i in range(5000):
1759 a = multiprocessing.heap.BufferWrapper(1)
1760 b = multiprocessing.heap.BufferWrapper(1)
1761 # circular references
1762 a.buddy = b
1763 b.buddy = a
1764
Benjamin Petersone711caf2008-06-11 16:44:04 +00001765#
1766#
1767#
1768
Benjamin Petersone711caf2008-06-11 16:44:04 +00001769class _Foo(Structure):
1770 _fields_ = [
1771 ('x', c_int),
1772 ('y', c_double)
1773 ]
1774
1775class _TestSharedCTypes(BaseTestCase):
1776
1777 ALLOWED_TYPES = ('processes',)
1778
Antoine Pitrou7744e2a2010-11-22 16:26:21 +00001779 def setUp(self):
1780 if not HAS_SHAREDCTYPES:
1781 self.skipTest("requires multiprocessing.sharedctypes")
1782
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +00001783 @classmethod
1784 def _double(cls, x, y, foo, arr, string):
Benjamin Petersone711caf2008-06-11 16:44:04 +00001785 x.value *= 2
1786 y.value *= 2
1787 foo.x *= 2
1788 foo.y *= 2
1789 string.value *= 2
1790 for i in range(len(arr)):
1791 arr[i] *= 2
1792
1793 def test_sharedctypes(self, lock=False):
Benjamin Petersone711caf2008-06-11 16:44:04 +00001794 x = Value('i', 7, lock=lock)
Georg Brandl89fad142010-03-14 10:23:39 +00001795 y = Value(c_double, 1.0/3.0, lock=lock)
Benjamin Petersone711caf2008-06-11 16:44:04 +00001796 foo = Value(_Foo, 3, 2, lock=lock)
Georg Brandl89fad142010-03-14 10:23:39 +00001797 arr = self.Array('d', list(range(10)), lock=lock)
1798 string = self.Array('c', 20, lock=lock)
Brian Curtinafa88b52010-10-07 01:12:19 +00001799 string.value = latin('hello')
Benjamin Petersone711caf2008-06-11 16:44:04 +00001800
1801 p = self.Process(target=self._double, args=(x, y, foo, arr, string))
1802 p.start()
1803 p.join()
1804
1805 self.assertEqual(x.value, 14)
1806 self.assertAlmostEqual(y.value, 2.0/3.0)
1807 self.assertEqual(foo.x, 6)
1808 self.assertAlmostEqual(foo.y, 4.0)
1809 for i in range(10):
1810 self.assertAlmostEqual(arr[i], i*2)
1811 self.assertEqual(string.value, latin('hellohello'))
1812
1813 def test_synchronize(self):
1814 self.test_sharedctypes(lock=True)
1815
1816 def test_copy(self):
Benjamin Petersone711caf2008-06-11 16:44:04 +00001817 foo = _Foo(2, 5.0)
Brian Curtinafa88b52010-10-07 01:12:19 +00001818 bar = copy(foo)
Benjamin Petersone711caf2008-06-11 16:44:04 +00001819 foo.x = 0
1820 foo.y = 0
1821 self.assertEqual(bar.x, 2)
1822 self.assertAlmostEqual(bar.y, 5.0)
1823
1824#
1825#
1826#
1827
1828class _TestFinalize(BaseTestCase):
1829
1830 ALLOWED_TYPES = ('processes',)
1831
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +00001832 @classmethod
1833 def _test_finalize(cls, conn):
Benjamin Petersone711caf2008-06-11 16:44:04 +00001834 class Foo(object):
1835 pass
1836
1837 a = Foo()
1838 util.Finalize(a, conn.send, args=('a',))
1839 del a # triggers callback for a
1840
1841 b = Foo()
1842 close_b = util.Finalize(b, conn.send, args=('b',))
1843 close_b() # triggers callback for b
1844 close_b() # does nothing because callback has already been called
1845 del b # does nothing because callback has already been called
1846
1847 c = Foo()
1848 util.Finalize(c, conn.send, args=('c',))
1849
1850 d10 = Foo()
1851 util.Finalize(d10, conn.send, args=('d10',), exitpriority=1)
1852
1853 d01 = Foo()
1854 util.Finalize(d01, conn.send, args=('d01',), exitpriority=0)
1855 d02 = Foo()
1856 util.Finalize(d02, conn.send, args=('d02',), exitpriority=0)
1857 d03 = Foo()
1858 util.Finalize(d03, conn.send, args=('d03',), exitpriority=0)
1859
1860 util.Finalize(None, conn.send, args=('e',), exitpriority=-10)
1861
1862 util.Finalize(None, conn.send, args=('STOP',), exitpriority=-100)
1863
Ezio Melotti13925002011-03-16 11:05:33 +02001864 # call multiprocessing's cleanup function then exit process without
Benjamin Petersone711caf2008-06-11 16:44:04 +00001865 # garbage collecting locals
1866 util._exit_function()
1867 conn.close()
1868 os._exit(0)
1869
1870 def test_finalize(self):
1871 conn, child_conn = self.Pipe()
1872
1873 p = self.Process(target=self._test_finalize, args=(child_conn,))
1874 p.start()
1875 p.join()
1876
1877 result = [obj for obj in iter(conn.recv, 'STOP')]
1878 self.assertEqual(result, ['a', 'b', 'd10', 'd03', 'd02', 'd01', 'e'])
1879
1880#
1881# Test that from ... import * works for each module
1882#
1883
1884class _TestImportStar(BaseTestCase):
1885
1886 ALLOWED_TYPES = ('processes',)
1887
1888 def test_import(self):
Florent Xiclunafd1b0932010-03-28 00:25:02 +00001889 modules = [
Benjamin Petersone711caf2008-06-11 16:44:04 +00001890 'multiprocessing', 'multiprocessing.connection',
1891 'multiprocessing.heap', 'multiprocessing.managers',
1892 'multiprocessing.pool', 'multiprocessing.process',
Florent Xiclunafd1b0932010-03-28 00:25:02 +00001893 'multiprocessing.reduction',
Benjamin Petersone711caf2008-06-11 16:44:04 +00001894 'multiprocessing.synchronize', 'multiprocessing.util'
Florent Xiclunafd1b0932010-03-28 00:25:02 +00001895 ]
1896
1897 if c_int is not None:
1898 # This module requires _ctypes
1899 modules.append('multiprocessing.sharedctypes')
Benjamin Petersone711caf2008-06-11 16:44:04 +00001900
1901 for name in modules:
1902 __import__(name)
1903 mod = sys.modules[name]
1904
1905 for attr in getattr(mod, '__all__', ()):
1906 self.assertTrue(
1907 hasattr(mod, attr),
1908 '%r does not have attribute %r' % (mod, attr)
1909 )
1910
1911#
1912# Quick test that logging works -- does not test logging output
1913#
1914
1915class _TestLogging(BaseTestCase):
1916
1917 ALLOWED_TYPES = ('processes',)
1918
1919 def test_enable_logging(self):
1920 logger = multiprocessing.get_logger()
1921 logger.setLevel(util.SUBWARNING)
1922 self.assertTrue(logger is not None)
1923 logger.debug('this will not be printed')
1924 logger.info('nor will this')
1925 logger.setLevel(LOG_LEVEL)
1926
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +00001927 @classmethod
1928 def _test_level(cls, conn):
Benjamin Petersone711caf2008-06-11 16:44:04 +00001929 logger = multiprocessing.get_logger()
1930 conn.send(logger.getEffectiveLevel())
1931
1932 def test_level(self):
1933 LEVEL1 = 32
1934 LEVEL2 = 37
1935
1936 logger = multiprocessing.get_logger()
1937 root_logger = logging.getLogger()
1938 root_level = root_logger.level
1939
1940 reader, writer = multiprocessing.Pipe(duplex=False)
1941
1942 logger.setLevel(LEVEL1)
1943 self.Process(target=self._test_level, args=(writer,)).start()
1944 self.assertEqual(LEVEL1, reader.recv())
1945
1946 logger.setLevel(logging.NOTSET)
1947 root_logger.setLevel(LEVEL2)
1948 self.Process(target=self._test_level, args=(writer,)).start()
1949 self.assertEqual(LEVEL2, reader.recv())
1950
1951 root_logger.setLevel(root_level)
1952 logger.setLevel(level=LOG_LEVEL)
1953
Jesse Nollerb9a49b72009-11-21 18:09:38 +00001954
Jesse Nollerf4ae35f2009-11-24 14:22:24 +00001955# class _TestLoggingProcessName(BaseTestCase):
1956#
1957# def handle(self, record):
1958# assert record.processName == multiprocessing.current_process().name
1959# self.__handled = True
1960#
1961# def test_logging(self):
1962# handler = logging.Handler()
1963# handler.handle = self.handle
1964# self.__handled = False
1965# # Bypass getLogger() and side-effects
1966# logger = logging.getLoggerClass()(
1967# 'multiprocessing.test.TestLoggingProcessName')
1968# logger.addHandler(handler)
1969# logger.propagate = False
1970#
1971# logger.warn('foo')
1972# assert self.__handled
Jesse Nollerb9a49b72009-11-21 18:09:38 +00001973
Benjamin Petersone711caf2008-06-11 16:44:04 +00001974#
Jesse Noller6214edd2009-01-19 16:23:53 +00001975# Test to verify handle verification, see issue 3321
1976#
1977
1978class TestInvalidHandle(unittest.TestCase):
1979
Florent Xiclunafd1b0932010-03-28 00:25:02 +00001980 @unittest.skipIf(WIN32, "skipped on Windows")
Jesse Noller6214edd2009-01-19 16:23:53 +00001981 def test_invalid_handles(self):
Antoine Pitrou87cf2202011-05-09 17:04:27 +02001982 conn = multiprocessing.connection.Connection(44977608)
1983 try:
1984 self.assertRaises((ValueError, IOError), conn.poll)
1985 finally:
1986 # Hack private attribute _handle to avoid printing an error
1987 # in conn.__del__
1988 conn._handle = None
1989 self.assertRaises((ValueError, IOError),
1990 multiprocessing.connection.Connection, -1)
Florent Xiclunafd1b0932010-03-28 00:25:02 +00001991
Jesse Noller6214edd2009-01-19 16:23:53 +00001992#
Benjamin Petersone711caf2008-06-11 16:44:04 +00001993# Functions used to create test cases from the base ones in this module
1994#
1995
1996def get_attributes(Source, names):
1997 d = {}
1998 for name in names:
1999 obj = getattr(Source, name)
2000 if type(obj) == type(get_attributes):
2001 obj = staticmethod(obj)
2002 d[name] = obj
2003 return d
2004
2005def create_test_cases(Mixin, type):
2006 result = {}
2007 glob = globals()
Florent Xiclunafd1b0932010-03-28 00:25:02 +00002008 Type = type.capitalize()
Benjamin Petersone711caf2008-06-11 16:44:04 +00002009
2010 for name in list(glob.keys()):
2011 if name.startswith('_Test'):
2012 base = glob[name]
2013 if type in base.ALLOWED_TYPES:
2014 newname = 'With' + Type + name[1:]
2015 class Temp(base, unittest.TestCase, Mixin):
2016 pass
2017 result[newname] = Temp
2018 Temp.__name__ = newname
2019 Temp.__module__ = Mixin.__module__
2020 return result
2021
2022#
2023# Create test cases
2024#
2025
2026class ProcessesMixin(object):
2027 TYPE = 'processes'
2028 Process = multiprocessing.Process
2029 locals().update(get_attributes(multiprocessing, (
2030 'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
2031 'Condition', 'Event', 'Value', 'Array', 'RawValue',
2032 'RawArray', 'current_process', 'active_children', 'Pipe',
2033 'connection', 'JoinableQueue'
2034 )))
2035
2036testcases_processes = create_test_cases(ProcessesMixin, type='processes')
2037globals().update(testcases_processes)
2038
2039
2040class ManagerMixin(object):
2041 TYPE = 'manager'
2042 Process = multiprocessing.Process
2043 manager = object.__new__(multiprocessing.managers.SyncManager)
2044 locals().update(get_attributes(manager, (
2045 'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
2046 'Condition', 'Event', 'Value', 'Array', 'list', 'dict',
2047 'Namespace', 'JoinableQueue'
2048 )))
2049
2050testcases_manager = create_test_cases(ManagerMixin, type='manager')
2051globals().update(testcases_manager)
2052
2053
2054class ThreadsMixin(object):
2055 TYPE = 'threads'
2056 Process = multiprocessing.dummy.Process
2057 locals().update(get_attributes(multiprocessing.dummy, (
2058 'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
2059 'Condition', 'Event', 'Value', 'Array', 'current_process',
2060 'active_children', 'Pipe', 'connection', 'dict', 'list',
2061 'Namespace', 'JoinableQueue'
2062 )))
2063
2064testcases_threads = create_test_cases(ThreadsMixin, type='threads')
2065globals().update(testcases_threads)
2066
Neal Norwitz5d6415e2008-08-25 01:53:32 +00002067class OtherTest(unittest.TestCase):
2068 # TODO: add more tests for deliver/answer challenge.
2069 def test_deliver_challenge_auth_failure(self):
2070 class _FakeConnection(object):
2071 def recv_bytes(self, size):
Neal Norwitzec105ad2008-08-25 03:05:54 +00002072 return b'something bogus'
Neal Norwitz5d6415e2008-08-25 01:53:32 +00002073 def send_bytes(self, data):
2074 pass
2075 self.assertRaises(multiprocessing.AuthenticationError,
2076 multiprocessing.connection.deliver_challenge,
2077 _FakeConnection(), b'abc')
2078
2079 def test_answer_challenge_auth_failure(self):
2080 class _FakeConnection(object):
2081 def __init__(self):
2082 self.count = 0
2083 def recv_bytes(self, size):
2084 self.count += 1
2085 if self.count == 1:
2086 return multiprocessing.connection.CHALLENGE
2087 elif self.count == 2:
Neal Norwitzec105ad2008-08-25 03:05:54 +00002088 return b'something bogus'
2089 return b''
Neal Norwitz5d6415e2008-08-25 01:53:32 +00002090 def send_bytes(self, data):
2091 pass
2092 self.assertRaises(multiprocessing.AuthenticationError,
2093 multiprocessing.connection.answer_challenge,
2094 _FakeConnection(), b'abc')
2095
Benjamin Petersonf47ed4a2009-04-11 20:45:40 +00002096#
2097# Test Manager.start()/Pool.__init__() initializer feature - see issue 5585
2098#
2099
2100def initializer(ns):
2101 ns.test += 1
2102
2103class TestInitializers(unittest.TestCase):
2104 def setUp(self):
2105 self.mgr = multiprocessing.Manager()
2106 self.ns = self.mgr.Namespace()
2107 self.ns.test = 0
2108
2109 def tearDown(self):
2110 self.mgr.shutdown()
2111
2112 def test_manager_initializer(self):
2113 m = multiprocessing.managers.SyncManager()
2114 self.assertRaises(TypeError, m.start, 1)
2115 m.start(initializer, (self.ns,))
2116 self.assertEqual(self.ns.test, 1)
2117 m.shutdown()
2118
2119 def test_pool_initializer(self):
2120 self.assertRaises(TypeError, multiprocessing.Pool, initializer=1)
2121 p = multiprocessing.Pool(1, initializer, (self.ns,))
2122 p.close()
2123 p.join()
2124 self.assertEqual(self.ns.test, 1)
2125
Alexandre Vassalottic57a84f2009-07-17 12:07:01 +00002126#
2127# Issue 5155, 5313, 5331: Test process in processes
2128# Verifies os.close(sys.stdin.fileno) vs. sys.stdin.close() behavior
2129#
2130
2131def _ThisSubProcess(q):
2132 try:
2133 item = q.get(block=False)
2134 except pyqueue.Empty:
2135 pass
2136
2137def _TestProcess(q):
2138 queue = multiprocessing.Queue()
2139 subProc = multiprocessing.Process(target=_ThisSubProcess, args=(queue,))
2140 subProc.start()
2141 subProc.join()
2142
2143def _afunc(x):
2144 return x*x
2145
2146def pool_in_process():
2147 pool = multiprocessing.Pool(processes=4)
2148 x = pool.map(_afunc, [1, 2, 3, 4, 5, 6, 7])
2149
2150class _file_like(object):
2151 def __init__(self, delegate):
2152 self._delegate = delegate
2153 self._pid = None
2154
2155 @property
2156 def cache(self):
2157 pid = os.getpid()
2158 # There are no race conditions since fork keeps only the running thread
2159 if pid != self._pid:
2160 self._pid = pid
2161 self._cache = []
2162 return self._cache
2163
2164 def write(self, data):
2165 self.cache.append(data)
2166
2167 def flush(self):
2168 self._delegate.write(''.join(self.cache))
2169 self._cache = []
2170
2171class TestStdinBadfiledescriptor(unittest.TestCase):
2172
2173 def test_queue_in_process(self):
2174 queue = multiprocessing.Queue()
2175 proc = multiprocessing.Process(target=_TestProcess, args=(queue,))
2176 proc.start()
2177 proc.join()
2178
2179 def test_pool_in_process(self):
2180 p = multiprocessing.Process(target=pool_in_process)
2181 p.start()
2182 p.join()
2183
2184 def test_flushing(self):
2185 sio = io.StringIO()
2186 flike = _file_like(sio)
2187 flike.write('foo')
2188 proc = multiprocessing.Process(target=lambda: flike.flush())
2189 flike.flush()
2190 assert sio.getvalue() == 'foo'
2191
2192testcases_other = [OtherTest, TestInvalidHandle, TestInitializers,
2193 TestStdinBadfiledescriptor]
Neal Norwitz5d6415e2008-08-25 01:53:32 +00002194
Benjamin Petersone711caf2008-06-11 16:44:04 +00002195#
2196#
2197#
2198
2199def test_main(run=None):
Jesse Nollerd00df3c2008-06-18 14:22:48 +00002200 if sys.platform.startswith("linux"):
2201 try:
2202 lock = multiprocessing.RLock()
2203 except OSError:
Benjamin Petersone549ead2009-03-28 21:42:05 +00002204 raise unittest.SkipTest("OSError raises on RLock creation, see issue 3111!")
Benjamin Peterson3c0dd062008-06-17 22:43:48 +00002205
Benjamin Petersone711caf2008-06-11 16:44:04 +00002206 if run is None:
2207 from test.support import run_unittest as run
2208
2209 util.get_temp_dir() # creates temp directory for use by all processes
2210
2211 multiprocessing.get_logger().setLevel(LOG_LEVEL)
2212
Benjamin Peterson41181742008-07-02 20:22:54 +00002213 ProcessesMixin.pool = multiprocessing.Pool(4)
2214 ThreadsMixin.pool = multiprocessing.dummy.Pool(4)
2215 ManagerMixin.manager.__init__()
2216 ManagerMixin.manager.start()
2217 ManagerMixin.pool = ManagerMixin.manager.Pool(4)
Benjamin Petersone711caf2008-06-11 16:44:04 +00002218
2219 testcases = (
Benjamin Peterson41181742008-07-02 20:22:54 +00002220 sorted(testcases_processes.values(), key=lambda tc:tc.__name__) +
2221 sorted(testcases_threads.values(), key=lambda tc:tc.__name__) +
Neal Norwitz5d6415e2008-08-25 01:53:32 +00002222 sorted(testcases_manager.values(), key=lambda tc:tc.__name__) +
2223 testcases_other
Benjamin Petersone711caf2008-06-11 16:44:04 +00002224 )
2225
2226 loadTestsFromTestCase = unittest.defaultTestLoader.loadTestsFromTestCase
2227 suite = unittest.TestSuite(loadTestsFromTestCase(tc) for tc in testcases)
2228 run(suite)
2229
Benjamin Peterson41181742008-07-02 20:22:54 +00002230 ThreadsMixin.pool.terminate()
2231 ProcessesMixin.pool.terminate()
2232 ManagerMixin.pool.terminate()
2233 ManagerMixin.manager.shutdown()
Benjamin Petersone711caf2008-06-11 16:44:04 +00002234
Benjamin Peterson41181742008-07-02 20:22:54 +00002235 del ProcessesMixin.pool, ThreadsMixin.pool, ManagerMixin.pool
Benjamin Petersone711caf2008-06-11 16:44:04 +00002236
2237def main():
2238 test_main(unittest.TextTestRunner(verbosity=2).run)
2239
2240if __name__ == '__main__':
2241 main()