blob: b752d8d79b8b56de3e22aa7aae4d4f8c0d46b767 [file] [log] [blame]
Benjamin Peterson90f5ba52010-03-11 22:53:45 +00001#!/usr/bin/env python3
Georg Brandl86b2fb92008-07-16 03:43:04 +00002
Benjamin Petersone711caf2008-06-11 16:44:04 +00003#
4# Unit tests for the multiprocessing package
5#
6
7import unittest
Benjamin Petersone711caf2008-06-11 16:44:04 +00008import queue as pyqueue
9import time
Alexandre Vassalottic57a84f2009-07-17 12:07:01 +000010import io
Benjamin Petersone711caf2008-06-11 16:44:04 +000011import sys
12import os
13import gc
Antoine Pitrouc824e9a2011-04-05 18:11:33 +020014import errno
Benjamin Petersone711caf2008-06-11 16:44:04 +000015import signal
16import array
Benjamin Petersone711caf2008-06-11 16:44:04 +000017import socket
18import random
19import logging
R. David Murraya21e4ca2009-03-31 23:16:50 +000020import test.support
Benjamin Petersone711caf2008-06-11 16:44:04 +000021
Benjamin Petersone5384b02008-10-04 22:00:42 +000022
R. David Murraya21e4ca2009-03-31 23:16:50 +000023# Skip tests if _multiprocessing wasn't built.
24_multiprocessing = test.support.import_module('_multiprocessing')
25# Skip tests if sem_open implementation is broken.
26test.support.import_module('multiprocessing.synchronize')
# Import threading after _multiprocessing to raise a more relevant error
# message: "No module named _multiprocessing". _multiprocessing is not compiled
# without thread support.
30import threading
Benjamin Petersone5384b02008-10-04 22:00:42 +000031
Benjamin Petersone711caf2008-06-11 16:44:04 +000032import multiprocessing.dummy
33import multiprocessing.connection
34import multiprocessing.managers
35import multiprocessing.heap
Benjamin Petersone711caf2008-06-11 16:44:04 +000036import multiprocessing.pool
Benjamin Petersone711caf2008-06-11 16:44:04 +000037
38from multiprocessing import util
39
Brian Curtinafa88b52010-10-07 01:12:19 +000040try:
41 from multiprocessing.sharedctypes import Value, copy
42 HAS_SHAREDCTYPES = True
43except ImportError:
44 HAS_SHAREDCTYPES = False
45
Benjamin Petersone711caf2008-06-11 16:44:04 +000046#
47#
48#
49
def latin(s):
    """Return the text *s* encoded to bytes with the 'latin' codec."""
    encoded = s.encode('latin')
    return encoded
Benjamin Petersone711caf2008-06-11 16:44:04 +000052
Benjamin Petersone711caf2008-06-11 16:44:04 +000053#
54# Constants
55#
56
# Log level constant for the tests; the commented-out line below is a more
# verbose alternative.
LOG_LEVEL = util.SUBWARNING
#LOG_LEVEL = logging.DEBUG

# Short delay (in seconds) that the tests sleep for.
DELTA = 0.1

# Making this true makes the tests take a lot longer and can sometimes cause
# some non-serious failures because some calls block a bit longer than
# expected.
CHECK_TIMINGS = False

# Timeouts (in seconds) passed to blocking calls; longer, distinct values are
# used only when the timings are actually being checked.
TIMEOUT1, TIMEOUT2, TIMEOUT3 = (
    (0.82, 0.35, 1.4) if CHECK_TIMINGS else (0.1, 0.1, 0.1)
)

# False when _multiprocessing flags its semaphore get_value() as broken.
HAVE_GETVALUE = not getattr(
    _multiprocessing, 'HAVE_BROKEN_SEM_GETVALUE', False
)
72
WIN32 = sys.platform == "win32"

if not WIN32:
    from select import select
    # NOTE(review): presumably retries select() when it is interrupted
    # (EINTR) -- confirm against multiprocessing.util.
    _select = util._eintr_retry(select)

    def wait_for_handle(handle, timeout):
        """Return True if *handle* is reported readable within *timeout*.

        *timeout* is in seconds; ``None`` or a negative value is passed to
        select() as ``None``.
        """
        wait = None if (timeout is not None and timeout < 0.0) else timeout
        readable = _select([handle], [], [], wait)[0]
        return handle in readable
else:
    from _subprocess import WaitForSingleObject, INFINITE, WAIT_OBJECT_0

    def wait_for_handle(handle, timeout):
        """Return True if waiting on *handle* yields WAIT_OBJECT_0.

        *timeout* is in seconds and is converted to whole milliseconds;
        ``None`` or a negative value is replaced by INFINITE.
        """
        if timeout is None or timeout < 0.0:
            millis = INFINITE
        else:
            millis = int(1000 * timeout)
        return WaitForSingleObject(handle, millis) == WAIT_OBJECT_0
Jesse Noller6214edd2009-01-19 16:23:53 +000091
Benjamin Petersone711caf2008-06-11 16:44:04 +000092#
Florent Xiclunafd1b0932010-03-28 00:25:02 +000093# Some tests require ctypes
94#
95
96try:
Florent Xiclunaaa171062010-08-14 15:56:42 +000097 from ctypes import Structure, c_int, c_double
Florent Xiclunafd1b0932010-03-28 00:25:02 +000098except ImportError:
99 Structure = object
100 c_int = c_double = None
101
102#
Benjamin Petersone711caf2008-06-11 16:44:04 +0000103# Creates a wrapper for a function which records the time it takes to finish
104#
105
class TimingWrapper(object):
    """Callable wrapper recording how long the wrapped function takes.

    ``elapsed`` is ``None`` until the wrapper has been called; afterwards it
    holds the duration in seconds (measured with ``time.time()``) of the most
    recent call.
    """

    def __init__(self, func):
        self.elapsed = None
        self.func = func

    def __call__(self, *args, **kwds):
        """Call the wrapped function and return its result."""
        started = time.time()
        try:
            result = self.func(*args, **kwds)
        finally:
            # Record the duration even when the wrapped call raises.
            self.elapsed = time.time() - started
        return result
118
119#
120# Base class for test cases
121#
122
class BaseTestCase(object):
    """Helpers shared by the test cases below.

    The class itself does not define ``assertEqual``/``assertAlmostEqual``;
    they are expected to be supplied by whatever it is combined with.
    """

    # NOTE(review): presumably the flavours a test case is run under --
    # confirm against the code that consumes ALLOWED_TYPES.
    ALLOWED_TYPES = ('processes', 'manager', 'threads')

    def assertTimingAlmostEqual(self, a, b):
        """Compare *a* and *b* to one decimal place, only if CHECK_TIMINGS."""
        if not CHECK_TIMINGS:
            return
        self.assertAlmostEqual(a, b, 1)

    def assertReturnsIfImplemented(self, value, func, *args):
        """Assert ``func(*args) == value`` unless it is not implemented.

        A NotImplementedError raised by *func* is silently accepted.
        """
        try:
            res = func(*args)
        except NotImplementedError:
            return None
        return self.assertEqual(value, res)

    # For the sanity of Windows users, rather than crashing or freezing in
    # multiple ways.
    def __reduce__(self, *args):
        raise NotImplementedError("shouldn't try to pickle a test case")

    __reduce_ex__ = __reduce__
145
Benjamin Petersone711caf2008-06-11 16:44:04 +0000146#
147# Return the value of a semaphore
148#
149
def get_value(self):
    """Return the current value of the semaphore-like object *self*.

    Tries ``self.get_value()`` first, then the ``_Semaphore__value`` and
    ``_value`` attributes, in that order.  Raises NotImplementedError when
    none of them is available.
    """
    try:
        return self.get_value()
    except AttributeError:
        pass
    for attr in ('_Semaphore__value', '_value'):
        try:
            return getattr(self, attr)
        except AttributeError:
            continue
    raise NotImplementedError
161
162#
163# Testcases
164#
165
class _TestProcess(BaseTestCase):
    """Tests of Process objects: attributes, life cycle, terminate, sentinel."""

    ALLOWED_TYPES = ('processes', 'threads')

    def test_current(self):
        """Check the attributes of the object returned by current_process()."""
        if self.TYPE == 'threads':
            return

        me = self.current_process()
        key = me.authkey

        self.assertTrue(me.is_alive())
        self.assertTrue(not me.daemon)
        self.assertIsInstance(key, bytes)
        self.assertTrue(len(key) > 0)
        self.assertEqual(me.ident, os.getpid())
        self.assertEqual(me.exitcode, None)

    def test_daemon_argument(self):
        """Check how the ``daemon`` constructor argument sets ``daemon``."""
        if self.TYPE == "threads":
            return

        # By default uses the current process's daemon flag.
        default_proc = self.Process(target=self._test)
        self.assertEqual(default_proc.daemon, self.current_process().daemon)
        daemon_proc = self.Process(target=self._test, daemon=True)
        self.assertTrue(daemon_proc.daemon)
        plain_proc = self.Process(target=self._test, daemon=False)
        self.assertFalse(plain_proc.daemon)

    @classmethod
    def _test(cls, q, *args, **kwds):
        """Child side of test_process: report call details back through *q*."""
        me = cls.current_process()
        q.put(args)
        q.put(kwds)
        q.put(me.name)
        if cls.TYPE == 'threads':
            return
        q.put(bytes(me.authkey))
        q.put(me.pid)

    def test_process(self):
        """Follow a process from creation through start() to join()."""
        q = self.Queue(1)
        unused_event = self.Event()
        call_args = (q, 1, 2)
        call_kwargs = {'hello': 23, 'bye': 2.54}
        proc = self.Process(
            target=self._test,
            args=call_args,
            kwargs=call_kwargs,
            name='SomeProcess',
        )
        proc.daemon = True
        me = self.current_process()

        # State before the process has been started.
        if self.TYPE != 'threads':
            self.assertEqual(proc.authkey, me.authkey)
        self.assertEqual(proc.is_alive(), False)
        self.assertEqual(proc.daemon, True)
        self.assertNotIn(proc, self.active_children())
        self.assertTrue(type(self.active_children()) is list)
        self.assertEqual(proc.exitcode, None)

        proc.start()

        # State while the process is running.
        self.assertEqual(proc.exitcode, None)
        self.assertEqual(proc.is_alive(), True)
        self.assertIn(proc, self.active_children())

        # The child puts these on the queue in exactly this order.
        self.assertEqual(q.get(), call_args[1:])
        self.assertEqual(q.get(), call_kwargs)
        self.assertEqual(q.get(), proc.name)
        if self.TYPE != 'threads':
            self.assertEqual(q.get(), me.authkey)
            self.assertEqual(q.get(), proc.pid)

        proc.join()

        # State after the process has finished.
        self.assertEqual(proc.exitcode, 0)
        self.assertEqual(proc.is_alive(), False)
        self.assertNotIn(proc, self.active_children())

    @classmethod
    def _test_terminate(cls):
        """Child side of test_terminate: sleep until terminated."""
        time.sleep(1000)

    def test_terminate(self):
        """Check that terminate() stops a sleeping process."""
        if self.TYPE == 'threads':
            return

        proc = self.Process(target=self._test_terminate)
        proc.daemon = True
        proc.start()

        self.assertEqual(proc.is_alive(), True)
        self.assertIn(proc, self.active_children())
        self.assertEqual(proc.exitcode, None)

        proc.terminate()

        timed_join = TimingWrapper(proc.join)
        self.assertEqual(timed_join(), None)
        self.assertTimingAlmostEqual(timed_join.elapsed, 0.0)

        self.assertEqual(proc.is_alive(), False)
        self.assertNotIn(proc, self.active_children())

        proc.join()

        # XXX sometimes get p.exitcode == 0 on Windows ...
        #self.assertEqual(p.exitcode, -signal.SIGTERM)

    def test_cpu_count(self):
        """cpu_count() returns an int >= 1 (1 is assumed if unimplemented)."""
        try:
            count = multiprocessing.cpu_count()
        except NotImplementedError:
            count = 1
        self.assertTrue(type(count) is int)
        self.assertTrue(count >= 1)

    def test_active_children(self):
        """A process is listed by active_children() only while it runs."""
        self.assertEqual(type(self.active_children()), list)

        sleeper = self.Process(target=time.sleep, args=(DELTA,))
        self.assertNotIn(sleeper, self.active_children())

        sleeper.start()
        self.assertIn(sleeper, self.active_children())

        sleeper.join()
        self.assertNotIn(sleeper, self.active_children())

    @classmethod
    def _test_recursion(cls, wconn, id):
        """Send *id* down *wconn*, then recurse in child processes.

        Each level spawns (and joins) two children one after the other until
        *id* has two elements.
        """
        from multiprocessing import forking
        wconn.send(id)
        if len(id) >= 2:
            return
        for i in range(2):
            child = cls.Process(
                target=cls._test_recursion, args=(wconn, id + [i])
            )
            child.start()
            child.join()

    def test_recursion(self):
        """Processes started from processes report in depth-first order."""
        rconn, wconn = self.Pipe(duplex=False)
        self._test_recursion(wconn, [])

        time.sleep(DELTA)
        received = []
        while rconn.poll():
            received.append(rconn.recv())

        expected = [[], [0], [0, 0], [0, 1], [1], [1, 0], [1, 1]]
        self.assertEqual(received, expected)

    @classmethod
    def _test_sentinel(cls, event):
        """Child side of test_sentinel: wait up to ten seconds for *event*."""
        event.wait(10.0)

    def test_sentinel(self):
        """The sentinel is an int handle that becomes ready at process exit."""
        if self.TYPE == "threads":
            return
        event = self.Event()
        proc = self.Process(target=self._test_sentinel, args=(event,))
        # Reading the sentinel before start() must raise ValueError.
        with self.assertRaises(ValueError):
            proc.sentinel
        proc.start()
        self.addCleanup(proc.join)
        handle = proc.sentinel
        self.assertIsInstance(handle, int)
        self.assertFalse(wait_for_handle(handle, timeout=0.0))
        event.set()
        proc.join()
        self.assertTrue(wait_for_handle(handle, timeout=DELTA))
