blob: 85094cc5f4d8637cc2819f6059c96888cae27b12 [file] [log] [blame]
Benjamin Peterson90f5ba52010-03-11 22:53:45 +00001#!/usr/bin/env python3
Georg Brandl86b2fb92008-07-16 03:43:04 +00002
Benjamin Petersone711caf2008-06-11 16:44:04 +00003#
4# Unit tests for the multiprocessing package
5#
6
7import unittest
Benjamin Petersone711caf2008-06-11 16:44:04 +00008import queue as pyqueue
9import time
Alexandre Vassalottic57a84f2009-07-17 12:07:01 +000010import io
Benjamin Petersone711caf2008-06-11 16:44:04 +000011import sys
12import os
13import gc
Antoine Pitrouc824e9a2011-04-05 18:11:33 +020014import errno
Benjamin Petersone711caf2008-06-11 16:44:04 +000015import signal
16import array
Benjamin Petersone711caf2008-06-11 16:44:04 +000017import socket
18import random
19import logging
R. David Murraya21e4ca2009-03-31 23:16:50 +000020import test.support
Benjamin Petersone711caf2008-06-11 16:44:04 +000021
Benjamin Petersone5384b02008-10-04 22:00:42 +000022
R. David Murraya21e4ca2009-03-31 23:16:50 +000023# Skip tests if _multiprocessing wasn't built.
24_multiprocessing = test.support.import_module('_multiprocessing')
25# Skip tests if sem_open implementation is broken.
26test.support.import_module('multiprocessing.synchronize')
Victor Stinner45df8202010-04-28 22:31:17 +000027# import threading after _multiprocessing to raise a more revelant error
28# message: "No module named _multiprocessing". _multiprocessing is not compiled
29# without thread support.
30import threading
Benjamin Petersone5384b02008-10-04 22:00:42 +000031
Benjamin Petersone711caf2008-06-11 16:44:04 +000032import multiprocessing.dummy
33import multiprocessing.connection
34import multiprocessing.managers
35import multiprocessing.heap
Benjamin Petersone711caf2008-06-11 16:44:04 +000036import multiprocessing.pool
Benjamin Petersone711caf2008-06-11 16:44:04 +000037
38from multiprocessing import util
39
Brian Curtinafa88b52010-10-07 01:12:19 +000040try:
41 from multiprocessing.sharedctypes import Value, copy
42 HAS_SHAREDCTYPES = True
43except ImportError:
44 HAS_SHAREDCTYPES = False
45
Benjamin Petersone711caf2008-06-11 16:44:04 +000046#
47#
48#
49
Benjamin Peterson2bc91df2008-07-13 18:45:30 +000050def latin(s):
51 return s.encode('latin')
Benjamin Petersone711caf2008-06-11 16:44:04 +000052
Benjamin Petersone711caf2008-06-11 16:44:04 +000053#
54# Constants
55#
56
57LOG_LEVEL = util.SUBWARNING
Jesse Noller1f0b6582010-01-27 03:36:01 +000058#LOG_LEVEL = logging.DEBUG
Benjamin Petersone711caf2008-06-11 16:44:04 +000059
60DELTA = 0.1
61CHECK_TIMINGS = False # making true makes tests take a lot longer
62 # and can sometimes cause some non-serious
63 # failures because some calls block a bit
64 # longer than expected
65if CHECK_TIMINGS:
66 TIMEOUT1, TIMEOUT2, TIMEOUT3 = 0.82, 0.35, 1.4
67else:
68 TIMEOUT1, TIMEOUT2, TIMEOUT3 = 0.1, 0.1, 0.1
69
70HAVE_GETVALUE = not getattr(_multiprocessing,
71 'HAVE_BROKEN_SEM_GETVALUE', False)
72
Jesse Noller6214edd2009-01-19 16:23:53 +000073WIN32 = (sys.platform == "win32")
Antoine Pitrou176f07d2011-06-06 19:35:31 +020074if WIN32:
75 from _subprocess import WaitForSingleObject, INFINITE, WAIT_OBJECT_0
76
77 def wait_for_handle(handle, timeout):
78 if timeout is None or timeout < 0.0:
79 timeout = INFINITE
80 else:
81 timeout = int(1000 * timeout)
82 return WaitForSingleObject(handle, timeout) == WAIT_OBJECT_0
83else:
84 from select import select
85 _select = util._eintr_retry(select)
86
87 def wait_for_handle(handle, timeout):
88 if timeout is not None and timeout < 0.0:
89 timeout = None
90 return handle in _select([handle], [], [], timeout)[0]
Jesse Noller6214edd2009-01-19 16:23:53 +000091
Benjamin Petersone711caf2008-06-11 16:44:04 +000092#
Florent Xiclunafd1b0932010-03-28 00:25:02 +000093# Some tests require ctypes
94#
95
96try:
Florent Xiclunaaa171062010-08-14 15:56:42 +000097 from ctypes import Structure, c_int, c_double
Florent Xiclunafd1b0932010-03-28 00:25:02 +000098except ImportError:
99 Structure = object
100 c_int = c_double = None
101
102#
Benjamin Petersone711caf2008-06-11 16:44:04 +0000103# Creates a wrapper for a function which records the time it takes to finish
104#
105
106class TimingWrapper(object):
107
108 def __init__(self, func):
109 self.func = func
110 self.elapsed = None
111
112 def __call__(self, *args, **kwds):
113 t = time.time()
114 try:
115 return self.func(*args, **kwds)
116 finally:
117 self.elapsed = time.time() - t
118
119#
120# Base class for test cases
121#
122
123class BaseTestCase(object):
124
125 ALLOWED_TYPES = ('processes', 'manager', 'threads')
126
127 def assertTimingAlmostEqual(self, a, b):
128 if CHECK_TIMINGS:
129 self.assertAlmostEqual(a, b, 1)
130
131 def assertReturnsIfImplemented(self, value, func, *args):
132 try:
133 res = func(*args)
134 except NotImplementedError:
135 pass
136 else:
137 return self.assertEqual(value, res)
138
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +0000139 # For the sanity of Windows users, rather than crashing or freezing in
140 # multiple ways.
141 def __reduce__(self, *args):
142 raise NotImplementedError("shouldn't try to pickle a test case")
143
144 __reduce_ex__ = __reduce__
145
Benjamin Petersone711caf2008-06-11 16:44:04 +0000146#
147# Return the value of a semaphore
148#
149
150def get_value(self):
151 try:
152 return self.get_value()
153 except AttributeError:
154 try:
155 return self._Semaphore__value
156 except AttributeError:
157 try:
158 return self._value
159 except AttributeError:
160 raise NotImplementedError
161
162#
163# Testcases
164#
165
166class _TestProcess(BaseTestCase):
167
168 ALLOWED_TYPES = ('processes', 'threads')
169
170 def test_current(self):
171 if self.TYPE == 'threads':
172 return
173
174 current = self.current_process()
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000175 authkey = current.authkey
Benjamin Petersone711caf2008-06-11 16:44:04 +0000176
177 self.assertTrue(current.is_alive())
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000178 self.assertTrue(not current.daemon)
Ezio Melottie9615932010-01-24 19:26:24 +0000179 self.assertIsInstance(authkey, bytes)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000180 self.assertTrue(len(authkey) > 0)
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000181 self.assertEqual(current.ident, os.getpid())
182 self.assertEqual(current.exitcode, None)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000183
Antoine Pitrou0bd4deb2011-02-25 22:07:43 +0000184 def test_daemon_argument(self):
185 if self.TYPE == "threads":
186 return
187
188 # By default uses the current process's daemon flag.
189 proc0 = self.Process(target=self._test)
Antoine Pitrouec785222011-03-02 00:15:44 +0000190 self.assertEqual(proc0.daemon, self.current_process().daemon)
Antoine Pitrou0bd4deb2011-02-25 22:07:43 +0000191 proc1 = self.Process(target=self._test, daemon=True)
192 self.assertTrue(proc1.daemon)
193 proc2 = self.Process(target=self._test, daemon=False)
194 self.assertFalse(proc2.daemon)
195
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +0000196 @classmethod
197 def _test(cls, q, *args, **kwds):
198 current = cls.current_process()
Benjamin Petersone711caf2008-06-11 16:44:04 +0000199 q.put(args)
200 q.put(kwds)
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000201 q.put(current.name)
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +0000202 if cls.TYPE != 'threads':
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000203 q.put(bytes(current.authkey))
Benjamin Petersone711caf2008-06-11 16:44:04 +0000204 q.put(current.pid)
205
206 def test_process(self):
207 q = self.Queue(1)
208 e = self.Event()
209 args = (q, 1, 2)
210 kwargs = {'hello':23, 'bye':2.54}
211 name = 'SomeProcess'
212 p = self.Process(
213 target=self._test, args=args, kwargs=kwargs, name=name
214 )
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000215 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000216 current = self.current_process()
217
218 if self.TYPE != 'threads':
Ezio Melottib3aedd42010-11-20 19:04:17 +0000219 self.assertEqual(p.authkey, current.authkey)
220 self.assertEqual(p.is_alive(), False)
221 self.assertEqual(p.daemon, True)
Benjamin Peterson577473f2010-01-19 00:09:57 +0000222 self.assertNotIn(p, self.active_children())
Benjamin Petersone711caf2008-06-11 16:44:04 +0000223 self.assertTrue(type(self.active_children()) is list)
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000224 self.assertEqual(p.exitcode, None)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000225
226 p.start()
227
Ezio Melottib3aedd42010-11-20 19:04:17 +0000228 self.assertEqual(p.exitcode, None)
229 self.assertEqual(p.is_alive(), True)
Benjamin Peterson577473f2010-01-19 00:09:57 +0000230 self.assertIn(p, self.active_children())
Benjamin Petersone711caf2008-06-11 16:44:04 +0000231
Ezio Melottib3aedd42010-11-20 19:04:17 +0000232 self.assertEqual(q.get(), args[1:])
233 self.assertEqual(q.get(), kwargs)
234 self.assertEqual(q.get(), p.name)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000235 if self.TYPE != 'threads':
Ezio Melottib3aedd42010-11-20 19:04:17 +0000236 self.assertEqual(q.get(), current.authkey)
237 self.assertEqual(q.get(), p.pid)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000238
239 p.join()
240
Ezio Melottib3aedd42010-11-20 19:04:17 +0000241 self.assertEqual(p.exitcode, 0)
242 self.assertEqual(p.is_alive(), False)
Benjamin Peterson577473f2010-01-19 00:09:57 +0000243 self.assertNotIn(p, self.active_children())
Benjamin Petersone711caf2008-06-11 16:44:04 +0000244
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +0000245 @classmethod
246 def _test_terminate(cls):
Benjamin Petersone711caf2008-06-11 16:44:04 +0000247 time.sleep(1000)
248
249 def test_terminate(self):
250 if self.TYPE == 'threads':
251 return
252
253 p = self.Process(target=self._test_terminate)
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000254 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000255 p.start()
256
257 self.assertEqual(p.is_alive(), True)
Benjamin Peterson577473f2010-01-19 00:09:57 +0000258 self.assertIn(p, self.active_children())
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000259 self.assertEqual(p.exitcode, None)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000260
261 p.terminate()
262
263 join = TimingWrapper(p.join)
264 self.assertEqual(join(), None)
265 self.assertTimingAlmostEqual(join.elapsed, 0.0)
266
267 self.assertEqual(p.is_alive(), False)
Benjamin Peterson577473f2010-01-19 00:09:57 +0000268 self.assertNotIn(p, self.active_children())
Benjamin Petersone711caf2008-06-11 16:44:04 +0000269
270 p.join()
271
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000272 # XXX sometimes get p.exitcode == 0 on Windows ...
273 #self.assertEqual(p.exitcode, -signal.SIGTERM)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000274
275 def test_cpu_count(self):
276 try:
277 cpus = multiprocessing.cpu_count()
278 except NotImplementedError:
279 cpus = 1
280 self.assertTrue(type(cpus) is int)
281 self.assertTrue(cpus >= 1)
282
283 def test_active_children(self):
284 self.assertEqual(type(self.active_children()), list)
285
286 p = self.Process(target=time.sleep, args=(DELTA,))
Benjamin Peterson577473f2010-01-19 00:09:57 +0000287 self.assertNotIn(p, self.active_children())
Benjamin Petersone711caf2008-06-11 16:44:04 +0000288
289 p.start()
Benjamin Peterson577473f2010-01-19 00:09:57 +0000290 self.assertIn(p, self.active_children())
Benjamin Petersone711caf2008-06-11 16:44:04 +0000291
292 p.join()
Benjamin Peterson577473f2010-01-19 00:09:57 +0000293 self.assertNotIn(p, self.active_children())
Benjamin Petersone711caf2008-06-11 16:44:04 +0000294
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +0000295 @classmethod
296 def _test_recursion(cls, wconn, id):
Benjamin Petersone711caf2008-06-11 16:44:04 +0000297 from multiprocessing import forking
298 wconn.send(id)
299 if len(id) < 2:
300 for i in range(2):
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +0000301 p = cls.Process(
302 target=cls._test_recursion, args=(wconn, id+[i])
Benjamin Petersone711caf2008-06-11 16:44:04 +0000303 )
304 p.start()
305 p.join()
306
307 def test_recursion(self):
308 rconn, wconn = self.Pipe(duplex=False)
309 self._test_recursion(wconn, [])
310
311 time.sleep(DELTA)
312 result = []
313 while rconn.poll():
314 result.append(rconn.recv())
315
316 expected = [
317 [],
318 [0],
319 [0, 0],
320 [0, 1],
321 [1],
322 [1, 0],
323 [1, 1]
324 ]
325 self.assertEqual(result, expected)
326
Antoine Pitrou176f07d2011-06-06 19:35:31 +0200327 @classmethod
328 def _test_sentinel(cls, event):
329 event.wait(10.0)
330
331 def test_sentinel(self):
332 if self.TYPE == "threads":
333 return
334 event = self.Event()
335 p = self.Process(target=self._test_sentinel, args=(event,))
336 with self.assertRaises(ValueError):
337 p.sentinel
338 p.start()
339 self.addCleanup(p.join)
340 sentinel = p.sentinel
341 self.assertIsInstance(sentinel, int)
342 self.assertFalse(wait_for_handle(sentinel, timeout=0.0))
343 event.set()
344 p.join()
345 self.assertTrue(wait_for_handle(sentinel, timeout=DELTA))
346
Benjamin Petersone711caf2008-06-11 16:44:04 +0000347#
348#
349#
350
351class _UpperCaser(multiprocessing.Process):
352
353 def __init__(self):
354 multiprocessing.Process.__init__(self)
355 self.child_conn, self.parent_conn = multiprocessing.Pipe()
356
357 def run(self):
358 self.parent_conn.close()
359 for s in iter(self.child_conn.recv, None):
360 self.child_conn.send(s.upper())
361 self.child_conn.close()
362
363 def submit(self, s):
364 assert type(s) is str
365 self.parent_conn.send(s)
366 return self.parent_conn.recv()
367
368 def stop(self):
369 self.parent_conn.send(None)
370 self.parent_conn.close()
371 self.child_conn.close()
372
373class _TestSubclassingProcess(BaseTestCase):
374
375 ALLOWED_TYPES = ('processes',)
376
377 def test_subclassing(self):
378 uppercaser = _UpperCaser()
379 uppercaser.start()
380 self.assertEqual(uppercaser.submit('hello'), 'HELLO')
381 self.assertEqual(uppercaser.submit('world'), 'WORLD')
382 uppercaser.stop()
383 uppercaser.join()
384
385#
386#
387#
388
389def queue_empty(q):
390 if hasattr(q, 'empty'):
391 return q.empty()
392 else:
393 return q.qsize() == 0
394
395def queue_full(q, maxsize):
396 if hasattr(q, 'full'):
397 return q.full()
398 else:
399 return q.qsize() == maxsize
400
401
402class _TestQueue(BaseTestCase):
403
404
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +0000405 @classmethod
406 def _test_put(cls, queue, child_can_start, parent_can_continue):
Benjamin Petersone711caf2008-06-11 16:44:04 +0000407 child_can_start.wait()
408 for i in range(6):
409 queue.get()
410 parent_can_continue.set()
411
412 def test_put(self):
413 MAXSIZE = 6
414 queue = self.Queue(maxsize=MAXSIZE)
415 child_can_start = self.Event()
416 parent_can_continue = self.Event()
417
418 proc = self.Process(
419 target=self._test_put,
420 args=(queue, child_can_start, parent_can_continue)
421 )
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000422 proc.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000423 proc.start()
424
425 self.assertEqual(queue_empty(queue), True)
426 self.assertEqual(queue_full(queue, MAXSIZE), False)
427
428 queue.put(1)
429 queue.put(2, True)
430 queue.put(3, True, None)
431 queue.put(4, False)
432 queue.put(5, False, None)
433 queue.put_nowait(6)
434
435 # the values may be in buffer but not yet in pipe so sleep a bit
436 time.sleep(DELTA)
437
438 self.assertEqual(queue_empty(queue), False)
439 self.assertEqual(queue_full(queue, MAXSIZE), True)
440
441 put = TimingWrapper(queue.put)
442 put_nowait = TimingWrapper(queue.put_nowait)
443
444 self.assertRaises(pyqueue.Full, put, 7, False)
445 self.assertTimingAlmostEqual(put.elapsed, 0)
446
447 self.assertRaises(pyqueue.Full, put, 7, False, None)
448 self.assertTimingAlmostEqual(put.elapsed, 0)
449
450 self.assertRaises(pyqueue.Full, put_nowait, 7)
451 self.assertTimingAlmostEqual(put_nowait.elapsed, 0)
452
453 self.assertRaises(pyqueue.Full, put, 7, True, TIMEOUT1)
454 self.assertTimingAlmostEqual(put.elapsed, TIMEOUT1)
455
456 self.assertRaises(pyqueue.Full, put, 7, False, TIMEOUT2)
457 self.assertTimingAlmostEqual(put.elapsed, 0)
458
459 self.assertRaises(pyqueue.Full, put, 7, True, timeout=TIMEOUT3)
460 self.assertTimingAlmostEqual(put.elapsed, TIMEOUT3)
461
462 child_can_start.set()
463 parent_can_continue.wait()
464
465 self.assertEqual(queue_empty(queue), True)
466 self.assertEqual(queue_full(queue, MAXSIZE), False)
467
468 proc.join()
469
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +0000470 @classmethod
471 def _test_get(cls, queue, child_can_start, parent_can_continue):
Benjamin Petersone711caf2008-06-11 16:44:04 +0000472 child_can_start.wait()
Benjamin Petersonb29cbbc2008-06-16 20:57:14 +0000473 #queue.put(1)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000474 queue.put(2)
475 queue.put(3)
476 queue.put(4)
477 queue.put(5)
478 parent_can_continue.set()
479
480 def test_get(self):
481 queue = self.Queue()
482 child_can_start = self.Event()
483 parent_can_continue = self.Event()
484
485 proc = self.Process(
486 target=self._test_get,
487 args=(queue, child_can_start, parent_can_continue)
488 )
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000489 proc.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000490 proc.start()
491
492 self.assertEqual(queue_empty(queue), True)
493
494 child_can_start.set()
495 parent_can_continue.wait()
496
497 time.sleep(DELTA)
498 self.assertEqual(queue_empty(queue), False)
499
Benjamin Petersonb29cbbc2008-06-16 20:57:14 +0000500 # Hangs unexpectedly, remove for now
501 #self.assertEqual(queue.get(), 1)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000502 self.assertEqual(queue.get(True, None), 2)
503 self.assertEqual(queue.get(True), 3)
504 self.assertEqual(queue.get(timeout=1), 4)
505 self.assertEqual(queue.get_nowait(), 5)
506
507 self.assertEqual(queue_empty(queue), True)
508
509 get = TimingWrapper(queue.get)
510 get_nowait = TimingWrapper(queue.get_nowait)
511
512 self.assertRaises(pyqueue.Empty, get, False)
513 self.assertTimingAlmostEqual(get.elapsed, 0)
514
515 self.assertRaises(pyqueue.Empty, get, False, None)
516 self.assertTimingAlmostEqual(get.elapsed, 0)
517
518 self.assertRaises(pyqueue.Empty, get_nowait)
519 self.assertTimingAlmostEqual(get_nowait.elapsed, 0)
520
521 self.assertRaises(pyqueue.Empty, get, True, TIMEOUT1)
522 self.assertTimingAlmostEqual(get.elapsed, TIMEOUT1)
523
524 self.assertRaises(pyqueue.Empty, get, False, TIMEOUT2)
525 self.assertTimingAlmostEqual(get.elapsed, 0)
526
527 self.assertRaises(pyqueue.Empty, get, timeout=TIMEOUT3)
528 self.assertTimingAlmostEqual(get.elapsed, TIMEOUT3)
529
530 proc.join()
531
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +0000532 @classmethod
533 def _test_fork(cls, queue):
Benjamin Petersone711caf2008-06-11 16:44:04 +0000534 for i in range(10, 20):
535 queue.put(i)
536 # note that at this point the items may only be buffered, so the
537 # process cannot shutdown until the feeder thread has finished
538 # pushing items onto the pipe.
539
540 def test_fork(self):
541 # Old versions of Queue would fail to create a new feeder
542 # thread for a forked process if the original process had its
543 # own feeder thread. This test checks that this no longer
544 # happens.
545
546 queue = self.Queue()
547
548 # put items on queue so that main process starts a feeder thread
549 for i in range(10):
550 queue.put(i)
551
552 # wait to make sure thread starts before we fork a new process
553 time.sleep(DELTA)
554
555 # fork process
556 p = self.Process(target=self._test_fork, args=(queue,))
557 p.start()
558
559 # check that all expected items are in the queue
560 for i in range(20):
561 self.assertEqual(queue.get(), i)
562 self.assertRaises(pyqueue.Empty, queue.get, False)
563
564 p.join()
565
566 def test_qsize(self):
567 q = self.Queue()
568 try:
569 self.assertEqual(q.qsize(), 0)
570 except NotImplementedError:
571 return
572 q.put(1)
573 self.assertEqual(q.qsize(), 1)
574 q.put(5)
575 self.assertEqual(q.qsize(), 2)
576 q.get()
577 self.assertEqual(q.qsize(), 1)
578 q.get()
579 self.assertEqual(q.qsize(), 0)
580
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +0000581 @classmethod
582 def _test_task_done(cls, q):
Benjamin Petersone711caf2008-06-11 16:44:04 +0000583 for obj in iter(q.get, None):
584 time.sleep(DELTA)
585 q.task_done()
586
587 def test_task_done(self):
588 queue = self.JoinableQueue()
589
590 if sys.version_info < (2, 5) and not hasattr(queue, 'task_done'):
Florent Xiclunafd1b0932010-03-28 00:25:02 +0000591 self.skipTest("requires 'queue.task_done()' method")
Benjamin Petersone711caf2008-06-11 16:44:04 +0000592
593 workers = [self.Process(target=self._test_task_done, args=(queue,))
594 for i in range(4)]
595
596 for p in workers:
597 p.start()
598
599 for i in range(10):
600 queue.put(i)
601
602 queue.join()
603
604 for p in workers:
605 queue.put(None)
606
607 for p in workers:
608 p.join()
609
610#
611#
612#
613
614class _TestLock(BaseTestCase):
615
616 def test_lock(self):
617 lock = self.Lock()
618 self.assertEqual(lock.acquire(), True)
619 self.assertEqual(lock.acquire(False), False)
620 self.assertEqual(lock.release(), None)
621 self.assertRaises((ValueError, threading.ThreadError), lock.release)
622
623 def test_rlock(self):
624 lock = self.RLock()
625 self.assertEqual(lock.acquire(), True)
626 self.assertEqual(lock.acquire(), True)
627 self.assertEqual(lock.acquire(), True)
628 self.assertEqual(lock.release(), None)
629 self.assertEqual(lock.release(), None)
630 self.assertEqual(lock.release(), None)
631 self.assertRaises((AssertionError, RuntimeError), lock.release)
632
Jesse Nollerf8d00852009-03-31 03:25:07 +0000633 def test_lock_context(self):
634 with self.Lock():
635 pass
636
Benjamin Petersone711caf2008-06-11 16:44:04 +0000637
638class _TestSemaphore(BaseTestCase):
639
640 def _test_semaphore(self, sem):
641 self.assertReturnsIfImplemented(2, get_value, sem)
642 self.assertEqual(sem.acquire(), True)
643 self.assertReturnsIfImplemented(1, get_value, sem)
644 self.assertEqual(sem.acquire(), True)
645 self.assertReturnsIfImplemented(0, get_value, sem)
646 self.assertEqual(sem.acquire(False), False)
647 self.assertReturnsIfImplemented(0, get_value, sem)
648 self.assertEqual(sem.release(), None)
649 self.assertReturnsIfImplemented(1, get_value, sem)
650 self.assertEqual(sem.release(), None)
651 self.assertReturnsIfImplemented(2, get_value, sem)
652
653 def test_semaphore(self):
654 sem = self.Semaphore(2)
655 self._test_semaphore(sem)
656 self.assertEqual(sem.release(), None)
657 self.assertReturnsIfImplemented(3, get_value, sem)
658 self.assertEqual(sem.release(), None)
659 self.assertReturnsIfImplemented(4, get_value, sem)
660
661 def test_bounded_semaphore(self):
662 sem = self.BoundedSemaphore(2)
663 self._test_semaphore(sem)
664 # Currently fails on OS/X
665 #if HAVE_GETVALUE:
666 # self.assertRaises(ValueError, sem.release)
667 # self.assertReturnsIfImplemented(2, get_value, sem)
668
669 def test_timeout(self):
670 if self.TYPE != 'processes':
671 return
672
673 sem = self.Semaphore(0)
674 acquire = TimingWrapper(sem.acquire)
675
676 self.assertEqual(acquire(False), False)
677 self.assertTimingAlmostEqual(acquire.elapsed, 0.0)
678
679 self.assertEqual(acquire(False, None), False)
680 self.assertTimingAlmostEqual(acquire.elapsed, 0.0)
681
682 self.assertEqual(acquire(False, TIMEOUT1), False)
683 self.assertTimingAlmostEqual(acquire.elapsed, 0)
684
685 self.assertEqual(acquire(True, TIMEOUT2), False)
686 self.assertTimingAlmostEqual(acquire.elapsed, TIMEOUT2)
687
688 self.assertEqual(acquire(timeout=TIMEOUT3), False)
689 self.assertTimingAlmostEqual(acquire.elapsed, TIMEOUT3)
690
691
692class _TestCondition(BaseTestCase):
693
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +0000694 @classmethod
695 def f(cls, cond, sleeping, woken, timeout=None):
Benjamin Petersone711caf2008-06-11 16:44:04 +0000696 cond.acquire()
697 sleeping.release()
698 cond.wait(timeout)
699 woken.release()
700 cond.release()
701
702 def check_invariant(self, cond):
703 # this is only supposed to succeed when there are no sleepers
704 if self.TYPE == 'processes':
705 try:
706 sleepers = (cond._sleeping_count.get_value() -
707 cond._woken_count.get_value())
708 self.assertEqual(sleepers, 0)
709 self.assertEqual(cond._wait_semaphore.get_value(), 0)
710 except NotImplementedError:
711 pass
712
713 def test_notify(self):
714 cond = self.Condition()
715 sleeping = self.Semaphore(0)
716 woken = self.Semaphore(0)
717
718 p = self.Process(target=self.f, args=(cond, sleeping, woken))
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000719 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000720 p.start()
721
722 p = threading.Thread(target=self.f, args=(cond, sleeping, woken))
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000723 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000724 p.start()
725
726 # wait for both children to start sleeping
727 sleeping.acquire()
728 sleeping.acquire()
729
730 # check no process/thread has woken up
731 time.sleep(DELTA)
732 self.assertReturnsIfImplemented(0, get_value, woken)
733
734 # wake up one process/thread
735 cond.acquire()
736 cond.notify()
737 cond.release()
738
739 # check one process/thread has woken up
740 time.sleep(DELTA)
741 self.assertReturnsIfImplemented(1, get_value, woken)
742
743 # wake up another
744 cond.acquire()
745 cond.notify()
746 cond.release()
747
748 # check other has woken up
749 time.sleep(DELTA)
750 self.assertReturnsIfImplemented(2, get_value, woken)
751
752 # check state is not mucked up
753 self.check_invariant(cond)
754 p.join()
755
756 def test_notify_all(self):
757 cond = self.Condition()
758 sleeping = self.Semaphore(0)
759 woken = self.Semaphore(0)
760
761 # start some threads/processes which will timeout
762 for i in range(3):
763 p = self.Process(target=self.f,
764 args=(cond, sleeping, woken, TIMEOUT1))
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000765 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000766 p.start()
767
768 t = threading.Thread(target=self.f,
769 args=(cond, sleeping, woken, TIMEOUT1))
Benjamin Peterson72753702008-08-18 18:09:21 +0000770 t.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000771 t.start()
772
773 # wait for them all to sleep
774 for i in range(6):
775 sleeping.acquire()
776
777 # check they have all timed out
778 for i in range(6):
779 woken.acquire()
780 self.assertReturnsIfImplemented(0, get_value, woken)
781
782 # check state is not mucked up
783 self.check_invariant(cond)
784
785 # start some more threads/processes
786 for i in range(3):
787 p = self.Process(target=self.f, args=(cond, sleeping, woken))
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000788 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000789 p.start()
790
791 t = threading.Thread(target=self.f, args=(cond, sleeping, woken))
Benjamin Peterson72753702008-08-18 18:09:21 +0000792 t.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000793 t.start()
794
795 # wait for them to all sleep
796 for i in range(6):
797 sleeping.acquire()
798
799 # check no process/thread has woken up
800 time.sleep(DELTA)
801 self.assertReturnsIfImplemented(0, get_value, woken)
802
803 # wake them all up
804 cond.acquire()
805 cond.notify_all()
806 cond.release()
807
808 # check they have all woken
Antoine Pitrouf25a8de2011-04-16 21:02:01 +0200809 for i in range(10):
810 try:
811 if get_value(woken) == 6:
812 break
813 except NotImplementedError:
814 break
815 time.sleep(DELTA)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000816 self.assertReturnsIfImplemented(6, get_value, woken)
817
818 # check state is not mucked up
819 self.check_invariant(cond)
820
821 def test_timeout(self):
822 cond = self.Condition()
823 wait = TimingWrapper(cond.wait)
824 cond.acquire()
825 res = wait(TIMEOUT1)
826 cond.release()
Georg Brandl65ffae02010-10-28 09:24:56 +0000827 self.assertEqual(res, False)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000828 self.assertTimingAlmostEqual(wait.elapsed, TIMEOUT1)
829
830
831class _TestEvent(BaseTestCase):
832
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +0000833 @classmethod
834 def _test_event(cls, event):
Benjamin Petersone711caf2008-06-11 16:44:04 +0000835 time.sleep(TIMEOUT2)
836 event.set()
837
838 def test_event(self):
839 event = self.Event()
840 wait = TimingWrapper(event.wait)
841
Ezio Melotti13925002011-03-16 11:05:33 +0200842 # Removed temporarily, due to API shear, this does not
Benjamin Petersone711caf2008-06-11 16:44:04 +0000843 # work with threading._Event objects. is_set == isSet
Benjamin Peterson965ce872009-04-05 21:24:58 +0000844 self.assertEqual(event.is_set(), False)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000845
Benjamin Peterson965ce872009-04-05 21:24:58 +0000846 # Removed, threading.Event.wait() will return the value of the __flag
847 # instead of None. API Shear with the semaphore backed mp.Event
848 self.assertEqual(wait(0.0), False)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000849 self.assertTimingAlmostEqual(wait.elapsed, 0.0)
Benjamin Peterson965ce872009-04-05 21:24:58 +0000850 self.assertEqual(wait(TIMEOUT1), False)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000851 self.assertTimingAlmostEqual(wait.elapsed, TIMEOUT1)
852
853 event.set()
854
855 # See note above on the API differences
Benjamin Peterson965ce872009-04-05 21:24:58 +0000856 self.assertEqual(event.is_set(), True)
857 self.assertEqual(wait(), True)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000858 self.assertTimingAlmostEqual(wait.elapsed, 0.0)
Benjamin Peterson965ce872009-04-05 21:24:58 +0000859 self.assertEqual(wait(TIMEOUT1), True)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000860 self.assertTimingAlmostEqual(wait.elapsed, 0.0)
861 # self.assertEqual(event.is_set(), True)
862
863 event.clear()
864
865 #self.assertEqual(event.is_set(), False)
866
867 self.Process(target=self._test_event, args=(event,)).start()
Benjamin Peterson965ce872009-04-05 21:24:58 +0000868 self.assertEqual(wait(), True)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000869
870#
871#
872#
873
874class _TestValue(BaseTestCase):
875
Florent Xiclunafd1b0932010-03-28 00:25:02 +0000876 ALLOWED_TYPES = ('processes',)
877
Benjamin Petersone711caf2008-06-11 16:44:04 +0000878 codes_values = [
879 ('i', 4343, 24234),
880 ('d', 3.625, -4.25),
881 ('h', -232, 234),
882 ('c', latin('x'), latin('y'))
883 ]
884
Antoine Pitrou7744e2a2010-11-22 16:26:21 +0000885 def setUp(self):
886 if not HAS_SHAREDCTYPES:
887 self.skipTest("requires multiprocessing.sharedctypes")
888
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +0000889 @classmethod
890 def _test(cls, values):
891 for sv, cv in zip(values, cls.codes_values):
Benjamin Petersone711caf2008-06-11 16:44:04 +0000892 sv.value = cv[2]
893
894
895 def test_value(self, raw=False):
Benjamin Petersone711caf2008-06-11 16:44:04 +0000896 if raw:
897 values = [self.RawValue(code, value)
898 for code, value, _ in self.codes_values]
899 else:
900 values = [self.Value(code, value)
901 for code, value, _ in self.codes_values]
902
903 for sv, cv in zip(values, self.codes_values):
904 self.assertEqual(sv.value, cv[1])
905
906 proc = self.Process(target=self._test, args=(values,))
907 proc.start()
908 proc.join()
909
910 for sv, cv in zip(values, self.codes_values):
911 self.assertEqual(sv.value, cv[2])
912
913 def test_rawvalue(self):
914 self.test_value(raw=True)
915
916 def test_getobj_getlock(self):
Benjamin Petersone711caf2008-06-11 16:44:04 +0000917 val1 = self.Value('i', 5)
918 lock1 = val1.get_lock()
919 obj1 = val1.get_obj()
920
921 val2 = self.Value('i', 5, lock=None)
922 lock2 = val2.get_lock()
923 obj2 = val2.get_obj()
924
925 lock = self.Lock()
926 val3 = self.Value('i', 5, lock=lock)
927 lock3 = val3.get_lock()
928 obj3 = val3.get_obj()
929 self.assertEqual(lock, lock3)
930
Jesse Nollerb0516a62009-01-18 03:11:38 +0000931 arr4 = self.Value('i', 5, lock=False)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000932 self.assertFalse(hasattr(arr4, 'get_lock'))
933 self.assertFalse(hasattr(arr4, 'get_obj'))
934
Jesse Nollerb0516a62009-01-18 03:11:38 +0000935 self.assertRaises(AttributeError, self.Value, 'i', 5, lock='navalue')
936
937 arr5 = self.RawValue('i', 5)
938 self.assertFalse(hasattr(arr5, 'get_lock'))
939 self.assertFalse(hasattr(arr5, 'get_obj'))
940
Benjamin Petersone711caf2008-06-11 16:44:04 +0000941
942class _TestArray(BaseTestCase):
943
Florent Xiclunafd1b0932010-03-28 00:25:02 +0000944 ALLOWED_TYPES = ('processes',)
945
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +0000946 @classmethod
947 def f(cls, seq):
Benjamin Petersone711caf2008-06-11 16:44:04 +0000948 for i in range(1, len(seq)):
949 seq[i] += seq[i-1]
950
Florent Xiclunafd1b0932010-03-28 00:25:02 +0000951 @unittest.skipIf(c_int is None, "requires _ctypes")
Benjamin Petersone711caf2008-06-11 16:44:04 +0000952 def test_array(self, raw=False):
Benjamin Petersone711caf2008-06-11 16:44:04 +0000953 seq = [680, 626, 934, 821, 150, 233, 548, 982, 714, 831]
954 if raw:
955 arr = self.RawArray('i', seq)
956 else:
957 arr = self.Array('i', seq)
958
959 self.assertEqual(len(arr), len(seq))
960 self.assertEqual(arr[3], seq[3])
961 self.assertEqual(list(arr[2:7]), list(seq[2:7]))
962
963 arr[4:8] = seq[4:8] = array.array('i', [1, 2, 3, 4])
964
965 self.assertEqual(list(arr[:]), seq)
966
967 self.f(seq)
968
969 p = self.Process(target=self.f, args=(arr,))
970 p.start()
971 p.join()
972
973 self.assertEqual(list(arr[:]), seq)
974
Florent Xiclunafd1b0932010-03-28 00:25:02 +0000975 @unittest.skipIf(c_int is None, "requires _ctypes")
Mark Dickinson89461ef2011-03-26 10:19:03 +0000976 def test_array_from_size(self):
977 size = 10
978 # Test for zeroing (see issue #11675).
979 # The repetition below strengthens the test by increasing the chances
980 # of previously allocated non-zero memory being used for the new array
981 # on the 2nd and 3rd loops.
982 for _ in range(3):
983 arr = self.Array('i', size)
984 self.assertEqual(len(arr), size)
985 self.assertEqual(list(arr), [0] * size)
986 arr[:] = range(10)
987 self.assertEqual(list(arr), list(range(10)))
988 del arr
989
990 @unittest.skipIf(c_int is None, "requires _ctypes")
Benjamin Petersone711caf2008-06-11 16:44:04 +0000991 def test_rawarray(self):
992 self.test_array(raw=True)
993
Florent Xiclunafd1b0932010-03-28 00:25:02 +0000994 @unittest.skipIf(c_int is None, "requires _ctypes")
Benjamin Petersone711caf2008-06-11 16:44:04 +0000995 def test_getobj_getlock_obj(self):
Benjamin Petersone711caf2008-06-11 16:44:04 +0000996 arr1 = self.Array('i', list(range(10)))
997 lock1 = arr1.get_lock()
998 obj1 = arr1.get_obj()
999
1000 arr2 = self.Array('i', list(range(10)), lock=None)
1001 lock2 = arr2.get_lock()
1002 obj2 = arr2.get_obj()
1003
1004 lock = self.Lock()
1005 arr3 = self.Array('i', list(range(10)), lock=lock)
1006 lock3 = arr3.get_lock()
1007 obj3 = arr3.get_obj()
1008 self.assertEqual(lock, lock3)
1009
Jesse Nollerb0516a62009-01-18 03:11:38 +00001010 arr4 = self.Array('i', range(10), lock=False)
Benjamin Petersone711caf2008-06-11 16:44:04 +00001011 self.assertFalse(hasattr(arr4, 'get_lock'))
1012 self.assertFalse(hasattr(arr4, 'get_obj'))
Jesse Nollerb0516a62009-01-18 03:11:38 +00001013 self.assertRaises(AttributeError,
1014 self.Array, 'i', range(10), lock='notalock')
1015
1016 arr5 = self.RawArray('i', range(10))
1017 self.assertFalse(hasattr(arr5, 'get_lock'))
1018 self.assertFalse(hasattr(arr5, 'get_obj'))
Benjamin Petersone711caf2008-06-11 16:44:04 +00001019
1020#
1021#
1022#
1023
1024class _TestContainers(BaseTestCase):
1025
1026 ALLOWED_TYPES = ('manager',)
1027
1028 def test_list(self):
1029 a = self.list(list(range(10)))
1030 self.assertEqual(a[:], list(range(10)))
1031
1032 b = self.list()
1033 self.assertEqual(b[:], [])
1034
1035 b.extend(list(range(5)))
1036 self.assertEqual(b[:], list(range(5)))
1037
1038 self.assertEqual(b[2], 2)
1039 self.assertEqual(b[2:10], [2,3,4])
1040
1041 b *= 2
1042 self.assertEqual(b[:], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4])
1043
1044 self.assertEqual(b + [5, 6], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4, 5, 6])
1045
1046 self.assertEqual(a[:], list(range(10)))
1047
1048 d = [a, b]
1049 e = self.list(d)
1050 self.assertEqual(
1051 e[:],
1052 [[0, 1, 2, 3, 4, 5, 6, 7, 8, 9], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4]]
1053 )
1054
1055 f = self.list([a])
1056 a.append('hello')
1057 self.assertEqual(f[:], [[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 'hello']])
1058
1059 def test_dict(self):
1060 d = self.dict()
1061 indices = list(range(65, 70))
1062 for i in indices:
1063 d[i] = chr(i)
1064 self.assertEqual(d.copy(), dict((i, chr(i)) for i in indices))
1065 self.assertEqual(sorted(d.keys()), indices)
1066 self.assertEqual(sorted(d.values()), [chr(i) for i in indices])
1067 self.assertEqual(sorted(d.items()), [(i, chr(i)) for i in indices])
1068
1069 def test_namespace(self):
1070 n = self.Namespace()
1071 n.name = 'Bob'
1072 n.job = 'Builder'
1073 n._hidden = 'hidden'
1074 self.assertEqual((n.name, n.job), ('Bob', 'Builder'))
1075 del n.job
1076 self.assertEqual(str(n), "Namespace(name='Bob')")
1077 self.assertTrue(hasattr(n, 'name'))
1078 self.assertTrue(not hasattr(n, 'job'))
1079
1080#
1081#
1082#
1083
1084def sqr(x, wait=0.0):
1085 time.sleep(wait)
1086 return x*x
Ask Solem2afcbf22010-11-09 20:55:52 +00001087
Benjamin Petersone711caf2008-06-11 16:44:04 +00001088class _TestPool(BaseTestCase):
1089
1090 def test_apply(self):
1091 papply = self.pool.apply
1092 self.assertEqual(papply(sqr, (5,)), sqr(5))
1093 self.assertEqual(papply(sqr, (), {'x':3}), sqr(x=3))
1094
1095 def test_map(self):
1096 pmap = self.pool.map
1097 self.assertEqual(pmap(sqr, list(range(10))), list(map(sqr, list(range(10)))))
1098 self.assertEqual(pmap(sqr, list(range(100)), chunksize=20),
1099 list(map(sqr, list(range(100)))))
1100
Alexandre Vassalottie52e3782009-07-17 09:18:18 +00001101 def test_map_chunksize(self):
1102 try:
1103 self.pool.map_async(sqr, [], chunksize=1).get(timeout=TIMEOUT1)
1104 except multiprocessing.TimeoutError:
1105 self.fail("pool.map_async with chunksize stalled on null list")
1106
Benjamin Petersone711caf2008-06-11 16:44:04 +00001107 def test_async(self):
1108 res = self.pool.apply_async(sqr, (7, TIMEOUT1,))
1109 get = TimingWrapper(res.get)
1110 self.assertEqual(get(), 49)
1111 self.assertTimingAlmostEqual(get.elapsed, TIMEOUT1)
1112
1113 def test_async_timeout(self):
1114 res = self.pool.apply_async(sqr, (6, TIMEOUT2 + 0.2))
1115 get = TimingWrapper(res.get)
1116 self.assertRaises(multiprocessing.TimeoutError, get, timeout=TIMEOUT2)
1117 self.assertTimingAlmostEqual(get.elapsed, TIMEOUT2)
1118
1119 def test_imap(self):
1120 it = self.pool.imap(sqr, list(range(10)))
1121 self.assertEqual(list(it), list(map(sqr, list(range(10)))))
1122
1123 it = self.pool.imap(sqr, list(range(10)))
1124 for i in range(10):
1125 self.assertEqual(next(it), i*i)
1126 self.assertRaises(StopIteration, it.__next__)
1127
1128 it = self.pool.imap(sqr, list(range(1000)), chunksize=100)
1129 for i in range(1000):
1130 self.assertEqual(next(it), i*i)
1131 self.assertRaises(StopIteration, it.__next__)
1132
1133 def test_imap_unordered(self):
1134 it = self.pool.imap_unordered(sqr, list(range(1000)))
1135 self.assertEqual(sorted(it), list(map(sqr, list(range(1000)))))
1136
1137 it = self.pool.imap_unordered(sqr, list(range(1000)), chunksize=53)
1138 self.assertEqual(sorted(it), list(map(sqr, list(range(1000)))))
1139
1140 def test_make_pool(self):
1141 p = multiprocessing.Pool(3)
1142 self.assertEqual(3, len(p._pool))
1143 p.close()
1144 p.join()
1145
1146 def test_terminate(self):
1147 if self.TYPE == 'manager':
1148 # On Unix a forked process increfs each shared object to
1149 # which its parent process held a reference. If the
1150 # forked process gets terminated then there is likely to
1151 # be a reference leak. So to prevent
1152 # _TestZZZNumberOfObjects from failing we skip this test
1153 # when using a manager.
1154 return
1155
1156 result = self.pool.map_async(
1157 time.sleep, [0.1 for i in range(10000)], chunksize=1
1158 )
1159 self.pool.terminate()
1160 join = TimingWrapper(self.pool.join)
1161 join()
Victor Stinner900189b2011-03-24 16:39:07 +01001162 self.assertLess(join.elapsed, 0.5)
Jesse Noller1f0b6582010-01-27 03:36:01 +00001163
Ask Solem2afcbf22010-11-09 20:55:52 +00001164def raising():
1165 raise KeyError("key")
Jesse Noller1f0b6582010-01-27 03:36:01 +00001166
Ask Solem2afcbf22010-11-09 20:55:52 +00001167def unpickleable_result():
1168 return lambda: 42
1169
1170class _TestPoolWorkerErrors(BaseTestCase):
Jesse Noller1f0b6582010-01-27 03:36:01 +00001171 ALLOWED_TYPES = ('processes', )
Ask Solem2afcbf22010-11-09 20:55:52 +00001172
1173 def test_async_error_callback(self):
1174 p = multiprocessing.Pool(2)
1175
1176 scratchpad = [None]
1177 def errback(exc):
1178 scratchpad[0] = exc
1179
1180 res = p.apply_async(raising, error_callback=errback)
1181 self.assertRaises(KeyError, res.get)
1182 self.assertTrue(scratchpad[0])
1183 self.assertIsInstance(scratchpad[0], KeyError)
1184
1185 p.close()
1186 p.join()
1187
1188 def test_unpickleable_result(self):
1189 from multiprocessing.pool import MaybeEncodingError
1190 p = multiprocessing.Pool(2)
1191
1192 # Make sure we don't lose pool processes because of encoding errors.
1193 for iteration in range(20):
1194
1195 scratchpad = [None]
1196 def errback(exc):
1197 scratchpad[0] = exc
1198
1199 res = p.apply_async(unpickleable_result, error_callback=errback)
1200 self.assertRaises(MaybeEncodingError, res.get)
1201 wrapped = scratchpad[0]
1202 self.assertTrue(wrapped)
1203 self.assertIsInstance(scratchpad[0], MaybeEncodingError)
1204 self.assertIsNotNone(wrapped.exc)
1205 self.assertIsNotNone(wrapped.value)
1206
1207 p.close()
1208 p.join()
1209
1210class _TestPoolWorkerLifetime(BaseTestCase):
1211 ALLOWED_TYPES = ('processes', )
1212
Jesse Noller1f0b6582010-01-27 03:36:01 +00001213 def test_pool_worker_lifetime(self):
1214 p = multiprocessing.Pool(3, maxtasksperchild=10)
1215 self.assertEqual(3, len(p._pool))
1216 origworkerpids = [w.pid for w in p._pool]
1217 # Run many tasks so each worker gets replaced (hopefully)
1218 results = []
1219 for i in range(100):
1220 results.append(p.apply_async(sqr, (i, )))
1221 # Fetch the results and verify we got the right answers,
1222 # also ensuring all the tasks have completed.
1223 for (j, res) in enumerate(results):
1224 self.assertEqual(res.get(), sqr(j))
1225 # Refill the pool
1226 p._repopulate_pool()
Florent Xiclunafb190f62010-03-04 16:10:10 +00001227 # Wait until all workers are alive
Antoine Pitrou540ab062011-04-06 22:51:17 +02001228 # (countdown * DELTA = 5 seconds max startup process time)
1229 countdown = 50
Florent Xiclunafb190f62010-03-04 16:10:10 +00001230 while countdown and not all(w.is_alive() for w in p._pool):
1231 countdown -= 1
1232 time.sleep(DELTA)
Jesse Noller1f0b6582010-01-27 03:36:01 +00001233 finalworkerpids = [w.pid for w in p._pool]
Florent Xiclunafb190f62010-03-04 16:10:10 +00001234 # All pids should be assigned. See issue #7805.
1235 self.assertNotIn(None, origworkerpids)
1236 self.assertNotIn(None, finalworkerpids)
1237 # Finally, check that the worker pids have changed
Jesse Noller1f0b6582010-01-27 03:36:01 +00001238 self.assertNotEqual(sorted(origworkerpids), sorted(finalworkerpids))
1239 p.close()
1240 p.join()
1241
Benjamin Petersone711caf2008-06-11 16:44:04 +00001242#
1243# Test that manager has expected number of shared objects left
1244#
1245
1246class _TestZZZNumberOfObjects(BaseTestCase):
1247 # Because test cases are sorted alphabetically, this one will get
1248 # run after all the other tests for the manager. It tests that
1249 # there have been no "reference leaks" for the manager's shared
1250 # objects. Note the comment in _TestPool.test_terminate().
1251 ALLOWED_TYPES = ('manager',)
1252
1253 def test_number_of_objects(self):
1254 EXPECTED_NUMBER = 1 # the pool object is still alive
1255 multiprocessing.active_children() # discard dead process objs
1256 gc.collect() # do garbage collection
1257 refs = self.manager._number_of_objects()
Jesse Noller63b3a972009-01-21 02:15:48 +00001258 debug_info = self.manager._debug_info()
Benjamin Petersone711caf2008-06-11 16:44:04 +00001259 if refs != EXPECTED_NUMBER:
Georg Brandl3dbca812008-07-23 16:10:53 +00001260 print(self.manager._debug_info())
Jesse Noller63b3a972009-01-21 02:15:48 +00001261 print(debug_info)
Benjamin Petersone711caf2008-06-11 16:44:04 +00001262
1263 self.assertEqual(refs, EXPECTED_NUMBER)
1264
1265#
1266# Test of creating a customized manager class
1267#
1268
1269from multiprocessing.managers import BaseManager, BaseProxy, RemoteError
1270
1271class FooBar(object):
1272 def f(self):
1273 return 'f()'
1274 def g(self):
1275 raise ValueError
1276 def _h(self):
1277 return '_h()'
1278
1279def baz():
1280 for i in range(10):
1281 yield i*i
1282
1283class IteratorProxy(BaseProxy):
Florent Xiclunaaa171062010-08-14 15:56:42 +00001284 _exposed_ = ('__next__',)
Benjamin Petersone711caf2008-06-11 16:44:04 +00001285 def __iter__(self):
1286 return self
1287 def __next__(self):
Benjamin Petersone711caf2008-06-11 16:44:04 +00001288 return self._callmethod('__next__')
1289
1290class MyManager(BaseManager):
1291 pass
1292
1293MyManager.register('Foo', callable=FooBar)
1294MyManager.register('Bar', callable=FooBar, exposed=('f', '_h'))
1295MyManager.register('baz', callable=baz, proxytype=IteratorProxy)
1296
1297
1298class _TestMyManager(BaseTestCase):
1299
1300 ALLOWED_TYPES = ('manager',)
1301
1302 def test_mymanager(self):
1303 manager = MyManager()
1304 manager.start()
1305
1306 foo = manager.Foo()
1307 bar = manager.Bar()
1308 baz = manager.baz()
1309
1310 foo_methods = [name for name in ('f', 'g', '_h') if hasattr(foo, name)]
1311 bar_methods = [name for name in ('f', 'g', '_h') if hasattr(bar, name)]
1312
1313 self.assertEqual(foo_methods, ['f', 'g'])
1314 self.assertEqual(bar_methods, ['f', '_h'])
1315
1316 self.assertEqual(foo.f(), 'f()')
1317 self.assertRaises(ValueError, foo.g)
1318 self.assertEqual(foo._callmethod('f'), 'f()')
1319 self.assertRaises(RemoteError, foo._callmethod, '_h')
1320
1321 self.assertEqual(bar.f(), 'f()')
1322 self.assertEqual(bar._h(), '_h()')
1323 self.assertEqual(bar._callmethod('f'), 'f()')
1324 self.assertEqual(bar._callmethod('_h'), '_h()')
1325
1326 self.assertEqual(list(baz), [i*i for i in range(10)])
1327
1328 manager.shutdown()
1329
1330#
1331# Test of connecting to a remote server and using xmlrpclib for serialization
1332#
1333
1334_queue = pyqueue.Queue()
1335def get_queue():
1336 return _queue
1337
1338class QueueManager(BaseManager):
1339 '''manager class used by server process'''
1340QueueManager.register('get_queue', callable=get_queue)
1341
1342class QueueManager2(BaseManager):
1343 '''manager class which specifies the same interface as QueueManager'''
1344QueueManager2.register('get_queue')
1345
1346
1347SERIALIZER = 'xmlrpclib'
1348
1349class _TestRemoteManager(BaseTestCase):
1350
1351 ALLOWED_TYPES = ('manager',)
1352
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +00001353 @classmethod
1354 def _putter(cls, address, authkey):
Benjamin Petersone711caf2008-06-11 16:44:04 +00001355 manager = QueueManager2(
1356 address=address, authkey=authkey, serializer=SERIALIZER
1357 )
1358 manager.connect()
1359 queue = manager.get_queue()
1360 queue.put(('hello world', None, True, 2.25))
1361
1362 def test_remote(self):
1363 authkey = os.urandom(32)
1364
1365 manager = QueueManager(
1366 address=('localhost', 0), authkey=authkey, serializer=SERIALIZER
1367 )
1368 manager.start()
1369
1370 p = self.Process(target=self._putter, args=(manager.address, authkey))
1371 p.start()
1372
1373 manager2 = QueueManager2(
1374 address=manager.address, authkey=authkey, serializer=SERIALIZER
1375 )
1376 manager2.connect()
1377 queue = manager2.get_queue()
1378
1379 # Note that xmlrpclib will deserialize object as a list not a tuple
1380 self.assertEqual(queue.get(), ['hello world', None, True, 2.25])
1381
1382 # Because we are using xmlrpclib for serialization instead of
1383 # pickle this will cause a serialization error.
1384 self.assertRaises(Exception, queue.put, time.sleep)
1385
1386 # Make queue finalizer run before the server is stopped
1387 del queue
1388 manager.shutdown()
1389
Jesse Nollerc5d28a02009-03-30 16:37:36 +00001390class _TestManagerRestart(BaseTestCase):
1391
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +00001392 @classmethod
1393 def _putter(cls, address, authkey):
Jesse Nollerc5d28a02009-03-30 16:37:36 +00001394 manager = QueueManager(
1395 address=address, authkey=authkey, serializer=SERIALIZER)
1396 manager.connect()
1397 queue = manager.get_queue()
1398 queue.put('hello world')
1399
1400 def test_rapid_restart(self):
1401 authkey = os.urandom(32)
1402 manager = QueueManager(
Antoine Pitrou043bad02010-04-30 23:20:15 +00001403 address=('localhost', 0), authkey=authkey, serializer=SERIALIZER)
Brian Curtin50be1ca2010-11-01 05:10:44 +00001404 srvr = manager.get_server()
1405 addr = srvr.address
1406 # Close the connection.Listener socket which gets opened as a part
1407 # of manager.get_server(). It's not needed for the test.
1408 srvr.listener.close()
Jesse Nollerc5d28a02009-03-30 16:37:36 +00001409 manager.start()
1410
1411 p = self.Process(target=self._putter, args=(manager.address, authkey))
1412 p.start()
1413 queue = manager.get_queue()
1414 self.assertEqual(queue.get(), 'hello world')
Jesse Noller35d1f002009-03-30 22:59:27 +00001415 del queue
Jesse Nollerc5d28a02009-03-30 16:37:36 +00001416 manager.shutdown()
1417 manager = QueueManager(
Antoine Pitrou043bad02010-04-30 23:20:15 +00001418 address=addr, authkey=authkey, serializer=SERIALIZER)
Antoine Pitrouc824e9a2011-04-05 18:11:33 +02001419 try:
1420 manager.start()
1421 except IOError as e:
1422 if e.errno != errno.EADDRINUSE:
1423 raise
1424 # Retry after some time, in case the old socket was lingering
1425 # (sporadic failure on buildbots)
1426 time.sleep(1.0)
1427 manager = QueueManager(
1428 address=addr, authkey=authkey, serializer=SERIALIZER)
Jesse Noller35d1f002009-03-30 22:59:27 +00001429 manager.shutdown()
Jesse Nollerc5d28a02009-03-30 16:37:36 +00001430
Benjamin Petersone711caf2008-06-11 16:44:04 +00001431#
1432#
1433#
1434
1435SENTINEL = latin('')
1436
1437class _TestConnection(BaseTestCase):
1438
1439 ALLOWED_TYPES = ('processes', 'threads')
1440
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +00001441 @classmethod
1442 def _echo(cls, conn):
Benjamin Petersone711caf2008-06-11 16:44:04 +00001443 for msg in iter(conn.recv_bytes, SENTINEL):
1444 conn.send_bytes(msg)
1445 conn.close()
1446
1447 def test_connection(self):
1448 conn, child_conn = self.Pipe()
1449
1450 p = self.Process(target=self._echo, args=(child_conn,))
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +00001451 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +00001452 p.start()
1453
1454 seq = [1, 2.25, None]
1455 msg = latin('hello world')
1456 longmsg = msg * 10
1457 arr = array.array('i', list(range(4)))
1458
1459 if self.TYPE == 'processes':
1460 self.assertEqual(type(conn.fileno()), int)
1461
1462 self.assertEqual(conn.send(seq), None)
1463 self.assertEqual(conn.recv(), seq)
1464
1465 self.assertEqual(conn.send_bytes(msg), None)
1466 self.assertEqual(conn.recv_bytes(), msg)
1467
1468 if self.TYPE == 'processes':
1469 buffer = array.array('i', [0]*10)
1470 expected = list(arr) + [0] * (10 - len(arr))
1471 self.assertEqual(conn.send_bytes(arr), None)
1472 self.assertEqual(conn.recv_bytes_into(buffer),
1473 len(arr) * buffer.itemsize)
1474 self.assertEqual(list(buffer), expected)
1475
1476 buffer = array.array('i', [0]*10)
1477 expected = [0] * 3 + list(arr) + [0] * (10 - 3 - len(arr))
1478 self.assertEqual(conn.send_bytes(arr), None)
1479 self.assertEqual(conn.recv_bytes_into(buffer, 3 * buffer.itemsize),
1480 len(arr) * buffer.itemsize)
1481 self.assertEqual(list(buffer), expected)
1482
1483 buffer = bytearray(latin(' ' * 40))
1484 self.assertEqual(conn.send_bytes(longmsg), None)
1485 try:
1486 res = conn.recv_bytes_into(buffer)
1487 except multiprocessing.BufferTooShort as e:
1488 self.assertEqual(e.args, (longmsg,))
1489 else:
1490 self.fail('expected BufferTooShort, got %s' % res)
1491
1492 poll = TimingWrapper(conn.poll)
1493
1494 self.assertEqual(poll(), False)
1495 self.assertTimingAlmostEqual(poll.elapsed, 0)
1496
1497 self.assertEqual(poll(TIMEOUT1), False)
1498 self.assertTimingAlmostEqual(poll.elapsed, TIMEOUT1)
1499
1500 conn.send(None)
1501
1502 self.assertEqual(poll(TIMEOUT1), True)
1503 self.assertTimingAlmostEqual(poll.elapsed, 0)
1504
1505 self.assertEqual(conn.recv(), None)
1506
1507 really_big_msg = latin('X') * (1024 * 1024 * 16) # 16Mb
1508 conn.send_bytes(really_big_msg)
1509 self.assertEqual(conn.recv_bytes(), really_big_msg)
1510
1511 conn.send_bytes(SENTINEL) # tell child to quit
1512 child_conn.close()
1513
1514 if self.TYPE == 'processes':
1515 self.assertEqual(conn.readable, True)
1516 self.assertEqual(conn.writable, True)
1517 self.assertRaises(EOFError, conn.recv)
1518 self.assertRaises(EOFError, conn.recv_bytes)
1519
1520 p.join()
1521
1522 def test_duplex_false(self):
1523 reader, writer = self.Pipe(duplex=False)
1524 self.assertEqual(writer.send(1), None)
1525 self.assertEqual(reader.recv(), 1)
1526 if self.TYPE == 'processes':
1527 self.assertEqual(reader.readable, True)
1528 self.assertEqual(reader.writable, False)
1529 self.assertEqual(writer.readable, False)
1530 self.assertEqual(writer.writable, True)
1531 self.assertRaises(IOError, reader.send, 2)
1532 self.assertRaises(IOError, writer.recv)
1533 self.assertRaises(IOError, writer.poll)
1534
1535 def test_spawn_close(self):
1536 # We test that a pipe connection can be closed by parent
1537 # process immediately after child is spawned. On Windows this
1538 # would have sometimes failed on old versions because
1539 # child_conn would be closed before the child got a chance to
1540 # duplicate it.
1541 conn, child_conn = self.Pipe()
1542
1543 p = self.Process(target=self._echo, args=(child_conn,))
1544 p.start()
1545 child_conn.close() # this might complete before child initializes
1546
1547 msg = latin('hello')
1548 conn.send_bytes(msg)
1549 self.assertEqual(conn.recv_bytes(), msg)
1550
1551 conn.send_bytes(SENTINEL)
1552 conn.close()
1553 p.join()
1554
1555 def test_sendbytes(self):
1556 if self.TYPE != 'processes':
1557 return
1558
1559 msg = latin('abcdefghijklmnopqrstuvwxyz')
1560 a, b = self.Pipe()
1561
1562 a.send_bytes(msg)
1563 self.assertEqual(b.recv_bytes(), msg)
1564
1565 a.send_bytes(msg, 5)
1566 self.assertEqual(b.recv_bytes(), msg[5:])
1567
1568 a.send_bytes(msg, 7, 8)
1569 self.assertEqual(b.recv_bytes(), msg[7:7+8])
1570
1571 a.send_bytes(msg, 26)
1572 self.assertEqual(b.recv_bytes(), latin(''))
1573
1574 a.send_bytes(msg, 26, 0)
1575 self.assertEqual(b.recv_bytes(), latin(''))
1576
1577 self.assertRaises(ValueError, a.send_bytes, msg, 27)
1578
1579 self.assertRaises(ValueError, a.send_bytes, msg, 22, 5)
1580
1581 self.assertRaises(ValueError, a.send_bytes, msg, 26, 1)
1582
1583 self.assertRaises(ValueError, a.send_bytes, msg, -1)
1584
1585 self.assertRaises(ValueError, a.send_bytes, msg, 4, -1)
1586
Benjamin Petersone711caf2008-06-11 16:44:04 +00001587class _TestListenerClient(BaseTestCase):
1588
1589 ALLOWED_TYPES = ('processes', 'threads')
1590
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +00001591 @classmethod
1592 def _test(cls, address):
1593 conn = cls.connection.Client(address)
Benjamin Petersone711caf2008-06-11 16:44:04 +00001594 conn.send('hello')
1595 conn.close()
1596
1597 def test_listener_client(self):
1598 for family in self.connection.families:
1599 l = self.connection.Listener(family=family)
1600 p = self.Process(target=self._test, args=(l.address,))
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +00001601 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +00001602 p.start()
1603 conn = l.accept()
1604 self.assertEqual(conn.recv(), 'hello')
1605 p.join()
1606 l.close()
Benjamin Petersone711caf2008-06-11 16:44:04 +00001607#
1608# Test of sending connection and socket objects between processes
1609#
Benjamin Petersonb29cbbc2008-06-16 20:57:14 +00001610"""
Benjamin Petersone711caf2008-06-11 16:44:04 +00001611class _TestPicklingConnections(BaseTestCase):
1612
1613 ALLOWED_TYPES = ('processes',)
1614
1615 def _listener(self, conn, families):
1616 for fam in families:
1617 l = self.connection.Listener(family=fam)
1618 conn.send(l.address)
1619 new_conn = l.accept()
1620 conn.send(new_conn)
1621
1622 if self.TYPE == 'processes':
1623 l = socket.socket()
1624 l.bind(('localhost', 0))
1625 conn.send(l.getsockname())
1626 l.listen(1)
1627 new_conn, addr = l.accept()
1628 conn.send(new_conn)
1629
1630 conn.recv()
1631
1632 def _remote(self, conn):
1633 for (address, msg) in iter(conn.recv, None):
1634 client = self.connection.Client(address)
1635 client.send(msg.upper())
1636 client.close()
1637
1638 if self.TYPE == 'processes':
1639 address, msg = conn.recv()
1640 client = socket.socket()
1641 client.connect(address)
1642 client.sendall(msg.upper())
1643 client.close()
1644
1645 conn.close()
1646
1647 def test_pickling(self):
1648 try:
1649 multiprocessing.allow_connection_pickling()
1650 except ImportError:
1651 return
1652
1653 families = self.connection.families
1654
1655 lconn, lconn0 = self.Pipe()
1656 lp = self.Process(target=self._listener, args=(lconn0, families))
1657 lp.start()
1658 lconn0.close()
1659
1660 rconn, rconn0 = self.Pipe()
1661 rp = self.Process(target=self._remote, args=(rconn0,))
1662 rp.start()
1663 rconn0.close()
1664
1665 for fam in families:
1666 msg = ('This connection uses family %s' % fam).encode('ascii')
1667 address = lconn.recv()
1668 rconn.send((address, msg))
1669 new_conn = lconn.recv()
1670 self.assertEqual(new_conn.recv(), msg.upper())
1671
1672 rconn.send(None)
1673
1674 if self.TYPE == 'processes':
1675 msg = latin('This connection uses a normal socket')
1676 address = lconn.recv()
1677 rconn.send((address, msg))
1678 if hasattr(socket, 'fromfd'):
1679 new_conn = lconn.recv()
1680 self.assertEqual(new_conn.recv(100), msg.upper())
1681 else:
1682 # XXX On Windows with Py2.6 need to backport fromfd()
1683 discard = lconn.recv_bytes()
1684
1685 lconn.send(None)
1686
1687 rconn.close()
1688 lconn.close()
1689
1690 lp.join()
1691 rp.join()
Benjamin Petersonb29cbbc2008-06-16 20:57:14 +00001692"""
Benjamin Petersone711caf2008-06-11 16:44:04 +00001693#
1694#
1695#
1696
1697class _TestHeap(BaseTestCase):
1698
1699 ALLOWED_TYPES = ('processes',)
1700
1701 def test_heap(self):
1702 iterations = 5000
1703 maxblocks = 50
1704 blocks = []
1705
1706 # create and destroy lots of blocks of different sizes
1707 for i in range(iterations):
1708 size = int(random.lognormvariate(0, 1) * 1000)
1709 b = multiprocessing.heap.BufferWrapper(size)
1710 blocks.append(b)
1711 if len(blocks) > maxblocks:
1712 i = random.randrange(maxblocks)
1713 del blocks[i]
1714
1715 # get the heap object
1716 heap = multiprocessing.heap.BufferWrapper._heap
1717
1718 # verify the state of the heap
1719 all = []
1720 occupied = 0
1721 for L in list(heap._len_to_seq.values()):
1722 for arena, start, stop in L:
1723 all.append((heap._arenas.index(arena), start, stop,
1724 stop-start, 'free'))
1725 for arena, start, stop in heap._allocated_blocks:
1726 all.append((heap._arenas.index(arena), start, stop,
1727 stop-start, 'occupied'))
1728 occupied += (stop-start)
1729
1730 all.sort()
1731
1732 for i in range(len(all)-1):
1733 (arena, start, stop) = all[i][:3]
1734 (narena, nstart, nstop) = all[i+1][:3]
1735 self.assertTrue((arena != narena and nstart == 0) or
1736 (stop == nstart))
1737
1738#
1739#
1740#
1741
Benjamin Petersone711caf2008-06-11 16:44:04 +00001742class _Foo(Structure):
1743 _fields_ = [
1744 ('x', c_int),
1745 ('y', c_double)
1746 ]
1747
1748class _TestSharedCTypes(BaseTestCase):
1749
1750 ALLOWED_TYPES = ('processes',)
1751
Antoine Pitrou7744e2a2010-11-22 16:26:21 +00001752 def setUp(self):
1753 if not HAS_SHAREDCTYPES:
1754 self.skipTest("requires multiprocessing.sharedctypes")
1755
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +00001756 @classmethod
1757 def _double(cls, x, y, foo, arr, string):
Benjamin Petersone711caf2008-06-11 16:44:04 +00001758 x.value *= 2
1759 y.value *= 2
1760 foo.x *= 2
1761 foo.y *= 2
1762 string.value *= 2
1763 for i in range(len(arr)):
1764 arr[i] *= 2
1765
1766 def test_sharedctypes(self, lock=False):
Benjamin Petersone711caf2008-06-11 16:44:04 +00001767 x = Value('i', 7, lock=lock)
Georg Brandl89fad142010-03-14 10:23:39 +00001768 y = Value(c_double, 1.0/3.0, lock=lock)
Benjamin Petersone711caf2008-06-11 16:44:04 +00001769 foo = Value(_Foo, 3, 2, lock=lock)
Georg Brandl89fad142010-03-14 10:23:39 +00001770 arr = self.Array('d', list(range(10)), lock=lock)
1771 string = self.Array('c', 20, lock=lock)
Brian Curtinafa88b52010-10-07 01:12:19 +00001772 string.value = latin('hello')
Benjamin Petersone711caf2008-06-11 16:44:04 +00001773
1774 p = self.Process(target=self._double, args=(x, y, foo, arr, string))
1775 p.start()
1776 p.join()
1777
1778 self.assertEqual(x.value, 14)
1779 self.assertAlmostEqual(y.value, 2.0/3.0)
1780 self.assertEqual(foo.x, 6)
1781 self.assertAlmostEqual(foo.y, 4.0)
1782 for i in range(10):
1783 self.assertAlmostEqual(arr[i], i*2)
1784 self.assertEqual(string.value, latin('hellohello'))
1785
1786 def test_synchronize(self):
1787 self.test_sharedctypes(lock=True)
1788
1789 def test_copy(self):
Benjamin Petersone711caf2008-06-11 16:44:04 +00001790 foo = _Foo(2, 5.0)
Brian Curtinafa88b52010-10-07 01:12:19 +00001791 bar = copy(foo)
Benjamin Petersone711caf2008-06-11 16:44:04 +00001792 foo.x = 0
1793 foo.y = 0
1794 self.assertEqual(bar.x, 2)
1795 self.assertAlmostEqual(bar.y, 5.0)
1796
1797#
1798#
1799#
1800
1801class _TestFinalize(BaseTestCase):
1802
1803 ALLOWED_TYPES = ('processes',)
1804
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +00001805 @classmethod
1806 def _test_finalize(cls, conn):
Benjamin Petersone711caf2008-06-11 16:44:04 +00001807 class Foo(object):
1808 pass
1809
1810 a = Foo()
1811 util.Finalize(a, conn.send, args=('a',))
1812 del a # triggers callback for a
1813
1814 b = Foo()
1815 close_b = util.Finalize(b, conn.send, args=('b',))
1816 close_b() # triggers callback for b
1817 close_b() # does nothing because callback has already been called
1818 del b # does nothing because callback has already been called
1819
1820 c = Foo()
1821 util.Finalize(c, conn.send, args=('c',))
1822
1823 d10 = Foo()
1824 util.Finalize(d10, conn.send, args=('d10',), exitpriority=1)
1825
1826 d01 = Foo()
1827 util.Finalize(d01, conn.send, args=('d01',), exitpriority=0)
1828 d02 = Foo()
1829 util.Finalize(d02, conn.send, args=('d02',), exitpriority=0)
1830 d03 = Foo()
1831 util.Finalize(d03, conn.send, args=('d03',), exitpriority=0)
1832
1833 util.Finalize(None, conn.send, args=('e',), exitpriority=-10)
1834
1835 util.Finalize(None, conn.send, args=('STOP',), exitpriority=-100)
1836
Ezio Melotti13925002011-03-16 11:05:33 +02001837 # call multiprocessing's cleanup function then exit process without
Benjamin Petersone711caf2008-06-11 16:44:04 +00001838 # garbage collecting locals
1839 util._exit_function()
1840 conn.close()
1841 os._exit(0)
1842
1843 def test_finalize(self):
1844 conn, child_conn = self.Pipe()
1845
1846 p = self.Process(target=self._test_finalize, args=(child_conn,))
1847 p.start()
1848 p.join()
1849
1850 result = [obj for obj in iter(conn.recv, 'STOP')]
1851 self.assertEqual(result, ['a', 'b', 'd10', 'd03', 'd02', 'd01', 'e'])
1852
1853#
1854# Test that from ... import * works for each module
1855#
1856
1857class _TestImportStar(BaseTestCase):
1858
1859 ALLOWED_TYPES = ('processes',)
1860
1861 def test_import(self):
Florent Xiclunafd1b0932010-03-28 00:25:02 +00001862 modules = [
Benjamin Petersone711caf2008-06-11 16:44:04 +00001863 'multiprocessing', 'multiprocessing.connection',
1864 'multiprocessing.heap', 'multiprocessing.managers',
1865 'multiprocessing.pool', 'multiprocessing.process',
Florent Xiclunafd1b0932010-03-28 00:25:02 +00001866 'multiprocessing.reduction',
Benjamin Petersone711caf2008-06-11 16:44:04 +00001867 'multiprocessing.synchronize', 'multiprocessing.util'
Florent Xiclunafd1b0932010-03-28 00:25:02 +00001868 ]
1869
1870 if c_int is not None:
1871 # This module requires _ctypes
1872 modules.append('multiprocessing.sharedctypes')
Benjamin Petersone711caf2008-06-11 16:44:04 +00001873
1874 for name in modules:
1875 __import__(name)
1876 mod = sys.modules[name]
1877
1878 for attr in getattr(mod, '__all__', ()):
1879 self.assertTrue(
1880 hasattr(mod, attr),
1881 '%r does not have attribute %r' % (mod, attr)
1882 )
1883
1884#
1885# Quick test that logging works -- does not test logging output
1886#
1887
1888class _TestLogging(BaseTestCase):
1889
1890 ALLOWED_TYPES = ('processes',)
1891
1892 def test_enable_logging(self):
1893 logger = multiprocessing.get_logger()
1894 logger.setLevel(util.SUBWARNING)
1895 self.assertTrue(logger is not None)
1896 logger.debug('this will not be printed')
1897 logger.info('nor will this')
1898 logger.setLevel(LOG_LEVEL)
1899
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +00001900 @classmethod
1901 def _test_level(cls, conn):
Benjamin Petersone711caf2008-06-11 16:44:04 +00001902 logger = multiprocessing.get_logger()
1903 conn.send(logger.getEffectiveLevel())
1904
1905 def test_level(self):
1906 LEVEL1 = 32
1907 LEVEL2 = 37
1908
1909 logger = multiprocessing.get_logger()
1910 root_logger = logging.getLogger()
1911 root_level = root_logger.level
1912
1913 reader, writer = multiprocessing.Pipe(duplex=False)
1914
1915 logger.setLevel(LEVEL1)
1916 self.Process(target=self._test_level, args=(writer,)).start()
1917 self.assertEqual(LEVEL1, reader.recv())
1918
1919 logger.setLevel(logging.NOTSET)
1920 root_logger.setLevel(LEVEL2)
1921 self.Process(target=self._test_level, args=(writer,)).start()
1922 self.assertEqual(LEVEL2, reader.recv())
1923
1924 root_logger.setLevel(root_level)
1925 logger.setLevel(level=LOG_LEVEL)
1926
Jesse Nollerb9a49b72009-11-21 18:09:38 +00001927
Jesse Nollerf4ae35f2009-11-24 14:22:24 +00001928# class _TestLoggingProcessName(BaseTestCase):
1929#
1930# def handle(self, record):
1931# assert record.processName == multiprocessing.current_process().name
1932# self.__handled = True
1933#
1934# def test_logging(self):
1935# handler = logging.Handler()
1936# handler.handle = self.handle
1937# self.__handled = False
1938# # Bypass getLogger() and side-effects
1939# logger = logging.getLoggerClass()(
1940# 'multiprocessing.test.TestLoggingProcessName')
1941# logger.addHandler(handler)
1942# logger.propagate = False
1943#
1944# logger.warn('foo')
1945# assert self.__handled
Jesse Nollerb9a49b72009-11-21 18:09:38 +00001946
Benjamin Petersone711caf2008-06-11 16:44:04 +00001947#
Jesse Noller6214edd2009-01-19 16:23:53 +00001948# Test to verify handle verification, see issue 3321
1949#
1950
1951class TestInvalidHandle(unittest.TestCase):
1952
Florent Xiclunafd1b0932010-03-28 00:25:02 +00001953 @unittest.skipIf(WIN32, "skipped on Windows")
Jesse Noller6214edd2009-01-19 16:23:53 +00001954 def test_invalid_handles(self):
Antoine Pitrou87cf2202011-05-09 17:04:27 +02001955 conn = multiprocessing.connection.Connection(44977608)
1956 try:
1957 self.assertRaises((ValueError, IOError), conn.poll)
1958 finally:
1959 # Hack private attribute _handle to avoid printing an error
1960 # in conn.__del__
1961 conn._handle = None
1962 self.assertRaises((ValueError, IOError),
1963 multiprocessing.connection.Connection, -1)
Florent Xiclunafd1b0932010-03-28 00:25:02 +00001964
Jesse Noller6214edd2009-01-19 16:23:53 +00001965#
Benjamin Petersone711caf2008-06-11 16:44:04 +00001966# Functions used to create test cases from the base ones in this module
1967#
1968
1969def get_attributes(Source, names):
1970 d = {}
1971 for name in names:
1972 obj = getattr(Source, name)
1973 if type(obj) == type(get_attributes):
1974 obj = staticmethod(obj)
1975 d[name] = obj
1976 return d
1977
1978def create_test_cases(Mixin, type):
1979 result = {}
1980 glob = globals()
Florent Xiclunafd1b0932010-03-28 00:25:02 +00001981 Type = type.capitalize()
Benjamin Petersone711caf2008-06-11 16:44:04 +00001982
1983 for name in list(glob.keys()):
1984 if name.startswith('_Test'):
1985 base = glob[name]
1986 if type in base.ALLOWED_TYPES:
1987 newname = 'With' + Type + name[1:]
1988 class Temp(base, unittest.TestCase, Mixin):
1989 pass
1990 result[newname] = Temp
1991 Temp.__name__ = newname
1992 Temp.__module__ = Mixin.__module__
1993 return result
1994
1995#
1996# Create test cases
1997#
1998
1999class ProcessesMixin(object):
2000 TYPE = 'processes'
2001 Process = multiprocessing.Process
2002 locals().update(get_attributes(multiprocessing, (
2003 'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
2004 'Condition', 'Event', 'Value', 'Array', 'RawValue',
2005 'RawArray', 'current_process', 'active_children', 'Pipe',
2006 'connection', 'JoinableQueue'
2007 )))
2008
2009testcases_processes = create_test_cases(ProcessesMixin, type='processes')
2010globals().update(testcases_processes)
2011
2012
2013class ManagerMixin(object):
2014 TYPE = 'manager'
2015 Process = multiprocessing.Process
2016 manager = object.__new__(multiprocessing.managers.SyncManager)
2017 locals().update(get_attributes(manager, (
2018 'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
2019 'Condition', 'Event', 'Value', 'Array', 'list', 'dict',
2020 'Namespace', 'JoinableQueue'
2021 )))
2022
2023testcases_manager = create_test_cases(ManagerMixin, type='manager')
2024globals().update(testcases_manager)
2025
2026
2027class ThreadsMixin(object):
2028 TYPE = 'threads'
2029 Process = multiprocessing.dummy.Process
2030 locals().update(get_attributes(multiprocessing.dummy, (
2031 'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
2032 'Condition', 'Event', 'Value', 'Array', 'current_process',
2033 'active_children', 'Pipe', 'connection', 'dict', 'list',
2034 'Namespace', 'JoinableQueue'
2035 )))
2036
2037testcases_threads = create_test_cases(ThreadsMixin, type='threads')
2038globals().update(testcases_threads)
2039
Neal Norwitz5d6415e2008-08-25 01:53:32 +00002040class OtherTest(unittest.TestCase):
2041 # TODO: add more tests for deliver/answer challenge.
2042 def test_deliver_challenge_auth_failure(self):
2043 class _FakeConnection(object):
2044 def recv_bytes(self, size):
Neal Norwitzec105ad2008-08-25 03:05:54 +00002045 return b'something bogus'
Neal Norwitz5d6415e2008-08-25 01:53:32 +00002046 def send_bytes(self, data):
2047 pass
2048 self.assertRaises(multiprocessing.AuthenticationError,
2049 multiprocessing.connection.deliver_challenge,
2050 _FakeConnection(), b'abc')
2051
2052 def test_answer_challenge_auth_failure(self):
2053 class _FakeConnection(object):
2054 def __init__(self):
2055 self.count = 0
2056 def recv_bytes(self, size):
2057 self.count += 1
2058 if self.count == 1:
2059 return multiprocessing.connection.CHALLENGE
2060 elif self.count == 2:
Neal Norwitzec105ad2008-08-25 03:05:54 +00002061 return b'something bogus'
2062 return b''
Neal Norwitz5d6415e2008-08-25 01:53:32 +00002063 def send_bytes(self, data):
2064 pass
2065 self.assertRaises(multiprocessing.AuthenticationError,
2066 multiprocessing.connection.answer_challenge,
2067 _FakeConnection(), b'abc')
2068
Benjamin Petersonf47ed4a2009-04-11 20:45:40 +00002069#
2070# Test Manager.start()/Pool.__init__() initializer feature - see issue 5585
2071#
2072
2073def initializer(ns):
2074 ns.test += 1
2075
2076class TestInitializers(unittest.TestCase):
2077 def setUp(self):
2078 self.mgr = multiprocessing.Manager()
2079 self.ns = self.mgr.Namespace()
2080 self.ns.test = 0
2081
2082 def tearDown(self):
2083 self.mgr.shutdown()
2084
2085 def test_manager_initializer(self):
2086 m = multiprocessing.managers.SyncManager()
2087 self.assertRaises(TypeError, m.start, 1)
2088 m.start(initializer, (self.ns,))
2089 self.assertEqual(self.ns.test, 1)
2090 m.shutdown()
2091
2092 def test_pool_initializer(self):
2093 self.assertRaises(TypeError, multiprocessing.Pool, initializer=1)
2094 p = multiprocessing.Pool(1, initializer, (self.ns,))
2095 p.close()
2096 p.join()
2097 self.assertEqual(self.ns.test, 1)
2098
Alexandre Vassalottic57a84f2009-07-17 12:07:01 +00002099#
2100# Issue 5155, 5313, 5331: Test process in processes
2101# Verifies os.close(sys.stdin.fileno) vs. sys.stdin.close() behavior
2102#
2103
2104def _ThisSubProcess(q):
2105 try:
2106 item = q.get(block=False)
2107 except pyqueue.Empty:
2108 pass
2109
2110def _TestProcess(q):
2111 queue = multiprocessing.Queue()
2112 subProc = multiprocessing.Process(target=_ThisSubProcess, args=(queue,))
2113 subProc.start()
2114 subProc.join()
2115
2116def _afunc(x):
2117 return x*x
2118
2119def pool_in_process():
2120 pool = multiprocessing.Pool(processes=4)
2121 x = pool.map(_afunc, [1, 2, 3, 4, 5, 6, 7])
2122
2123class _file_like(object):
2124 def __init__(self, delegate):
2125 self._delegate = delegate
2126 self._pid = None
2127
2128 @property
2129 def cache(self):
2130 pid = os.getpid()
2131 # There are no race conditions since fork keeps only the running thread
2132 if pid != self._pid:
2133 self._pid = pid
2134 self._cache = []
2135 return self._cache
2136
2137 def write(self, data):
2138 self.cache.append(data)
2139
2140 def flush(self):
2141 self._delegate.write(''.join(self.cache))
2142 self._cache = []
2143
2144class TestStdinBadfiledescriptor(unittest.TestCase):
2145
2146 def test_queue_in_process(self):
2147 queue = multiprocessing.Queue()
2148 proc = multiprocessing.Process(target=_TestProcess, args=(queue,))
2149 proc.start()
2150 proc.join()
2151
2152 def test_pool_in_process(self):
2153 p = multiprocessing.Process(target=pool_in_process)
2154 p.start()
2155 p.join()
2156
2157 def test_flushing(self):
2158 sio = io.StringIO()
2159 flike = _file_like(sio)
2160 flike.write('foo')
2161 proc = multiprocessing.Process(target=lambda: flike.flush())
2162 flike.flush()
2163 assert sio.getvalue() == 'foo'
2164
2165testcases_other = [OtherTest, TestInvalidHandle, TestInitializers,
2166 TestStdinBadfiledescriptor]
Neal Norwitz5d6415e2008-08-25 01:53:32 +00002167
Benjamin Petersone711caf2008-06-11 16:44:04 +00002168#
2169#
2170#
2171
2172def test_main(run=None):
Jesse Nollerd00df3c2008-06-18 14:22:48 +00002173 if sys.platform.startswith("linux"):
2174 try:
2175 lock = multiprocessing.RLock()
2176 except OSError:
Benjamin Petersone549ead2009-03-28 21:42:05 +00002177 raise unittest.SkipTest("OSError raises on RLock creation, see issue 3111!")
Benjamin Peterson3c0dd062008-06-17 22:43:48 +00002178
Benjamin Petersone711caf2008-06-11 16:44:04 +00002179 if run is None:
2180 from test.support import run_unittest as run
2181
2182 util.get_temp_dir() # creates temp directory for use by all processes
2183
2184 multiprocessing.get_logger().setLevel(LOG_LEVEL)
2185
Benjamin Peterson41181742008-07-02 20:22:54 +00002186 ProcessesMixin.pool = multiprocessing.Pool(4)
2187 ThreadsMixin.pool = multiprocessing.dummy.Pool(4)
2188 ManagerMixin.manager.__init__()
2189 ManagerMixin.manager.start()
2190 ManagerMixin.pool = ManagerMixin.manager.Pool(4)
Benjamin Petersone711caf2008-06-11 16:44:04 +00002191
2192 testcases = (
Benjamin Peterson41181742008-07-02 20:22:54 +00002193 sorted(testcases_processes.values(), key=lambda tc:tc.__name__) +
2194 sorted(testcases_threads.values(), key=lambda tc:tc.__name__) +
Neal Norwitz5d6415e2008-08-25 01:53:32 +00002195 sorted(testcases_manager.values(), key=lambda tc:tc.__name__) +
2196 testcases_other
Benjamin Petersone711caf2008-06-11 16:44:04 +00002197 )
2198
2199 loadTestsFromTestCase = unittest.defaultTestLoader.loadTestsFromTestCase
2200 suite = unittest.TestSuite(loadTestsFromTestCase(tc) for tc in testcases)
2201 run(suite)
2202
Benjamin Peterson41181742008-07-02 20:22:54 +00002203 ThreadsMixin.pool.terminate()
2204 ProcessesMixin.pool.terminate()
2205 ManagerMixin.pool.terminate()
2206 ManagerMixin.manager.shutdown()
Benjamin Petersone711caf2008-06-11 16:44:04 +00002207
Benjamin Peterson41181742008-07-02 20:22:54 +00002208 del ProcessesMixin.pool, ThreadsMixin.pool, ManagerMixin.pool
Benjamin Petersone711caf2008-06-11 16:44:04 +00002209
2210def main():
2211 test_main(unittest.TextTestRunner(verbosity=2).run)
2212
2213if __name__ == '__main__':
2214 main()