346
Benjamin Petersone711caf2008-06-11 16:44:04 +0000347#
348#
349#
350
351class _UpperCaser(multiprocessing.Process):
352
353 def __init__(self):
354 multiprocessing.Process.__init__(self)
355 self.child_conn, self.parent_conn = multiprocessing.Pipe()
356
357 def run(self):
358 self.parent_conn.close()
359 for s in iter(self.child_conn.recv, None):
360 self.child_conn.send(s.upper())
361 self.child_conn.close()
362
363 def submit(self, s):
364 assert type(s) is str
365 self.parent_conn.send(s)
366 return self.parent_conn.recv()
367
368 def stop(self):
369 self.parent_conn.send(None)
370 self.parent_conn.close()
371 self.child_conn.close()
372
class _TestSubclassingProcess(BaseTestCase):
    """Test a user-defined subclass of multiprocessing.Process."""

    ALLOWED_TYPES = ('processes',)

    def test_subclassing(self):
        """Round-trip two strings through a running _UpperCaser."""
        worker = _UpperCaser()
        worker.start()
        for text, expected in (('hello', 'HELLO'), ('world', 'WORLD')):
            self.assertEqual(worker.submit(text), expected)
        worker.stop()
        worker.join()
384
385#
386#
387#
388
def queue_empty(q):
    """Return whether *q* is empty.

    Uses ``q.empty()`` when the queue has that method, otherwise compares
    ``q.qsize()`` with zero.
    """
    if not hasattr(q, 'empty'):
        return q.qsize() == 0
    return q.empty()
394
def queue_full(q, maxsize):
    """Return whether *q* is full.

    Uses ``q.full()`` when the queue has that method, otherwise compares
    ``q.qsize()`` with *maxsize*.
    """
    if not hasattr(q, 'full'):
        return q.qsize() == maxsize
    return q.full()
400
401
class _TestQueue(BaseTestCase):
    """Tests for Queue and JoinableQueue objects."""

    @classmethod
    def _test_put(cls, queue, child_can_start, parent_can_continue):
        """Child side of test_put: remove six items once allowed to start."""
        child_can_start.wait()
        for i in range(6):
            queue.get()
        parent_can_continue.set()

    def test_put(self):
        """Fill a bounded queue and check every way put() reports Full."""
        MAXSIZE = 6
        queue = self.Queue(maxsize=MAXSIZE)
        child_can_start = self.Event()
        parent_can_continue = self.Event()

        proc = self.Process(
            target=self._test_put,
            args=(queue, child_can_start, parent_can_continue)
            )
        proc.daemon = True
        proc.start()

        self.assertEqual(queue_empty(queue), True)
        self.assertEqual(queue_full(queue, MAXSIZE), False)

        # Exercise each supported calling convention while filling the queue.
        queue.put(1)
        queue.put(2, True)
        queue.put(3, True, None)
        queue.put(4, False)
        queue.put(5, False, None)
        queue.put_nowait(6)

        # the values may be in buffer but not yet in pipe so sleep a bit
        time.sleep(DELTA)

        self.assertEqual(queue_empty(queue), False)
        self.assertEqual(queue_full(queue, MAXSIZE), True)

        put = TimingWrapper(queue.put)
        put_nowait = TimingWrapper(queue.put_nowait)

        # Non-blocking puts fail immediately ...
        self.assertRaises(pyqueue.Full, put, 7, False)
        self.assertTimingAlmostEqual(put.elapsed, 0)

        self.assertRaises(pyqueue.Full, put, 7, False, None)
        self.assertTimingAlmostEqual(put.elapsed, 0)

        self.assertRaises(pyqueue.Full, put_nowait, 7)
        self.assertTimingAlmostEqual(put_nowait.elapsed, 0)

        # ... while blocking puts fail only after their timeout.
        self.assertRaises(pyqueue.Full, put, 7, True, TIMEOUT1)
        self.assertTimingAlmostEqual(put.elapsed, TIMEOUT1)

        self.assertRaises(pyqueue.Full, put, 7, False, TIMEOUT2)
        self.assertTimingAlmostEqual(put.elapsed, 0)

        self.assertRaises(pyqueue.Full, put, 7, True, timeout=TIMEOUT3)
        self.assertTimingAlmostEqual(put.elapsed, TIMEOUT3)

        # Let the child drain the queue, then check it is empty again.
        child_can_start.set()
        parent_can_continue.wait()

        self.assertEqual(queue_empty(queue), True)
        self.assertEqual(queue_full(queue, MAXSIZE), False)

        proc.join()

    @classmethod
    def _test_get(cls, queue, child_can_start, parent_can_continue):
        """Child side of test_get: put 2..5 on the queue once allowed."""
        child_can_start.wait()
        #queue.put(1)
        queue.put(2)
        queue.put(3)
        queue.put(4)
        queue.put(5)
        parent_can_continue.set()

    def test_get(self):
        """Drain a queue and check every way get() reports Empty."""
        queue = self.Queue()
        child_can_start = self.Event()
        parent_can_continue = self.Event()

        proc = self.Process(
            target=self._test_get,
            args=(queue, child_can_start, parent_can_continue)
            )
        proc.daemon = True
        proc.start()

        self.assertEqual(queue_empty(queue), True)

        child_can_start.set()
        parent_can_continue.wait()

        time.sleep(DELTA)
        self.assertEqual(queue_empty(queue), False)

        # Hangs unexpectedly, remove for now
        #self.assertEqual(queue.get(), 1)
        self.assertEqual(queue.get(True, None), 2)
        self.assertEqual(queue.get(True), 3)
        self.assertEqual(queue.get(timeout=1), 4)
        self.assertEqual(queue.get_nowait(), 5)

        self.assertEqual(queue_empty(queue), True)

        get = TimingWrapper(queue.get)
        get_nowait = TimingWrapper(queue.get_nowait)

        # Non-blocking gets fail immediately ...
        self.assertRaises(pyqueue.Empty, get, False)
        self.assertTimingAlmostEqual(get.elapsed, 0)

        self.assertRaises(pyqueue.Empty, get, False, None)
        self.assertTimingAlmostEqual(get.elapsed, 0)

        self.assertRaises(pyqueue.Empty, get_nowait)
        self.assertTimingAlmostEqual(get_nowait.elapsed, 0)

        # ... while blocking gets fail only after their timeout.
        self.assertRaises(pyqueue.Empty, get, True, TIMEOUT1)
        self.assertTimingAlmostEqual(get.elapsed, TIMEOUT1)

        self.assertRaises(pyqueue.Empty, get, False, TIMEOUT2)
        self.assertTimingAlmostEqual(get.elapsed, 0)

        self.assertRaises(pyqueue.Empty, get, timeout=TIMEOUT3)
        self.assertTimingAlmostEqual(get.elapsed, TIMEOUT3)

        proc.join()

    @classmethod
    def _test_fork(cls, queue):
        """Child side of test_fork: put 10..19 on the queue."""
        for i in range(10, 20):
            queue.put(i)
        # note that at this point the items may only be buffered, so the
        # process cannot shutdown until the feeder thread has finished
        # pushing items onto the pipe.

    def test_fork(self):
        # Old versions of Queue would fail to create a new feeder
        # thread for a forked process if the original process had its
        # own feeder thread.  This test checks that this no longer
        # happens.

        queue = self.Queue()

        # put items on queue so that main process starts a feeder thread
        for i in range(10):
            queue.put(i)

        # wait to make sure thread starts before we fork a new process
        time.sleep(DELTA)

        # fork process
        p = self.Process(target=self._test_fork, args=(queue,))
        p.start()

        # check that all expected items are in the queue
        for i in range(20):
            self.assertEqual(queue.get(), i)
        self.assertRaises(pyqueue.Empty, queue.get, False)

        p.join()

    def test_qsize(self):
        """qsize() follows put()/get(); skipped silently if unimplemented."""
        q = self.Queue()
        try:
            self.assertEqual(q.qsize(), 0)
        except NotImplementedError:
            return
        q.put(1)
        self.assertEqual(q.qsize(), 1)
        q.put(5)
        self.assertEqual(q.qsize(), 2)
        q.get()
        self.assertEqual(q.qsize(), 1)
        q.get()
        self.assertEqual(q.qsize(), 0)

    @classmethod
    def _test_task_done(cls, q):
        """Worker for test_task_done: acknowledge items until ``None``."""
        for obj in iter(q.get, None):
            time.sleep(DELTA)
            q.task_done()

    def test_task_done(self):
        """join() on a JoinableQueue returns once every item is task_done()."""
        queue = self.JoinableQueue()

        # The former "sys.version_info < (2, 5)" skip could never trigger in
        # this Python 3 file, so it has been dropped.

        workers = [self.Process(target=self._test_task_done, args=(queue,))
                   for i in range(4)]

        for p in workers:
            p.start()

        for i in range(10):
            queue.put(i)

        queue.join()

        # One ``None`` per worker tells each of them to stop.
        for p in workers:
            queue.put(None)

        for p in workers:
            p.join()
609
610#
611#
612#
613
class _TestLock(BaseTestCase):
    """Tests for Lock and RLock objects."""

    def test_lock(self):
        """A Lock cannot be acquired twice, nor released when unlocked."""
        lock = self.Lock()
        self.assertEqual(lock.acquire(), True)
        self.assertEqual(lock.acquire(False), False)
        self.assertEqual(lock.release(), None)
        release_errors = (ValueError, threading.ThreadError)
        self.assertRaises(release_errors, lock.release)

    def test_rlock(self):
        """An RLock is re-entrant but rejects one release too many."""
        lock = self.RLock()
        for _ in range(3):
            self.assertEqual(lock.acquire(), True)
        for _ in range(3):
            self.assertEqual(lock.release(), None)
        release_errors = (AssertionError, RuntimeError)
        self.assertRaises(release_errors, lock.release)

    def test_lock_context(self):
        """A Lock can be used as a context manager."""
        with self.Lock():
            pass
636
Benjamin Petersone711caf2008-06-11 16:44:04 +0000637
class _TestSemaphore(BaseTestCase):
    """Tests for Semaphore and BoundedSemaphore objects."""

    def _test_semaphore(self, sem):
        """Walk *sem* (initial value 2) down to 0 and back up to 2."""
        check = self.assertReturnsIfImplemented
        check(2, get_value, sem)
        self.assertEqual(sem.acquire(), True)
        check(1, get_value, sem)
        self.assertEqual(sem.acquire(), True)
        check(0, get_value, sem)
        # Exhausted: a non-blocking acquire fails and changes nothing.
        self.assertEqual(sem.acquire(False), False)
        check(0, get_value, sem)
        self.assertEqual(sem.release(), None)
        check(1, get_value, sem)
        self.assertEqual(sem.release(), None)
        check(2, get_value, sem)

    def test_semaphore(self):
        """A plain Semaphore may be released beyond its initial value."""
        sem = self.Semaphore(2)
        self._test_semaphore(sem)
        for expected in (3, 4):
            self.assertEqual(sem.release(), None)
            self.assertReturnsIfImplemented(expected, get_value, sem)

    def test_bounded_semaphore(self):
        """Run the common acquire/release sequence on a BoundedSemaphore."""
        sem = self.BoundedSemaphore(2)
        self._test_semaphore(sem)
        # Currently fails on OS/X
        #if HAVE_GETVALUE:
        #    self.assertRaises(ValueError, sem.release)
        #    self.assertReturnsIfImplemented(2, get_value, sem)

    def test_timeout(self):
        """acquire() on an empty semaphore fails, honouring block/timeout."""
        if self.TYPE != 'processes':
            return

        sem = self.Semaphore(0)
        timed_acquire = TimingWrapper(sem.acquire)

        # Non-blocking calls return at once, whatever timeout is given.
        self.assertEqual(timed_acquire(False), False)
        self.assertTimingAlmostEqual(timed_acquire.elapsed, 0.0)

        self.assertEqual(timed_acquire(False, None), False)
        self.assertTimingAlmostEqual(timed_acquire.elapsed, 0.0)

        self.assertEqual(timed_acquire(False, TIMEOUT1), False)
        self.assertTimingAlmostEqual(timed_acquire.elapsed, 0)

        # Blocking calls give up once the timeout has elapsed.
        self.assertEqual(timed_acquire(True, TIMEOUT2), False)
        self.assertTimingAlmostEqual(timed_acquire.elapsed, TIMEOUT2)

        self.assertEqual(timed_acquire(timeout=TIMEOUT3), False)
        self.assertTimingAlmostEqual(timed_acquire.elapsed, TIMEOUT3)
690
691
class _TestCondition(BaseTestCase):
    """Tests for Condition objects waited on by processes and threads."""

    @classmethod
    def f(cls, cond, sleeping, woken, timeout=None):
        """Worker: wait on *cond* once.

        Releases *sleeping* just before waiting and *woken* just after, so
        the test can count how many workers are in each state.
        """
        cond.acquire()
        sleeping.release()
        cond.wait(timeout)
        woken.release()
        cond.release()

    def check_invariant(self, cond):
        """Check the internal counters of *cond* (process flavour only)."""
        # this is only supposed to succeed when there are no sleepers
        if self.TYPE == 'processes':
            try:
                sleepers = (cond._sleeping_count.get_value() -
                            cond._woken_count.get_value())
                self.assertEqual(sleepers, 0)
                self.assertEqual(cond._wait_semaphore.get_value(), 0)
            except NotImplementedError:
                pass

    def test_notify(self):
        """notify() wakes exactly one waiter per call."""
        cond = self.Condition()
        sleeping = self.Semaphore(0)
        woken = self.Semaphore(0)

        # Keep separate names for the two workers so both can be joined.
        proc = self.Process(target=self.f, args=(cond, sleeping, woken))
        proc.daemon = True
        proc.start()

        thread = threading.Thread(target=self.f, args=(cond, sleeping, woken))
        thread.daemon = True
        thread.start()

        # wait for both children to start sleeping
        sleeping.acquire()
        sleeping.acquire()

        # check no process/thread has woken up
        time.sleep(DELTA)
        self.assertReturnsIfImplemented(0, get_value, woken)

        # wake up one process/thread
        cond.acquire()
        cond.notify()
        cond.release()

        # check one process/thread has woken up
        time.sleep(DELTA)
        self.assertReturnsIfImplemented(1, get_value, woken)

        # wake up another
        cond.acquire()
        cond.notify()
        cond.release()

        # check other has woken up
        time.sleep(DELTA)
        self.assertReturnsIfImplemented(2, get_value, woken)

        # check state is not mucked up
        self.check_invariant(cond)

        # Both workers have been notified by now; wait for them to finish.
        thread.join()
        proc.join()

    def test_notify_all(self):
        """Waiters time out on their own, or are all woken by notify_all()."""
        cond = self.Condition()
        sleeping = self.Semaphore(0)
        woken = self.Semaphore(0)
        workers = []

        # start some threads/processes which will timeout
        for i in range(3):
            p = self.Process(target=self.f,
                             args=(cond, sleeping, woken, TIMEOUT1))
            p.daemon = True
            p.start()
            workers.append(p)

            t = threading.Thread(target=self.f,
                                 args=(cond, sleeping, woken, TIMEOUT1))
            t.daemon = True
            t.start()
            workers.append(t)

        # wait for them all to sleep
        for i in range(6):
            sleeping.acquire()

        # check they have all timed out
        for i in range(6):
            woken.acquire()
        self.assertReturnsIfImplemented(0, get_value, woken)

        # check state is not mucked up
        self.check_invariant(cond)

        # start some more threads/processes
        for i in range(3):
            p = self.Process(target=self.f, args=(cond, sleeping, woken))
            p.daemon = True
            p.start()
            workers.append(p)

            t = threading.Thread(target=self.f, args=(cond, sleeping, woken))
            t.daemon = True
            t.start()
            workers.append(t)

        # wait for them to all sleep
        for i in range(6):
            sleeping.acquire()

        # check no process/thread has woken up
        time.sleep(DELTA)
        self.assertReturnsIfImplemented(0, get_value, woken)

        # wake them all up
        cond.acquire()
        cond.notify_all()
        cond.release()

        # check they have all woken
        for i in range(10):
            try:
                if get_value(woken) == 6:
                    break
            except NotImplementedError:
                break
            time.sleep(DELTA)
        self.assertReturnsIfImplemented(6, get_value, woken)

        # check state is not mucked up
        self.check_invariant(cond)

        # Every worker has either timed out or been notified; reap them all.
        for worker in workers:
            worker.join()

    def test_timeout(self):
        """wait() with nobody notifying returns False after the timeout."""
        cond = self.Condition()
        wait = TimingWrapper(cond.wait)
        cond.acquire()
        res = wait(TIMEOUT1)
        cond.release()
        self.assertEqual(res, False)
        self.assertTimingAlmostEqual(wait.elapsed, TIMEOUT1)
829
830
class _TestEvent(BaseTestCase):
    """Tests for Event objects."""

    @classmethod
    def _test_event(cls, event):
        """Child side of test_event: set *event* after TIMEOUT2 seconds."""
        time.sleep(TIMEOUT2)
        event.set()

    def test_event(self):
        """Check is_set()/wait() on an unset, a set and a remotely set event."""
        event = self.Event()
        wait = TimingWrapper(event.wait)

        # A new event is unset, so wait() times out and returns False.
        self.assertEqual(event.is_set(), False)
        self.assertEqual(wait(0.0), False)
        self.assertTimingAlmostEqual(wait.elapsed, 0.0)
        self.assertEqual(wait(TIMEOUT1), False)
        self.assertTimingAlmostEqual(wait.elapsed, TIMEOUT1)

        event.set()

        # Once set, wait() returns True immediately, with or without timeout.
        self.assertEqual(event.is_set(), True)
        self.assertEqual(wait(), True)
        self.assertTimingAlmostEqual(wait.elapsed, 0.0)
        self.assertEqual(wait(TIMEOUT1), True)
        self.assertTimingAlmostEqual(wait.elapsed, 0.0)

        event.clear()

        # After clear(), a blocking wait() returns True once the child has
        # set the event.  The child is daemonic and joined so that it is not
        # left behind when the test finishes.
        p = self.Process(target=self._test_event, args=(event,))
        p.daemon = True
        p.start()
        self.assertEqual(wait(), True)
        p.join()
Benjamin Petersone711caf2008-06-11 16:44:04 +0000869
870#
871#
872#
873
class _TestValue(BaseTestCase):
    """Tests for shared Value and RawValue objects."""

    ALLOWED_TYPES = ('processes',)

    # (typecode, initial value, value assigned by the child process)
    codes_values = [
        ('i', 4343, 24234),
        ('d', 3.625, -4.25),
        ('h', -232, 234),
        ('c', latin('x'), latin('y'))
        ]

    def setUp(self):
        """Skip every test here when sharedctypes could not be imported."""
        if HAS_SHAREDCTYPES:
            return
        self.skipTest("requires multiprocessing.sharedctypes")

    @classmethod
    def _test(cls, values):
        """Child side of test_value: assign each value its final setting."""
        for shared, (_code, _initial, final) in zip(values, cls.codes_values):
            shared.value = final

    def test_value(self, raw=False):
        """A value assigned in a child process is visible in the parent."""
        if raw:
            make = self.RawValue
        else:
            make = self.Value
        values = [make(code, initial)
                  for code, initial, _ in self.codes_values]

        for shared, (_code, initial, _final) in zip(values, self.codes_values):
            self.assertEqual(shared.value, initial)

        proc = self.Process(target=self._test, args=(values,))
        proc.start()
        proc.join()

        for shared, (_code, _initial, final) in zip(values, self.codes_values):
            self.assertEqual(shared.value, final)

    def test_rawvalue(self):
        """Run test_value with RawValue objects."""
        self.test_value(raw=True)

    def test_getobj_getlock(self):
        """Check get_lock()/get_obj() for each accepted ``lock`` argument."""
        # Default lock: both accessors are callable.
        val1 = self.Value('i', 5)
        val1.get_lock()
        val1.get_obj()

        # lock=None: both accessors are callable as well.
        val2 = self.Value('i', 5, lock=None)
        val2.get_lock()
        val2.get_obj()

        # An explicit lock is the one handed back by get_lock().
        lock = self.Lock()
        val3 = self.Value('i', 5, lock=lock)
        lock3 = val3.get_lock()
        val3.get_obj()
        self.assertEqual(lock, lock3)

        # lock=False yields an object without either accessor.
        val4 = self.Value('i', 5, lock=False)
        self.assertFalse(hasattr(val4, 'get_lock'))
        self.assertFalse(hasattr(val4, 'get_obj'))

        # Anything that is not a lock is rejected.
        self.assertRaises(AttributeError, self.Value, 'i', 5, lock='navalue')

        # Raw values never have the accessors.
        val5 = self.RawValue('i', 5)
        self.assertFalse(hasattr(val5, 'get_lock'))
        self.assertFalse(hasattr(val5, 'get_obj'))
940
Benjamin Petersone711caf2008-06-11 16:44:04 +0000941
class _TestArray(BaseTestCase):
    """Tests for ``Array`` / ``RawArray`` shared objects."""

    ALLOWED_TYPES = ('processes',)

    @classmethod
    def f(cls, seq):
        """Replace ``seq`` in place by its running (prefix) sums."""
        for pos in range(1, len(seq)):
            seq[pos] += seq[pos - 1]

    @unittest.skipIf(c_int is None, "requires _ctypes")
    def test_array(self, raw=False):
        """Check indexing, slicing and modification by a child process."""
        seq = [680, 626, 934, 821, 150, 233, 548, 982, 714, 831]
        make_array = self.RawArray if raw else self.Array
        arr = make_array('i', seq)

        self.assertEqual(len(arr), len(seq))
        self.assertEqual(arr[3], seq[3])
        self.assertEqual(list(arr[2:7]), list(seq[2:7]))

        # Slice-assign the same replacement into both containers.
        patch = array.array('i', [1, 2, 3, 4])
        arr[4:8] = patch
        seq[4:8] = patch

        self.assertEqual(list(arr[:]), seq)

        # Transform the plain list locally and the shared array in a child;
        # both must end up identical.
        self.f(seq)

        p = self.Process(target=self.f, args=(arr,))
        p.start()
        p.join()

        self.assertEqual(list(arr[:]), seq)

    @unittest.skipIf(c_int is None, "requires _ctypes")
    def test_array_from_size(self):
        """An array created from a size must be zero-filled (issue #11675)."""
        size = 10
        # The repetition below strengthens the test by increasing the chances
        # of previously allocated non-zero memory being used for the new array
        # on the 2nd and 3rd loops.
        for _attempt in range(3):
            arr = self.Array('i', size)
            self.assertEqual(len(arr), size)
            self.assertEqual(list(arr), [0] * size)
            arr[:] = range(10)
            self.assertEqual(list(arr), list(range(10)))
            del arr

    @unittest.skipIf(c_int is None, "requires _ctypes")
    def test_rawarray(self):
        """Run the ``test_array`` scenario against ``RawArray``."""
        self.test_array(True)

    @unittest.skipIf(c_int is None, "requires _ctypes")
    def test_getobj_getlock_obj(self):
        """Exercise ``get_lock()``/``get_obj()`` for each ``lock`` argument."""
        # Default lock argument: both accessors must be callable.
        with_default = self.Array('i', list(range(10)))
        with_default.get_lock()
        with_default.get_obj()

        # Explicit lock=None: both accessors must be callable.
        with_none = self.Array('i', list(range(10)), lock=None)
        with_none.get_lock()
        with_none.get_obj()

        # A caller-supplied lock must be the one handed back.
        supplied = self.Lock()
        with_supplied = self.Array('i', list(range(10)), lock=supplied)
        returned = with_supplied.get_lock()
        with_supplied.get_obj()
        self.assertEqual(supplied, returned)

        # lock=False yields an object without either accessor.
        unlocked = self.Array('i', range(10), lock=False)
        for accessor in ('get_lock', 'get_obj'):
            self.assertFalse(hasattr(unlocked, accessor))

        # Anything that is not a lock is rejected.
        self.assertRaises(AttributeError,
                          self.Array, 'i', range(10), lock='notalock')

        # RawArray objects have neither accessor.
        raw = self.RawArray('i', range(10))
        for accessor in ('get_lock', 'get_obj'):
            self.assertFalse(hasattr(raw, accessor))
Benjamin Petersone711caf2008-06-11 16:44:04 +00001019
1020#
1021#
1022#
1023
class _TestContainers(BaseTestCase):
    """Tests for the manager's list, dict and Namespace proxies."""

    ALLOWED_TYPES = ('manager',)

    def test_list(self):
        """Shared lists support slicing, extend, ``*=``, ``+`` and nesting."""
        digits = list(range(10))

        a = self.list(digits)
        self.assertEqual(a[:], digits)

        b = self.list()
        self.assertEqual(b[:], [])

        b.extend(list(range(5)))
        self.assertEqual(b[:], list(range(5)))

        self.assertEqual(b[2], 2)
        self.assertEqual(b[2:10], [2, 3, 4])

        b *= 2
        doubled = [0, 1, 2, 3, 4, 0, 1, 2, 3, 4]
        self.assertEqual(b[:], doubled)

        self.assertEqual(b + [5, 6], doubled + [5, 6])

        # ``a`` must be unaffected by everything done to ``b``.
        self.assertEqual(a[:], digits)

        # A shared list built from other shared lists.
        e = self.list([a, b])
        self.assertEqual(
            e[:],
            [[0, 1, 2, 3, 4, 5, 6, 7, 8, 9], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4]]
        )

        # A change to ``a`` must be visible through the containing list.
        f = self.list([a])
        a.append('hello')
        self.assertEqual(f[:], [[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 'hello']])

    def test_dict(self):
        """Shared dicts expose ``copy``, ``keys``, ``values`` and ``items``."""
        d = self.dict()
        indices = list(range(65, 70))
        for code in indices:
            d[code] = chr(code)
        letters = [chr(code) for code in indices]
        self.assertEqual(d.copy(), {code: chr(code) for code in indices})
        self.assertEqual(sorted(d.keys()), indices)
        self.assertEqual(sorted(d.values()), letters)
        self.assertEqual(sorted(d.items()), list(zip(indices, letters)))

    def test_namespace(self):
        """Namespace proxies support set/get/del and a readable ``str()``."""
        n = self.Namespace()
        n.name = 'Bob'
        n.job = 'Builder'
        n._hidden = 'hidden'
        self.assertEqual((n.name, n.job), ('Bob', 'Builder'))
        del n.job
        # The underscore-prefixed attribute is expected to be absent from
        # the string form.
        self.assertEqual(str(n), "Namespace(name='Bob')")
        self.assertTrue(hasattr(n, 'name'))
        self.assertFalse(hasattr(n, 'job'))
1079
1080#
1081#
1082#
1083
def sqr(x, wait=0.0):
    """Return ``x`` squared after sleeping for ``wait`` seconds."""
    time.sleep(wait)
    squared = x * x
    return squared
Ask Solem2afcbf22010-11-09 20:55:52 +00001087
class _TestPool(BaseTestCase):
    """Tests for the ``apply``/``map``/``imap`` family on ``self.pool``."""

    def test_apply(self):
        """``apply`` forwards positional and keyword arguments."""
        papply = self.pool.apply
        self.assertEqual(papply(sqr, (5,)), sqr(5))
        self.assertEqual(papply(sqr, (), {'x': 3}), sqr(x=3))

    def test_map(self):
        """``map`` matches the builtin, with and without ``chunksize``."""
        pmap = self.pool.map
        self.assertEqual(pmap(sqr, list(range(10))),
                         list(map(sqr, list(range(10)))))
        self.assertEqual(pmap(sqr, list(range(100)), chunksize=20),
                         list(map(sqr, list(range(100)))))

    def test_map_chunksize(self):
        """``map_async`` over an empty list must not stall."""
        try:
            self.pool.map_async(sqr, [], chunksize=1).get(timeout=TIMEOUT1)
        except multiprocessing.TimeoutError:
            self.fail("pool.map_async with chunksize stalled on null list")

    def test_async(self):
        """``apply_async(...).get()`` blocks until the task completes."""
        res = self.pool.apply_async(sqr, (7, TIMEOUT1,))
        get = TimingWrapper(res.get)
        self.assertEqual(get(), 49)
        self.assertTimingAlmostEqual(get.elapsed, TIMEOUT1)

    def test_async_timeout(self):
        """``get(timeout=...)`` raises ``TimeoutError`` once it expires."""
        res = self.pool.apply_async(sqr, (6, TIMEOUT2 + 0.2))
        get = TimingWrapper(res.get)
        self.assertRaises(multiprocessing.TimeoutError, get, timeout=TIMEOUT2)
        self.assertTimingAlmostEqual(get.elapsed, TIMEOUT2)

    def test_imap(self):
        """``imap`` yields results in order and then stops."""
        it = self.pool.imap(sqr, list(range(10)))
        self.assertEqual(list(it), list(map(sqr, list(range(10)))))

        it = self.pool.imap(sqr, list(range(10)))
        for i in range(10):
            self.assertEqual(next(it), i*i)
        self.assertRaises(StopIteration, it.__next__)

        it = self.pool.imap(sqr, list(range(1000)), chunksize=100)
        for i in range(1000):
            self.assertEqual(next(it), i*i)
        self.assertRaises(StopIteration, it.__next__)

    def test_imap_unordered(self):
        """``imap_unordered`` yields every result, in any order."""
        expected = list(map(sqr, list(range(1000))))

        it = self.pool.imap_unordered(sqr, list(range(1000)))
        self.assertEqual(sorted(it), expected)

        it = self.pool.imap_unordered(sqr, list(range(1000)), chunksize=53)
        self.assertEqual(sorted(it), expected)

    def test_make_pool(self):
        """``Pool`` rejects non-positive sizes and honours a valid one."""
        self.assertRaises(ValueError, multiprocessing.Pool, -1)
        self.assertRaises(ValueError, multiprocessing.Pool, 0)

        p = multiprocessing.Pool(3)
        try:
            self.assertEqual(3, len(p._pool))
        finally:
            # Always release the workers, even if the assertion failed;
            # otherwise the worker processes outlive the test.
            p.close()
            p.join()

    def test_terminate(self):
        """``terminate()`` must not wait for queued tasks to finish."""
        if self.TYPE == 'manager':
            # On Unix a forked process increfs each shared object to
            # which its parent process held a reference.  If the
            # forked process gets terminated then there is likely to
            # be a reference leak.  So to prevent
            # _TestZZZNumberOfObjects from failing we skip this test
            # when using a manager.
            return

        result = self.pool.map_async(
            time.sleep, [0.1 for i in range(10000)], chunksize=1
        )
        self.pool.terminate()
        join = TimingWrapper(self.pool.join)
        join()
        self.assertLess(join.elapsed, 0.5)
Jesse Noller1f0b6582010-01-27 03:36:01 +00001166
def raising():
    """Unconditionally raise ``KeyError("key")``."""
    error = KeyError("key")
    raise error
Jesse Noller1f0b6582010-01-27 03:36:01 +00001169
def unpickleable_result():
    """Return a fresh lambda that evaluates to 42 when called."""
    answer = lambda: 42
    return answer
1172
class _TestPoolWorkerErrors(BaseTestCase):
    """Tests for error reporting from pool worker processes."""

    ALLOWED_TYPES = ('processes', )

    def test_async_error_callback(self):
        """``error_callback`` receives the exception raised by the task."""
        p = multiprocessing.Pool(2)
        try:
            scratchpad = [None]
            def errback(exc):
                scratchpad[0] = exc

            res = p.apply_async(raising, error_callback=errback)
            self.assertRaises(KeyError, res.get)
            self.assertTrue(scratchpad[0])
            self.assertIsInstance(scratchpad[0], KeyError)
        finally:
            # Release the workers even when an assertion above fails.
            p.close()
            p.join()

    def test_unpickleable_result(self):
        """An unpicklable result is reported as ``MaybeEncodingError``."""
        from multiprocessing.pool import MaybeEncodingError
        p = multiprocessing.Pool(2)
        try:
            # Make sure we don't lose pool processes because of encoding
            # errors.
            for iteration in range(20):

                scratchpad = [None]
                def errback(exc):
                    scratchpad[0] = exc

                res = p.apply_async(unpickleable_result,
                                    error_callback=errback)
                self.assertRaises(MaybeEncodingError, res.get)
                wrapped = scratchpad[0]
                self.assertTrue(wrapped)
                self.assertIsInstance(scratchpad[0], MaybeEncodingError)
                self.assertIsNotNone(wrapped.exc)
                self.assertIsNotNone(wrapped.value)
        finally:
            # Release the workers even when an assertion above fails.
            p.close()
            p.join()
1212
class _TestPoolWorkerLifetime(BaseTestCase):
    """Tests that ``maxtasksperchild`` causes workers to be replaced."""

    ALLOWED_TYPES = ('processes', )

    def test_pool_worker_lifetime(self):
        """Workers are replaced once they have run their task quota."""
        p = multiprocessing.Pool(3, maxtasksperchild=10)
        try:
            self.assertEqual(3, len(p._pool))
            origworkerpids = [w.pid for w in p._pool]
            # Run many tasks so each worker gets replaced (hopefully)
            results = [p.apply_async(sqr, (i, )) for i in range(100)]
            # Fetch the results and verify we got the right answers,
            # also ensuring all the tasks have completed.
            for (j, res) in enumerate(results):
                self.assertEqual(res.get(), sqr(j))
            # Refill the pool
            p._repopulate_pool()
            # Wait until all workers are alive
            # (countdown * DELTA = 5 seconds max startup process time)
            countdown = 50
            while countdown and not all(w.is_alive() for w in p._pool):
                countdown -= 1
                time.sleep(DELTA)
            finalworkerpids = [w.pid for w in p._pool]
            # All pids should be assigned.  See issue #7805.
            self.assertNotIn(None, origworkerpids)
            self.assertNotIn(None, finalworkerpids)
            # Finally, check that the worker pids have changed
            self.assertNotEqual(sorted(origworkerpids),
                                sorted(finalworkerpids))
        finally:
            # Release the workers even when an assertion above fails.
            p.close()
            p.join()
1244
Benjamin Petersone711caf2008-06-11 16:44:04 +00001245#
1246# Test that manager has expected number of shared objects left
1247#
1248
class _TestZZZNumberOfObjects(BaseTestCase):
    # Because test cases are sorted alphabetically, this one will get
    # run after all the other tests for the manager.  It tests that
    # there have been no "reference leaks" for the manager's shared
    # objects.  Note the comment in _TestPool.test_terminate().
    ALLOWED_TYPES = ('manager',)

    def test_number_of_objects(self):
        """The manager must hold exactly the expected number of objects."""
        EXPECTED_NUMBER = 1                # the pool object is still alive
        multiprocessing.active_children()  # discard dead process objs
        gc.collect()                       # do garbage collection
        refs = self.manager._number_of_objects()
        # Fetched right after the count so that both describe the same state.
        debug_info = self.manager._debug_info()
        if refs != EXPECTED_NUMBER:
            # Print the captured snapshot once; a second _debug_info() call
            # would only repeat the output.
            print(debug_info)

        self.assertEqual(refs, EXPECTED_NUMBER)
1267
1268#
1269# Test of creating a customized manager class
1270#
1271
1272from multiprocessing.managers import BaseManager, BaseProxy, RemoteError
1273
class FooBar(object):
    """Plain class with two public methods and one underscore-prefixed one."""

    def f(self):
        """Return the string ``'f()'``."""
        label = 'f()'
        return label

    def g(self):
        """Always raise ``ValueError``."""
        raise ValueError()

    def _h(self):
        """Return the string ``'_h()'``."""
        label = '_h()'
        return label
1281
def baz():
    """Generate the squares of 0 through 9."""
    for n in range(10):
        square = n * n
        yield square
1285
class IteratorProxy(BaseProxy):
    """Proxy that forwards iteration to the referent's ``__next__``."""

    _exposed_ = ('__next__',)

    def __iter__(self):
        # The proxy is its own iterator.
        return self

    def __next__(self):
        # Each step is one ``__next__`` call made through ``_callmethod``.
        item = self._callmethod('__next__')
        return item
1292
class MyManager(BaseManager):
    """Customized manager; its shared types are registered just below."""


# 'Foo' uses the default exposure, 'Bar' names its exposed methods
# explicitly, and 'baz' is served through IteratorProxy.
MyManager.register('Foo', callable=FooBar)
MyManager.register('Bar', callable=FooBar, exposed=('f', '_h'))
MyManager.register('baz', callable=baz, proxytype=IteratorProxy)
1299
1300
class _TestMyManager(BaseTestCase):
    """Tests for the customized ``MyManager`` class."""

    ALLOWED_TYPES = ('manager',)

    def test_mymanager(self):
        """Registered types expose exactly the expected methods."""
        manager = MyManager()
        manager.start()
        try:
            foo = manager.Foo()
            bar = manager.Bar()
            baz = manager.baz()

            candidates = ('f', 'g', '_h')
            foo_methods = [name for name in candidates if hasattr(foo, name)]
            bar_methods = [name for name in candidates if hasattr(bar, name)]

            self.assertEqual(foo_methods, ['f', 'g'])
            self.assertEqual(bar_methods, ['f', '_h'])

            self.assertEqual(foo.f(), 'f()')
            self.assertRaises(ValueError, foo.g)
            self.assertEqual(foo._callmethod('f'), 'f()')
            # '_h' is not exposed on 'Foo', so the call is refused remotely.
            self.assertRaises(RemoteError, foo._callmethod, '_h')

            self.assertEqual(bar.f(), 'f()')
            self.assertEqual(bar._h(), '_h()')
            self.assertEqual(bar._callmethod('f'), 'f()')
            self.assertEqual(bar._callmethod('_h'), '_h()')

            self.assertEqual(list(baz), [i*i for i in range(10)])
        finally:
            # Stop the server process even when an assertion above fails.
            manager.shutdown()
1332
1333#
1334# Test of connecting to a remote server and using xmlrpclib for serialization
1335#
1336
# Single queue object handed out by get_queue().
_queue = pyqueue.Queue()


def get_queue():
    """Return the module-level ``_queue`` object."""
    return _queue
1340
class QueueManager(BaseManager):
    """Manager class used by the server process."""


# Server side: 'get_queue' is backed by the get_queue() callable.
QueueManager.register('get_queue', callable=get_queue)
1344
class QueueManager2(BaseManager):
    """Manager class which specifies the same interface as QueueManager."""


# Registered by name only, without a callable.
QueueManager2.register('get_queue')
1348
1349
1350SERIALIZER = 'xmlrpclib'
1351
class _TestRemoteManager(BaseTestCase):
    """Tests connecting to a running manager using ``SERIALIZER``."""

    ALLOWED_TYPES = ('manager',)

    @classmethod
    def _putter(cls, address, authkey):
        """Child-process target: connect and put one tuple on the queue."""
        manager = QueueManager2(
            address=address, authkey=authkey, serializer=SERIALIZER
        )
        manager.connect()
        queue = manager.get_queue()
        queue.put(('hello world', None, True, 2.25))

    def test_remote(self):
        """Data put by a child process is readable via a second client."""
        authkey = os.urandom(32)

        manager = QueueManager(
            address=('localhost', 0), authkey=authkey, serializer=SERIALIZER
        )
        manager.start()
        try:
            p = self.Process(target=self._putter,
                             args=(manager.address, authkey))
            p.start()

            manager2 = QueueManager2(
                address=manager.address, authkey=authkey,
                serializer=SERIALIZER
            )
            manager2.connect()
            queue = manager2.get_queue()
            try:
                # Note that xmlrpclib will deserialize object as a list not
                # a tuple
                self.assertEqual(queue.get(),
                                 ['hello world', None, True, 2.25])

                # Because we are using xmlrpclib for serialization instead
                # of pickle this will cause a serialization error.
                self.assertRaises(Exception, queue.put, time.sleep)
            finally:
                # Make queue finalizer run before the server is stopped
                del queue

            # The child has delivered its item by now; reap it so that it
            # does not outlive the test.
            p.join()
        finally:
            # Stop the server process even when an assertion above fails.
            manager.shutdown()
1392
class _TestManagerRestart(BaseTestCase):
    """Tests restarting a manager on an address that was just released."""

    @classmethod
    def _putter(cls, address, authkey):
        """Child-process target: connect and put one string on the queue."""
        manager = QueueManager(
            address=address, authkey=authkey, serializer=SERIALIZER)
        manager.connect()
        queue = manager.get_queue()
        queue.put('hello world')

    def test_rapid_restart(self):
        """A manager can be started again on the same address right away."""
        authkey = os.urandom(32)
        manager = QueueManager(
            address=('localhost', 0), authkey=authkey, serializer=SERIALIZER)
        srvr = manager.get_server()
        addr = srvr.address
        # Close the connection.Listener socket which gets opened as a part
        # of manager.get_server(). It's not needed for the test.
        srvr.listener.close()
        manager.start()

        p = self.Process(target=self._putter, args=(manager.address, authkey))
        p.start()
        queue = manager.get_queue()
        self.assertEqual(queue.get(), 'hello world')
        del queue
        # The child has delivered its item by now; reap it so that it does
        # not outlive the test.
        p.join()
        manager.shutdown()

        manager = QueueManager(
            address=addr, authkey=authkey, serializer=SERIALIZER)
        try:
            manager.start()
        except IOError as e:
            if e.errno != errno.EADDRINUSE:
                raise
            # Retry after some time, in case the old socket was lingering
            # (sporadic failure on buildbots)
            time.sleep(1.0)
            manager = QueueManager(
                address=addr, authkey=authkey, serializer=SERIALIZER)
            # The replacement manager has to be started as well: the
            # shutdown() called below is only available on a started
            # manager.
            manager.start()
        manager.shutdown()
Jesse Nollerc5d28a02009-03-30 16:37:36 +00001433
Benjamin Petersone711caf2008-06-11 16:44:04 +00001434#
1435#
1436#
1437
1438SENTINEL = latin('')
1439
class _TestConnection(BaseTestCase):
    """Tests for the connection objects returned by ``Pipe()``."""

    ALLOWED_TYPES = ('processes', 'threads')

    @classmethod
    def _echo(cls, conn):
        """Echo every byte message back until ``SENTINEL`` is received."""
        while True:
            payload = conn.recv_bytes()
            if payload == SENTINEL:
                break
            conn.send_bytes(payload)
        conn.close()

    def test_connection(self):
        """Round-trip objects, bytes and buffers through an echoing child."""
        conn, child_conn = self.Pipe()

        p = self.Process(target=self._echo, args=(child_conn,))
        p.daemon = True
        p.start()

        seq = [1, 2.25, None]
        msg = latin('hello world')
        longmsg = msg * 10
        arr = array.array('i', list(range(4)))
        is_process = self.TYPE == 'processes'

        if is_process:
            self.assertEqual(type(conn.fileno()), int)

        # Pickled object round trip.
        self.assertEqual(conn.send(seq), None)
        self.assertEqual(conn.recv(), seq)

        # Raw bytes round trip.
        self.assertEqual(conn.send_bytes(msg), None)
        self.assertEqual(conn.recv_bytes(), msg)

        if is_process:
            slots = 10
            payload_size = len(arr) * array.array('i').itemsize

            # Receive into the start of a zeroed buffer.
            buffer = array.array('i', [0] * slots)
            expected = list(arr) + [0] * (slots - len(arr))
            self.assertEqual(conn.send_bytes(arr), None)
            self.assertEqual(conn.recv_bytes_into(buffer), payload_size)
            self.assertEqual(list(buffer), expected)

            # Receive at an offset of three items.
            skip = 3
            buffer = array.array('i', [0] * slots)
            expected = [0] * skip + list(arr) + [0] * (slots - skip - len(arr))
            self.assertEqual(conn.send_bytes(arr), None)
            self.assertEqual(
                conn.recv_bytes_into(buffer, skip * buffer.itemsize),
                payload_size)
            self.assertEqual(list(buffer), expected)

            # A buffer that is too small must raise BufferTooShort carrying
            # the complete message.
            buffer = bytearray(latin(' ' * 40))
            self.assertEqual(conn.send_bytes(longmsg), None)
            try:
                res = conn.recv_bytes_into(buffer)
            except multiprocessing.BufferTooShort as e:
                self.assertEqual(e.args, (longmsg,))
            else:
                self.fail('expected BufferTooShort, got %s' % res)

        poll = TimingWrapper(conn.poll)

        # Nothing pending: an immediate poll returns False at once.
        self.assertEqual(poll(), False)
        self.assertTimingAlmostEqual(poll.elapsed, 0)

        # Nothing pending: a timed poll waits out the timeout.
        self.assertEqual(poll(TIMEOUT1), False)
        self.assertTimingAlmostEqual(poll.elapsed, TIMEOUT1)

        conn.send(None)

        # Data pending: poll returns True without waiting.
        self.assertEqual(poll(TIMEOUT1), True)
        self.assertTimingAlmostEqual(poll.elapsed, 0)

        self.assertEqual(conn.recv(), None)

        really_big_msg = latin('X') * (1024 * 1024 * 16)   # 16Mb
        conn.send_bytes(really_big_msg)
        self.assertEqual(conn.recv_bytes(), really_big_msg)

        conn.send_bytes(SENTINEL)                          # tell child to quit
        child_conn.close()

        if is_process:
            self.assertEqual(conn.readable, True)
            self.assertEqual(conn.writable, True)
            self.assertRaises(EOFError, conn.recv)
            self.assertRaises(EOFError, conn.recv_bytes)

        p.join()

    def test_duplex_false(self):
        """A one-way pipe has a read-only end and a write-only end."""
        reader, writer = self.Pipe(duplex=False)
        self.assertEqual(writer.send(1), None)
        self.assertEqual(reader.recv(), 1)
        if self.TYPE == 'processes':
            self.assertEqual(reader.readable, True)
            self.assertEqual(reader.writable, False)
            self.assertEqual(writer.readable, False)
            self.assertEqual(writer.writable, True)
            # Using an end in the wrong direction is an error.
            self.assertRaises(IOError, reader.send, 2)
            self.assertRaises(IOError, writer.recv)
            self.assertRaises(IOError, writer.poll)

    def test_spawn_close(self):
        """The parent may close the child's end right after spawning."""
        # We test that a pipe connection can be closed by parent
        # process immediately after child is spawned.  On Windows this
        # would have sometimes failed on old versions because
        # child_conn would be closed before the child got a chance to
        # duplicate it.
        conn, child_conn = self.Pipe()

        p = self.Process(target=self._echo, args=(child_conn,))
        p.start()
        child_conn.close()    # this might complete before child initializes

        greeting = latin('hello')
        conn.send_bytes(greeting)
        self.assertEqual(conn.recv_bytes(), greeting)

        conn.send_bytes(SENTINEL)
        conn.close()
        p.join()

    def test_sendbytes(self):
        """``send_bytes`` honours its optional offset and size arguments."""
        if self.TYPE != 'processes':
            return

        msg = latin('abcdefghijklmnopqrstuvwxyz')
        empty = latin('')
        a, b = self.Pipe()

        a.send_bytes(msg)
        self.assertEqual(b.recv_bytes(), msg)

        a.send_bytes(msg, 5)
        self.assertEqual(b.recv_bytes(), msg[5:])

        a.send_bytes(msg, 7, 8)
        self.assertEqual(b.recv_bytes(), msg[7:7+8])

        a.send_bytes(msg, 26)
        self.assertEqual(b.recv_bytes(), empty)

        a.send_bytes(msg, 26, 0)
        self.assertEqual(b.recv_bytes(), empty)

        # Offsets or sizes outside the message are rejected.
        for bad_args in ((27,), (22, 5), (26, 1), (-1,), (4, -1)):
            self.assertRaises(ValueError, a.send_bytes, msg, *bad_args)
1589
class _TestListenerClient(BaseTestCase):
    """Tests for ``Listener``/``Client`` over every connection family."""

    ALLOWED_TYPES = ('processes', 'threads')

    @classmethod
    def _test(cls, address):
        """Child target: connect to ``address``, send 'hello' and close."""
        conn = cls.connection.Client(address)
        conn.send('hello')
        conn.close()

    def test_listener_client(self):
        """A client in a child can reach a listener for each family."""
        for family in self.connection.families:
            l = self.connection.Listener(family=family)
            try:
                p = self.Process(target=self._test, args=(l.address,))
                p.daemon = True
                p.start()
                conn = l.accept()
                try:
                    self.assertEqual(conn.recv(), 'hello')
                finally:
                    # Release the accepted connection; it was previously
                    # left open.
                    conn.close()
                p.join()
            finally:
                # Close the listener even when the assertion fails.
                l.close()
Benjamin Petersone711caf2008-06-11 16:44:04 +00001610#
1611# Test of sending connection and socket objects between processes
1612#
Benjamin Petersonb29cbbc2008-06-16 20:57:14 +00001613"""
Benjamin Petersone711caf2008-06-11 16:44:04 +00001614class _TestPicklingConnections(BaseTestCase):
1615
1616 ALLOWED_TYPES = ('processes',)
1617
1618 def _listener(self, conn, families):
1619 for fam in families:
1620 l = self.connection.Listener(family=fam)
1621 conn.send(l.address)
1622 new_conn = l.accept()
1623 conn.send(new_conn)
1624
1625 if self.TYPE == 'processes':
1626 l = socket.socket()
1627 l.bind(('localhost', 0))
1628 conn.send(l.getsockname())
1629 l.listen(1)
1630 new_conn, addr = l.accept()
1631 conn.send(new_conn)
1632
1633 conn.recv()
1634
1635 def _remote(self, conn):
1636 for (address, msg) in iter(conn.recv, None):
1637 client = self.connection.Client(address)
1638 client.send(msg.upper())
1639 client.close()
1640
1641 if self.TYPE == 'processes':
1642 address, msg = conn.recv()
1643 client = socket.socket()
1644 client.connect(address)
1645 client.sendall(msg.upper())
1646 client.close()
1647
1648 conn.close()
1649
1650 def test_pickling(self):
1651 try:
1652 multiprocessing.allow_connection_pickling()
1653 except ImportError:
1654 return
1655
1656 families = self.connection.families
1657
1658 lconn, lconn0 = self.Pipe()
1659 lp = self.Process(target=self._listener, args=(lconn0, families))
1660 lp.start()
1661 lconn0.close()
1662
1663 rconn, rconn0 = self.Pipe()
1664 rp = self.Process(target=self._remote, args=(rconn0,))
1665 rp.start()
1666 rconn0.close()
1667
1668 for fam in families:
1669 msg = ('This connection uses family %s' % fam).encode('ascii')
1670 address = lconn.recv()
1671 rconn.send((address, msg))
1672 new_conn = lconn.recv()
1673 self.assertEqual(new_conn.recv(), msg.upper())
1674
1675 rconn.send(None)
1676
1677 if self.TYPE == 'processes':
1678 msg = latin('This connection uses a normal socket')
1679 address = lconn.recv()
1680 rconn.send((address, msg))
1681 if hasattr(socket, 'fromfd'):
1682 new_conn = lconn.recv()
1683 self.assertEqual(new_conn.recv(100), msg.upper())
1684 else:
1685 # XXX On Windows with Py2.6 need to backport fromfd()
1686 discard = lconn.recv_bytes()
1687
1688 lconn.send(None)
1689
1690 rconn.close()
1691 lconn.close()
1692
1693 lp.join()
1694 rp.join()
Benjamin Petersonb29cbbc2008-06-16 20:57:14 +00001695"""
Benjamin Petersone711caf2008-06-11 16:44:04 +00001696#
1697#
1698#
1699
class _TestHeap(BaseTestCase):
    """Consistency test for the allocator in ``multiprocessing.heap``."""

    ALLOWED_TYPES = ('processes',)

    def test_heap(self):
        """Free and occupied blocks must tile each arena without gaps."""
        iterations = 5000
        maxblocks = 50
        blocks = []

        # create and destroy lots of blocks of different sizes
        for _ in range(iterations):
            size = int(random.lognormvariate(0, 1) * 1000)
            blocks.append(multiprocessing.heap.BufferWrapper(size))
            if len(blocks) > maxblocks:
                # Drop a random wrapper so that its block can be freed.
                del blocks[random.randrange(maxblocks)]

        # get the heap object
        heap = multiprocessing.heap.BufferWrapper._heap

        # verify the state of the heap: collect every free and every
        # occupied block as (arena index, start, stop, length, state)
        segments = []
        for free_blocks in heap._len_to_seq.values():
            for arena, start, stop in free_blocks:
                segments.append((heap._arenas.index(arena), start, stop,
                                 stop-start, 'free'))
        for arena, start, stop in heap._allocated_blocks:
            segments.append((heap._arenas.index(arena), start, stop,
                             stop-start, 'occupied'))

        segments.sort()

        # Each block must either end where the next one begins, or the
        # next one must be the first block of a different arena.
        for current, following in zip(segments, segments[1:]):
            (arena, start, stop) = current[:3]
            (narena, nstart, nstop) = following[:3]
            self.assertTrue((arena != narena and nstart == 0) or
                            (stop == nstart))
1740
1741#
1742#
1743#
1744
class _Foo(Structure):
    """Two-field structure: ``x`` (``c_int``) and ``y`` (``c_double``)."""

    _fields_ = [
        ('x', c_int),
        ('y', c_double),
    ]
1750
class _TestSharedCTypes(BaseTestCase):
    """Tests for ctypes objects shared with a child process."""

    ALLOWED_TYPES = ('processes',)

    def setUp(self):
        """Skip the test when ``multiprocessing.sharedctypes`` is missing."""
        if HAS_SHAREDCTYPES:
            return
        self.skipTest("requires multiprocessing.sharedctypes")

    @classmethod
    def _double(cls, x, y, foo, arr, string):
        """Child-process target: double every shared object in place."""
        x.value *= 2
        y.value *= 2
        foo.x *= 2
        foo.y *= 2
        string.value *= 2
        for pos in range(len(arr)):
            arr[pos] *= 2

    def test_sharedctypes(self, lock=False):
        """Changes made by the child must be visible in the parent."""
        x = Value('i', 7, lock=lock)
        y = Value(c_double, 1.0/3.0, lock=lock)
        foo = Value(_Foo, 3, 2, lock=lock)
        arr = self.Array('d', list(range(10)), lock=lock)
        string = self.Array('c', 20, lock=lock)
        string.value = latin('hello')

        shared = (x, y, foo, arr, string)
        p = self.Process(target=self._double, args=shared)
        p.start()
        p.join()

        self.assertEqual(x.value, 14)
        self.assertAlmostEqual(y.value, 2.0/3.0)
        self.assertEqual(foo.x, 6)
        self.assertAlmostEqual(foo.y, 4.0)
        for pos in range(10):
            self.assertAlmostEqual(arr[pos], pos*2)
        self.assertEqual(string.value, latin('hellohello'))

    def test_synchronize(self):
        """Run the ``test_sharedctypes`` scenario with ``lock=True``."""
        self.test_sharedctypes(lock=True)

    def test_copy(self):
        """``copy()`` must return an object independent of the original."""
        foo = _Foo(2, 5.0)
        bar = copy(foo)
        # Zero the original; the copy must keep the old values.
        foo.x = foo.y = 0
        self.assertEqual(bar.x, 2)
        self.assertAlmostEqual(bar.y, 5.0)
1799
1800#
1801#
1802#
1803
class _TestFinalize(BaseTestCase):
    """Tests for the order in which ``util.Finalize`` callbacks run."""

    ALLOWED_TYPES = ('processes',)

    @classmethod
    def _test_finalize(cls, conn):
        """Child-process target: register finalizers that report via ``conn``.

        The statement order below is significant and mirrors the order
        checked by ``test_finalize``.
        """
        class Foo(object):
            pass

        a = Foo()
        util.Finalize(a, conn.send, args=('a',))
        del a           # triggers callback for a

        b = Foo()
        close_b = util.Finalize(b, conn.send, args=('b',))
        close_b()       # triggers callback for b
        close_b()       # does nothing because callback has already been called
        del b           # does nothing because callback has already been called

        # No exitpriority given for this one.
        c = Foo()
        util.Finalize(c, conn.send, args=('c',))

        d10 = Foo()
        util.Finalize(d10, conn.send, args=('d10',), exitpriority=1)

        # Three finalizers sharing exitpriority 0.
        d01 = Foo()
        util.Finalize(d01, conn.send, args=('d01',), exitpriority=0)
        d02 = Foo()
        util.Finalize(d02, conn.send, args=('d02',), exitpriority=0)
        d03 = Foo()
        util.Finalize(d03, conn.send, args=('d03',), exitpriority=0)

        # Finalizers that are not attached to any object.
        util.Finalize(None, conn.send, args=('e',), exitpriority=-10)

        util.Finalize(None, conn.send, args=('STOP',), exitpriority=-100)

        # call multiprocessing's cleanup function then exit process without
        # garbage collecting locals
        util._exit_function()
        conn.close()
        os._exit(0)

    def test_finalize(self):
        """Callbacks must be reported in the expected order."""
        conn, child_conn = self.Pipe()

        p = self.Process(target=self._test_finalize, args=(child_conn,))
        p.start()
        p.join()

        # Read messages until the 'STOP' marker arrives.
        result = list(iter(conn.recv, 'STOP'))
        self.assertEqual(result, ['a', 'b', 'd10', 'd03', 'd02', 'd01', 'e'])
1855
1856#
1857# Test that from ... import * works for each module
1858#
1859
class _TestImportStar(BaseTestCase):
    """Tests that every name in each module's ``__all__`` really exists."""

    ALLOWED_TYPES = ('processes',)

    def test_import(self):
        """Import each submodule and verify the names in its ``__all__``."""
        modules = [
            'multiprocessing', 'multiprocessing.connection',
            'multiprocessing.heap', 'multiprocessing.managers',
            'multiprocessing.pool', 'multiprocessing.process',
            'multiprocessing.reduction',
            'multiprocessing.synchronize', 'multiprocessing.util',
        ]

        if c_int is not None:
            # This module requires _ctypes
            modules.append('multiprocessing.sharedctypes')

        for name in modules:
            __import__(name)
            mod = sys.modules[name]
            exported = getattr(mod, '__all__', ())

            for attr in exported:
                present = hasattr(mod, attr)
                self.assertTrue(
                    present,
                    '%r does not have attribute %r' % (mod, attr)
                )
1886
1887#
1888# Quick test that logging works -- does not test logging output
1889#
1890
class _TestLogging(BaseTestCase):
    # Quick checks that the multiprocessing logger can be obtained and that
    # its effective level is the one observed inside child processes.
    # Logging output itself is not examined.

    ALLOWED_TYPES = ('processes',)

    def test_enable_logging(self):
        logger = multiprocessing.get_logger()
        self.assertIsNotNone(logger)
        logger.setLevel(util.SUBWARNING)
        try:
            logger.debug('this will not be printed')
            logger.info('nor will this')
        finally:
            # Always put the level back so later tests are unaffected.
            logger.setLevel(LOG_LEVEL)

    @classmethod
    def _test_level(cls, conn):
        """Child entry point: send the logger's effective level over *conn*."""
        logger = multiprocessing.get_logger()
        conn.send(logger.getEffectiveLevel())

    def _assert_child_level(self, expected, reader, writer):
        """Start a child running _test_level and check the level it reports.

        The child is always joined, even when the assertion fails, so no
        process is left behind.
        """
        child = self.Process(target=self._test_level, args=(writer,))
        child.start()
        try:
            self.assertEqual(expected, reader.recv())
        finally:
            child.join()

    def test_level(self):
        LEVEL1 = 32
        LEVEL2 = 37

        logger = multiprocessing.get_logger()
        root_logger = logging.getLogger()
        root_level = root_logger.level

        reader, writer = multiprocessing.Pipe(duplex=False)
        try:
            # Level set directly on the multiprocessing logger.
            logger.setLevel(LEVEL1)
            self._assert_child_level(LEVEL1, reader, writer)

            # With the multiprocessing logger at NOTSET the child is
            # expected to report the root logger's level instead.
            logger.setLevel(logging.NOTSET)
            root_logger.setLevel(LEVEL2)
            self._assert_child_level(LEVEL2, reader, writer)
        finally:
            # Restore global logging state and release the pipe regardless
            # of the test outcome.
            root_logger.setLevel(root_level)
            logger.setLevel(level=LOG_LEVEL)
            reader.close()
            writer.close()
1929
Jesse Nollerb9a49b72009-11-21 18:09:38 +00001930
Jesse Nollerf4ae35f2009-11-24 14:22:24 +00001931# class _TestLoggingProcessName(BaseTestCase):
1932#
1933# def handle(self, record):
1934# assert record.processName == multiprocessing.current_process().name
1935# self.__handled = True
1936#
1937# def test_logging(self):
1938# handler = logging.Handler()
1939# handler.handle = self.handle
1940# self.__handled = False
1941# # Bypass getLogger() and side-effects
1942# logger = logging.getLoggerClass()(
1943# 'multiprocessing.test.TestLoggingProcessName')
1944# logger.addHandler(handler)
1945# logger.propagate = False
1946#
1947# logger.warn('foo')
1948# assert self.__handled
Jesse Nollerb9a49b72009-11-21 18:09:38 +00001949
Benjamin Petersone711caf2008-06-11 16:44:04 +00001950#
Jesse Noller6214edd2009-01-19 16:23:53 +00001951# Test to verify handle verification, see issue 3321
1952#
1953
class TestInvalidHandle(unittest.TestCase):
    # Connection objects must reject handles that are not valid
    # (see issue 3321).

    @unittest.skipIf(WIN32, "skipped on Windows")
    def test_invalid_handles(self):
        expected_errors = (ValueError, IOError)

        conn = multiprocessing.connection.Connection(44977608)
        try:
            with self.assertRaises(expected_errors):
                conn.poll()
        finally:
            # Hack private attribute _handle to avoid printing an error
            # in conn.__del__
            conn._handle = None

        # A negative handle must be refused at construction time.
        with self.assertRaises(expected_errors):
            multiprocessing.connection.Connection(-1)
Florent Xiclunafd1b0932010-03-28 00:25:02 +00001967
Jesse Noller6214edd2009-01-19 16:23:53 +00001968#
Benjamin Petersone711caf2008-06-11 16:44:04 +00001969# Functions used to create test cases from the base ones in this module
1970#
1971
def get_attributes(Source, names):
    """Return a dict mapping each name in *names* to ``getattr(Source, name)``.

    Plain Python functions are wrapped in ``staticmethod`` so that, once the
    dict is merged into a class namespace, they are not turned into bound
    methods when looked up on an instance.  Every other kind of object is
    stored unchanged.

    Raises AttributeError if *Source* lacks one of the requested names.
    """
    function_type = type(get_attributes)
    attributes = {}
    for name in names:
        obj = getattr(Source, name)
        if isinstance(obj, function_type):
            obj = staticmethod(obj)
        attributes[name] = obj
    return attributes
1980
def create_test_cases(Mixin, type):
    """Build concrete TestCase classes for one flavour of the test suite.

    Every module global whose name starts with ``_Test`` and whose
    ``ALLOWED_TYPES`` contains *type* is combined with ``unittest.TestCase``
    and *Mixin* into a new class named ``With<Type><name without the
    leading underscore>``.

    Returns a dict mapping the new class names to the new classes.  The
    classes report *Mixin*'s module as their own.
    """
    result = {}
    Type = type.capitalize()

    # Iterate over a snapshot of the module globals.
    for name, base in list(globals().items()):
        if not name.startswith('_Test'):
            continue
        if type not in base.ALLOWED_TYPES:
            continue

        newname = 'With' + Type + name[1:]

        class Temp(base, unittest.TestCase, Mixin):
            pass

        # Rename the class completely: without __qualname__ it would still
        # present itself as 'create_test_cases.<locals>.Temp'.
        Temp.__name__ = Temp.__qualname__ = newname
        Temp.__module__ = Mixin.__module__
        result[newname] = Temp
    return result
1997
1998#
1999# Create test cases
2000#
2001
class ProcessesMixin(object):
    # Binds the generic tests to real processes: the names listed below are
    # copied from the multiprocessing package into the class namespace.
    TYPE = 'processes'
    Process = multiprocessing.Process
    _exported = (
        'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
        'Condition', 'Event', 'Value', 'Array', 'RawValue',
        'RawArray', 'current_process', 'active_children', 'Pipe',
        'connection', 'JoinableQueue',
    )
    locals().update(get_attributes(multiprocessing, _exported))
    del _exported   # helper name only; keep it out of the class

# Generate the 'processes' test cases and publish them as module globals.
testcases_processes = create_test_cases(ProcessesMixin, type='processes')
globals().update(testcases_processes)
2014
2015
class ManagerMixin(object):
    # Binds the generic tests to a SyncManager: the names listed below are
    # taken from a manager instance and copied into the class namespace.
    TYPE = 'manager'
    Process = multiprocessing.Process
    # Allocated without running __init__; test_main() calls
    # manager.__init__() and manager.start() before the tests run.
    manager = object.__new__(multiprocessing.managers.SyncManager)
    _exported = (
        'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
        'Condition', 'Event', 'Value', 'Array', 'list', 'dict',
        'Namespace', 'JoinableQueue',
    )
    locals().update(get_attributes(manager, _exported))
    del _exported   # helper name only; keep it out of the class

# Generate the 'manager' test cases and publish them as module globals.
testcases_manager = create_test_cases(ManagerMixin, type='manager')
globals().update(testcases_manager)
2028
2029
class ThreadsMixin(object):
    # Binds the generic tests to multiprocessing.dummy: the names listed
    # below are copied from that module into the class namespace.
    TYPE = 'threads'
    Process = multiprocessing.dummy.Process
    _exported = (
        'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
        'Condition', 'Event', 'Value', 'Array', 'current_process',
        'active_children', 'Pipe', 'connection', 'dict', 'list',
        'Namespace', 'JoinableQueue',
    )
    locals().update(get_attributes(multiprocessing.dummy, _exported))
    del _exported   # helper name only; keep it out of the class

# Generate the 'threads' test cases and publish them as module globals.
testcases_threads = create_test_cases(ThreadsMixin, type='threads')
globals().update(testcases_threads)
2042
class OtherTest(unittest.TestCase):
    # TODO: add more tests for deliver/answer challenge.

    def test_deliver_challenge_auth_failure(self):
        # The fake peer answers every read with garbage, so delivering a
        # challenge to it must fail authentication.
        class _FakeConnection(object):
            def recv_bytes(self, size):
                return b'something bogus'

            def send_bytes(self, data):
                pass

        peer = _FakeConnection()
        with self.assertRaises(multiprocessing.AuthenticationError):
            multiprocessing.connection.deliver_challenge(peer, b'abc')

    def test_answer_challenge_auth_failure(self):
        # The fake peer first sends the challenge marker, then garbage, then
        # empty data; answering it must fail authentication.
        class _FakeConnection(object):
            def __init__(self):
                self.count = 0

            def recv_bytes(self, size):
                self.count += 1
                if self.count == 1:
                    return multiprocessing.connection.CHALLENGE
                if self.count == 2:
                    return b'something bogus'
                return b''

            def send_bytes(self, data):
                pass

        peer = _FakeConnection()
        with self.assertRaises(multiprocessing.AuthenticationError):
            multiprocessing.connection.answer_challenge(peer, b'abc')
2071
Benjamin Petersonf47ed4a2009-04-11 20:45:40 +00002072#
2073# Test Manager.start()/Pool.__init__() initializer feature - see issue 5585
2074#
2075
def initializer(ns):
    """Increment the ``test`` counter stored on *ns* by one."""
    ns.test = ns.test + 1
2078
class TestInitializers(unittest.TestCase):
    # Exercises the initializer argument of Manager.start() and Pool().
    # A shared namespace counter records how often the initializer ran.

    def setUp(self):
        self.mgr = multiprocessing.Manager()
        self.ns = self.mgr.Namespace()
        self.ns.test = 0

    def tearDown(self):
        self.mgr.shutdown()

    def test_manager_initializer(self):
        m = multiprocessing.managers.SyncManager()
        # A non-callable initializer must be rejected.
        self.assertRaises(TypeError, m.start, 1)
        m.start(initializer, (self.ns,))
        try:
            self.assertEqual(self.ns.test, 1)
        finally:
            # Shut the extra manager down even when the assertion fails so
            # its server process is not left running.
            m.shutdown()

    def test_pool_initializer(self):
        # A non-callable initializer must be rejected.
        self.assertRaises(TypeError, multiprocessing.Pool, initializer=1)
        p = multiprocessing.Pool(1, initializer, (self.ns,))
        p.close()
        p.join()
        self.assertEqual(self.ns.test, 1)
2101
Alexandre Vassalottic57a84f2009-07-17 12:07:01 +00002102#
2103# Issue 5155, 5313, 5331: Test process in processes
2104# Verifies os.close(sys.stdin.fileno) vs. sys.stdin.close() behavior
2105#
2106
2107def _ThisSubProcess(q):
2108 try:
2109 item = q.get(block=False)
2110 except pyqueue.Empty:
2111 pass
2112
def _TestProcess(q):
    """Start a nested child process on a fresh queue and wait for it.

    The *q* argument is accepted but not used.
    """
    inner_queue = multiprocessing.Queue()
    child = multiprocessing.Process(
        target=_ThisSubProcess,
        args=(inner_queue,),
    )
    child.start()
    child.join()
2118
2119def _afunc(x):
2120 return x*x
2121
def pool_in_process():
    """Create a four-worker pool, run one map over it and release it.

    Used as a Process target, so the pool is created inside a child
    process.
    """
    pool = multiprocessing.Pool(processes=4)
    try:
        pool.map(_afunc, [1, 2, 3, 4, 5, 6, 7])
    finally:
        # Release the workers instead of leaving them to interpreter exit.
        pool.close()
        pool.join()
2125
2126class _file_like(object):
2127 def __init__(self, delegate):
2128 self._delegate = delegate
2129 self._pid = None
2130
2131 @property
2132 def cache(self):
2133 pid = os.getpid()
2134 # There are no race conditions since fork keeps only the running thread
2135 if pid != self._pid:
2136 self._pid = pid
2137 self._cache = []
2138 return self._cache
2139
2140 def write(self, data):
2141 self.cache.append(data)
2142
2143 def flush(self):
2144 self._delegate.write(''.join(self.cache))
2145 self._cache = []
2146
class TestStdinBadfiledescriptor(unittest.TestCase):
    # Issue 5155, 5313, 5331: creating queues, processes and pools from
    # inside a child process.

    def test_queue_in_process(self):
        queue = multiprocessing.Queue()
        proc = multiprocessing.Process(target=_TestProcess, args=(queue,))
        proc.start()
        proc.join()

    def test_pool_in_process(self):
        p = multiprocessing.Process(target=pool_in_process)
        p.start()
        p.join()

    def test_flushing(self):
        sio = io.StringIO()
        flike = _file_like(sio)
        flike.write('foo')
        # NOTE(review): this Process is constructed but never started;
        # kept as in the original -- confirm whether it was meant to run.
        proc = multiprocessing.Process(target=lambda: flike.flush())
        flike.flush()
        # A unittest assertion, so the check survives `python -O`.
        self.assertEqual(sio.getvalue(), 'foo')
2167
# Test cases that are not generated from the _Test* base classes.
testcases_other = [
    OtherTest,
    TestInvalidHandle,
    TestInitializers,
    TestStdinBadfiledescriptor,
]
Neal Norwitz5d6415e2008-08-25 01:53:32 +00002170
Benjamin Petersone711caf2008-06-11 16:44:04 +00002171#
2172#
2173#
2174
def test_main(run=None):
    """Build and run the complete multiprocessing test suite.

    *run* is a callable that accepts a ``unittest.TestSuite``.  When it is
    omitted, ``test.support.run_unittest`` is used.

    Raises unittest.SkipTest on Linux when an RLock cannot be created.
    """
    if sys.platform.startswith("linux"):
        try:
            # Probe only; the lock itself is not needed.
            multiprocessing.RLock()
        except OSError:
            raise unittest.SkipTest("OSError raises on RLock creation, see issue 3111!")

    if run is None:
        from test.support import run_unittest as run

    util.get_temp_dir()     # creates temp directory for use by all processes

    multiprocessing.get_logger().setLevel(LOG_LEVEL)

    # Shared pools/manager used by the generated test cases.
    ProcessesMixin.pool = multiprocessing.Pool(4)
    ThreadsMixin.pool = multiprocessing.dummy.Pool(4)
    ManagerMixin.manager.__init__()
    ManagerMixin.manager.start()
    ManagerMixin.pool = ManagerMixin.manager.Pool(4)

    try:
        testcases = (
            sorted(testcases_processes.values(), key=lambda tc: tc.__name__) +
            sorted(testcases_threads.values(), key=lambda tc: tc.__name__) +
            sorted(testcases_manager.values(), key=lambda tc: tc.__name__) +
            testcases_other
            )

        loadTestsFromTestCase = unittest.defaultTestLoader.loadTestsFromTestCase
        suite = unittest.TestSuite(loadTestsFromTestCase(tc) for tc in testcases)
        run(suite)
    finally:
        # Tear the shared resources down even if the runner raised, so no
        # worker or manager process outlives the test run.
        ThreadsMixin.pool.terminate()
        ProcessesMixin.pool.terminate()
        ManagerMixin.pool.terminate()
        ManagerMixin.manager.shutdown()

        del ProcessesMixin.pool, ThreadsMixin.pool, ManagerMixin.pool
Benjamin Petersone711caf2008-06-11 16:44:04 +00002212
def main():
    """Run the suite through a verbose TextTestRunner."""
    runner = unittest.TextTestRunner(verbosity=2)
    test_main(runner.run)


if __name__ == '__main__':
    main()