blob: 7a7805d59f6881433014161b3105a373be0fe8bf [file] [log] [blame]
Benjamin Peterson90f5ba52010-03-11 22:53:45 +00001#!/usr/bin/env python3
Georg Brandl86b2fb92008-07-16 03:43:04 +00002
Benjamin Petersone711caf2008-06-11 16:44:04 +00003#
4# Unit tests for the multiprocessing package
5#
6
7import unittest
Benjamin Petersone711caf2008-06-11 16:44:04 +00008import queue as pyqueue
9import time
Alexandre Vassalottic57a84f2009-07-17 12:07:01 +000010import io
Benjamin Petersone711caf2008-06-11 16:44:04 +000011import sys
12import os
13import gc
Antoine Pitrouc824e9a2011-04-05 18:11:33 +020014import errno
Benjamin Petersone711caf2008-06-11 16:44:04 +000015import signal
16import array
Benjamin Petersone711caf2008-06-11 16:44:04 +000017import socket
18import random
19import logging
R. David Murraya21e4ca2009-03-31 23:16:50 +000020import test.support
Benjamin Petersone711caf2008-06-11 16:44:04 +000021
Benjamin Petersone5384b02008-10-04 22:00:42 +000022
R. David Murraya21e4ca2009-03-31 23:16:50 +000023# Skip tests if _multiprocessing wasn't built.
24_multiprocessing = test.support.import_module('_multiprocessing')
25# Skip tests if sem_open implementation is broken.
26test.support.import_module('multiprocessing.synchronize')
Victor Stinner45df8202010-04-28 22:31:17 +000027# import threading after _multiprocessing to raise a more revelant error
28# message: "No module named _multiprocessing". _multiprocessing is not compiled
29# without thread support.
30import threading
Benjamin Petersone5384b02008-10-04 22:00:42 +000031
Benjamin Petersone711caf2008-06-11 16:44:04 +000032import multiprocessing.dummy
33import multiprocessing.connection
34import multiprocessing.managers
35import multiprocessing.heap
Benjamin Petersone711caf2008-06-11 16:44:04 +000036import multiprocessing.pool
Benjamin Petersone711caf2008-06-11 16:44:04 +000037
Antoine Pitroubcb39d42011-08-23 19:46:22 +020038from multiprocessing import util, reduction
Benjamin Petersone711caf2008-06-11 16:44:04 +000039
Brian Curtinafa88b52010-10-07 01:12:19 +000040try:
41 from multiprocessing.sharedctypes import Value, copy
42 HAS_SHAREDCTYPES = True
43except ImportError:
44 HAS_SHAREDCTYPES = False
45
Antoine Pitroubcb39d42011-08-23 19:46:22 +020046try:
47 import msvcrt
48except ImportError:
49 msvcrt = None
50
Benjamin Petersone711caf2008-06-11 16:44:04 +000051#
52#
53#
54
Benjamin Peterson2bc91df2008-07-13 18:45:30 +000055def latin(s):
56 return s.encode('latin')
Benjamin Petersone711caf2008-06-11 16:44:04 +000057
Benjamin Petersone711caf2008-06-11 16:44:04 +000058#
59# Constants
60#
61
62LOG_LEVEL = util.SUBWARNING
Jesse Noller1f0b6582010-01-27 03:36:01 +000063#LOG_LEVEL = logging.DEBUG
Benjamin Petersone711caf2008-06-11 16:44:04 +000064
65DELTA = 0.1
66CHECK_TIMINGS = False # making true makes tests take a lot longer
67 # and can sometimes cause some non-serious
68 # failures because some calls block a bit
69 # longer than expected
70if CHECK_TIMINGS:
71 TIMEOUT1, TIMEOUT2, TIMEOUT3 = 0.82, 0.35, 1.4
72else:
73 TIMEOUT1, TIMEOUT2, TIMEOUT3 = 0.1, 0.1, 0.1
74
75HAVE_GETVALUE = not getattr(_multiprocessing,
76 'HAVE_BROKEN_SEM_GETVALUE', False)
77
Jesse Noller6214edd2009-01-19 16:23:53 +000078WIN32 = (sys.platform == "win32")
Antoine Pitrou176f07d2011-06-06 19:35:31 +020079if WIN32:
80 from _subprocess import WaitForSingleObject, INFINITE, WAIT_OBJECT_0
81
82 def wait_for_handle(handle, timeout):
83 if timeout is None or timeout < 0.0:
84 timeout = INFINITE
85 else:
86 timeout = int(1000 * timeout)
87 return WaitForSingleObject(handle, timeout) == WAIT_OBJECT_0
88else:
89 from select import select
90 _select = util._eintr_retry(select)
91
92 def wait_for_handle(handle, timeout):
93 if timeout is not None and timeout < 0.0:
94 timeout = None
95 return handle in _select([handle], [], [], timeout)[0]
Jesse Noller6214edd2009-01-19 16:23:53 +000096
Antoine Pitroubcb39d42011-08-23 19:46:22 +020097try:
98 MAXFD = os.sysconf("SC_OPEN_MAX")
99except:
100 MAXFD = 256
101
Benjamin Petersone711caf2008-06-11 16:44:04 +0000102#
Florent Xiclunafd1b0932010-03-28 00:25:02 +0000103# Some tests require ctypes
104#
105
106try:
Florent Xiclunaaa171062010-08-14 15:56:42 +0000107 from ctypes import Structure, c_int, c_double
Florent Xiclunafd1b0932010-03-28 00:25:02 +0000108except ImportError:
109 Structure = object
110 c_int = c_double = None
111
112#
Benjamin Petersone711caf2008-06-11 16:44:04 +0000113# Creates a wrapper for a function which records the time it takes to finish
114#
115
116class TimingWrapper(object):
117
118 def __init__(self, func):
119 self.func = func
120 self.elapsed = None
121
122 def __call__(self, *args, **kwds):
123 t = time.time()
124 try:
125 return self.func(*args, **kwds)
126 finally:
127 self.elapsed = time.time() - t
128
129#
130# Base class for test cases
131#
132
133class BaseTestCase(object):
134
135 ALLOWED_TYPES = ('processes', 'manager', 'threads')
136
137 def assertTimingAlmostEqual(self, a, b):
138 if CHECK_TIMINGS:
139 self.assertAlmostEqual(a, b, 1)
140
141 def assertReturnsIfImplemented(self, value, func, *args):
142 try:
143 res = func(*args)
144 except NotImplementedError:
145 pass
146 else:
147 return self.assertEqual(value, res)
148
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +0000149 # For the sanity of Windows users, rather than crashing or freezing in
150 # multiple ways.
151 def __reduce__(self, *args):
152 raise NotImplementedError("shouldn't try to pickle a test case")
153
154 __reduce_ex__ = __reduce__
155
Benjamin Petersone711caf2008-06-11 16:44:04 +0000156#
157# Return the value of a semaphore
158#
159
160def get_value(self):
161 try:
162 return self.get_value()
163 except AttributeError:
164 try:
165 return self._Semaphore__value
166 except AttributeError:
167 try:
168 return self._value
169 except AttributeError:
170 raise NotImplementedError
171
172#
173# Testcases
174#
175
176class _TestProcess(BaseTestCase):
177
178 ALLOWED_TYPES = ('processes', 'threads')
179
180 def test_current(self):
181 if self.TYPE == 'threads':
182 return
183
184 current = self.current_process()
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000185 authkey = current.authkey
Benjamin Petersone711caf2008-06-11 16:44:04 +0000186
187 self.assertTrue(current.is_alive())
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000188 self.assertTrue(not current.daemon)
Ezio Melottie9615932010-01-24 19:26:24 +0000189 self.assertIsInstance(authkey, bytes)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000190 self.assertTrue(len(authkey) > 0)
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000191 self.assertEqual(current.ident, os.getpid())
192 self.assertEqual(current.exitcode, None)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000193
Antoine Pitrou0bd4deb2011-02-25 22:07:43 +0000194 def test_daemon_argument(self):
195 if self.TYPE == "threads":
196 return
197
198 # By default uses the current process's daemon flag.
199 proc0 = self.Process(target=self._test)
Antoine Pitrouec785222011-03-02 00:15:44 +0000200 self.assertEqual(proc0.daemon, self.current_process().daemon)
Antoine Pitrou0bd4deb2011-02-25 22:07:43 +0000201 proc1 = self.Process(target=self._test, daemon=True)
202 self.assertTrue(proc1.daemon)
203 proc2 = self.Process(target=self._test, daemon=False)
204 self.assertFalse(proc2.daemon)
205
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +0000206 @classmethod
207 def _test(cls, q, *args, **kwds):
208 current = cls.current_process()
Benjamin Petersone711caf2008-06-11 16:44:04 +0000209 q.put(args)
210 q.put(kwds)
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000211 q.put(current.name)
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +0000212 if cls.TYPE != 'threads':
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000213 q.put(bytes(current.authkey))
Benjamin Petersone711caf2008-06-11 16:44:04 +0000214 q.put(current.pid)
215
216 def test_process(self):
217 q = self.Queue(1)
218 e = self.Event()
219 args = (q, 1, 2)
220 kwargs = {'hello':23, 'bye':2.54}
221 name = 'SomeProcess'
222 p = self.Process(
223 target=self._test, args=args, kwargs=kwargs, name=name
224 )
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000225 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000226 current = self.current_process()
227
228 if self.TYPE != 'threads':
Ezio Melottib3aedd42010-11-20 19:04:17 +0000229 self.assertEqual(p.authkey, current.authkey)
230 self.assertEqual(p.is_alive(), False)
231 self.assertEqual(p.daemon, True)
Benjamin Peterson577473f2010-01-19 00:09:57 +0000232 self.assertNotIn(p, self.active_children())
Benjamin Petersone711caf2008-06-11 16:44:04 +0000233 self.assertTrue(type(self.active_children()) is list)
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000234 self.assertEqual(p.exitcode, None)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000235
236 p.start()
237
Ezio Melottib3aedd42010-11-20 19:04:17 +0000238 self.assertEqual(p.exitcode, None)
239 self.assertEqual(p.is_alive(), True)
Benjamin Peterson577473f2010-01-19 00:09:57 +0000240 self.assertIn(p, self.active_children())
Benjamin Petersone711caf2008-06-11 16:44:04 +0000241
Ezio Melottib3aedd42010-11-20 19:04:17 +0000242 self.assertEqual(q.get(), args[1:])
243 self.assertEqual(q.get(), kwargs)
244 self.assertEqual(q.get(), p.name)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000245 if self.TYPE != 'threads':
Ezio Melottib3aedd42010-11-20 19:04:17 +0000246 self.assertEqual(q.get(), current.authkey)
247 self.assertEqual(q.get(), p.pid)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000248
249 p.join()
250
Ezio Melottib3aedd42010-11-20 19:04:17 +0000251 self.assertEqual(p.exitcode, 0)
252 self.assertEqual(p.is_alive(), False)
Benjamin Peterson577473f2010-01-19 00:09:57 +0000253 self.assertNotIn(p, self.active_children())
Benjamin Petersone711caf2008-06-11 16:44:04 +0000254
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +0000255 @classmethod
256 def _test_terminate(cls):
Benjamin Petersone711caf2008-06-11 16:44:04 +0000257 time.sleep(1000)
258
259 def test_terminate(self):
260 if self.TYPE == 'threads':
261 return
262
263 p = self.Process(target=self._test_terminate)
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000264 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000265 p.start()
266
267 self.assertEqual(p.is_alive(), True)
Benjamin Peterson577473f2010-01-19 00:09:57 +0000268 self.assertIn(p, self.active_children())
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000269 self.assertEqual(p.exitcode, None)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000270
271 p.terminate()
272
273 join = TimingWrapper(p.join)
274 self.assertEqual(join(), None)
275 self.assertTimingAlmostEqual(join.elapsed, 0.0)
276
277 self.assertEqual(p.is_alive(), False)
Benjamin Peterson577473f2010-01-19 00:09:57 +0000278 self.assertNotIn(p, self.active_children())
Benjamin Petersone711caf2008-06-11 16:44:04 +0000279
280 p.join()
281
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000282 # XXX sometimes get p.exitcode == 0 on Windows ...
283 #self.assertEqual(p.exitcode, -signal.SIGTERM)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000284
285 def test_cpu_count(self):
286 try:
287 cpus = multiprocessing.cpu_count()
288 except NotImplementedError:
289 cpus = 1
290 self.assertTrue(type(cpus) is int)
291 self.assertTrue(cpus >= 1)
292
293 def test_active_children(self):
294 self.assertEqual(type(self.active_children()), list)
295
296 p = self.Process(target=time.sleep, args=(DELTA,))
Benjamin Peterson577473f2010-01-19 00:09:57 +0000297 self.assertNotIn(p, self.active_children())
Benjamin Petersone711caf2008-06-11 16:44:04 +0000298
299 p.start()
Benjamin Peterson577473f2010-01-19 00:09:57 +0000300 self.assertIn(p, self.active_children())
Benjamin Petersone711caf2008-06-11 16:44:04 +0000301
302 p.join()
Benjamin Peterson577473f2010-01-19 00:09:57 +0000303 self.assertNotIn(p, self.active_children())
Benjamin Petersone711caf2008-06-11 16:44:04 +0000304
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +0000305 @classmethod
306 def _test_recursion(cls, wconn, id):
Benjamin Petersone711caf2008-06-11 16:44:04 +0000307 from multiprocessing import forking
308 wconn.send(id)
309 if len(id) < 2:
310 for i in range(2):
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +0000311 p = cls.Process(
312 target=cls._test_recursion, args=(wconn, id+[i])
Benjamin Petersone711caf2008-06-11 16:44:04 +0000313 )
314 p.start()
315 p.join()
316
317 def test_recursion(self):
318 rconn, wconn = self.Pipe(duplex=False)
319 self._test_recursion(wconn, [])
320
321 time.sleep(DELTA)
322 result = []
323 while rconn.poll():
324 result.append(rconn.recv())
325
326 expected = [
327 [],
328 [0],
329 [0, 0],
330 [0, 1],
331 [1],
332 [1, 0],
333 [1, 1]
334 ]
335 self.assertEqual(result, expected)
336
Antoine Pitrou176f07d2011-06-06 19:35:31 +0200337 @classmethod
338 def _test_sentinel(cls, event):
339 event.wait(10.0)
340
341 def test_sentinel(self):
342 if self.TYPE == "threads":
343 return
344 event = self.Event()
345 p = self.Process(target=self._test_sentinel, args=(event,))
346 with self.assertRaises(ValueError):
347 p.sentinel
348 p.start()
349 self.addCleanup(p.join)
350 sentinel = p.sentinel
351 self.assertIsInstance(sentinel, int)
352 self.assertFalse(wait_for_handle(sentinel, timeout=0.0))
353 event.set()
354 p.join()
355 self.assertTrue(wait_for_handle(sentinel, timeout=DELTA))
356
Benjamin Petersone711caf2008-06-11 16:44:04 +0000357#
358#
359#
360
361class _UpperCaser(multiprocessing.Process):
362
363 def __init__(self):
364 multiprocessing.Process.__init__(self)
365 self.child_conn, self.parent_conn = multiprocessing.Pipe()
366
367 def run(self):
368 self.parent_conn.close()
369 for s in iter(self.child_conn.recv, None):
370 self.child_conn.send(s.upper())
371 self.child_conn.close()
372
373 def submit(self, s):
374 assert type(s) is str
375 self.parent_conn.send(s)
376 return self.parent_conn.recv()
377
378 def stop(self):
379 self.parent_conn.send(None)
380 self.parent_conn.close()
381 self.child_conn.close()
382
383class _TestSubclassingProcess(BaseTestCase):
384
385 ALLOWED_TYPES = ('processes',)
386
387 def test_subclassing(self):
388 uppercaser = _UpperCaser()
389 uppercaser.start()
390 self.assertEqual(uppercaser.submit('hello'), 'HELLO')
391 self.assertEqual(uppercaser.submit('world'), 'WORLD')
392 uppercaser.stop()
393 uppercaser.join()
394
395#
396#
397#
398
399def queue_empty(q):
400 if hasattr(q, 'empty'):
401 return q.empty()
402 else:
403 return q.qsize() == 0
404
405def queue_full(q, maxsize):
406 if hasattr(q, 'full'):
407 return q.full()
408 else:
409 return q.qsize() == maxsize
410
411
412class _TestQueue(BaseTestCase):
413
414
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +0000415 @classmethod
416 def _test_put(cls, queue, child_can_start, parent_can_continue):
Benjamin Petersone711caf2008-06-11 16:44:04 +0000417 child_can_start.wait()
418 for i in range(6):
419 queue.get()
420 parent_can_continue.set()
421
422 def test_put(self):
423 MAXSIZE = 6
424 queue = self.Queue(maxsize=MAXSIZE)
425 child_can_start = self.Event()
426 parent_can_continue = self.Event()
427
428 proc = self.Process(
429 target=self._test_put,
430 args=(queue, child_can_start, parent_can_continue)
431 )
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000432 proc.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000433 proc.start()
434
435 self.assertEqual(queue_empty(queue), True)
436 self.assertEqual(queue_full(queue, MAXSIZE), False)
437
438 queue.put(1)
439 queue.put(2, True)
440 queue.put(3, True, None)
441 queue.put(4, False)
442 queue.put(5, False, None)
443 queue.put_nowait(6)
444
445 # the values may be in buffer but not yet in pipe so sleep a bit
446 time.sleep(DELTA)
447
448 self.assertEqual(queue_empty(queue), False)
449 self.assertEqual(queue_full(queue, MAXSIZE), True)
450
451 put = TimingWrapper(queue.put)
452 put_nowait = TimingWrapper(queue.put_nowait)
453
454 self.assertRaises(pyqueue.Full, put, 7, False)
455 self.assertTimingAlmostEqual(put.elapsed, 0)
456
457 self.assertRaises(pyqueue.Full, put, 7, False, None)
458 self.assertTimingAlmostEqual(put.elapsed, 0)
459
460 self.assertRaises(pyqueue.Full, put_nowait, 7)
461 self.assertTimingAlmostEqual(put_nowait.elapsed, 0)
462
463 self.assertRaises(pyqueue.Full, put, 7, True, TIMEOUT1)
464 self.assertTimingAlmostEqual(put.elapsed, TIMEOUT1)
465
466 self.assertRaises(pyqueue.Full, put, 7, False, TIMEOUT2)
467 self.assertTimingAlmostEqual(put.elapsed, 0)
468
469 self.assertRaises(pyqueue.Full, put, 7, True, timeout=TIMEOUT3)
470 self.assertTimingAlmostEqual(put.elapsed, TIMEOUT3)
471
472 child_can_start.set()
473 parent_can_continue.wait()
474
475 self.assertEqual(queue_empty(queue), True)
476 self.assertEqual(queue_full(queue, MAXSIZE), False)
477
478 proc.join()
479
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +0000480 @classmethod
481 def _test_get(cls, queue, child_can_start, parent_can_continue):
Benjamin Petersone711caf2008-06-11 16:44:04 +0000482 child_can_start.wait()
Benjamin Petersonb29cbbc2008-06-16 20:57:14 +0000483 #queue.put(1)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000484 queue.put(2)
485 queue.put(3)
486 queue.put(4)
487 queue.put(5)
488 parent_can_continue.set()
489
490 def test_get(self):
491 queue = self.Queue()
492 child_can_start = self.Event()
493 parent_can_continue = self.Event()
494
495 proc = self.Process(
496 target=self._test_get,
497 args=(queue, child_can_start, parent_can_continue)
498 )
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000499 proc.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000500 proc.start()
501
502 self.assertEqual(queue_empty(queue), True)
503
504 child_can_start.set()
505 parent_can_continue.wait()
506
507 time.sleep(DELTA)
508 self.assertEqual(queue_empty(queue), False)
509
Benjamin Petersonb29cbbc2008-06-16 20:57:14 +0000510 # Hangs unexpectedly, remove for now
511 #self.assertEqual(queue.get(), 1)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000512 self.assertEqual(queue.get(True, None), 2)
513 self.assertEqual(queue.get(True), 3)
514 self.assertEqual(queue.get(timeout=1), 4)
515 self.assertEqual(queue.get_nowait(), 5)
516
517 self.assertEqual(queue_empty(queue), True)
518
519 get = TimingWrapper(queue.get)
520 get_nowait = TimingWrapper(queue.get_nowait)
521
522 self.assertRaises(pyqueue.Empty, get, False)
523 self.assertTimingAlmostEqual(get.elapsed, 0)
524
525 self.assertRaises(pyqueue.Empty, get, False, None)
526 self.assertTimingAlmostEqual(get.elapsed, 0)
527
528 self.assertRaises(pyqueue.Empty, get_nowait)
529 self.assertTimingAlmostEqual(get_nowait.elapsed, 0)
530
531 self.assertRaises(pyqueue.Empty, get, True, TIMEOUT1)
532 self.assertTimingAlmostEqual(get.elapsed, TIMEOUT1)
533
534 self.assertRaises(pyqueue.Empty, get, False, TIMEOUT2)
535 self.assertTimingAlmostEqual(get.elapsed, 0)
536
537 self.assertRaises(pyqueue.Empty, get, timeout=TIMEOUT3)
538 self.assertTimingAlmostEqual(get.elapsed, TIMEOUT3)
539
540 proc.join()
541
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +0000542 @classmethod
543 def _test_fork(cls, queue):
Benjamin Petersone711caf2008-06-11 16:44:04 +0000544 for i in range(10, 20):
545 queue.put(i)
546 # note that at this point the items may only be buffered, so the
547 # process cannot shutdown until the feeder thread has finished
548 # pushing items onto the pipe.
549
550 def test_fork(self):
551 # Old versions of Queue would fail to create a new feeder
552 # thread for a forked process if the original process had its
553 # own feeder thread. This test checks that this no longer
554 # happens.
555
556 queue = self.Queue()
557
558 # put items on queue so that main process starts a feeder thread
559 for i in range(10):
560 queue.put(i)
561
562 # wait to make sure thread starts before we fork a new process
563 time.sleep(DELTA)
564
565 # fork process
566 p = self.Process(target=self._test_fork, args=(queue,))
567 p.start()
568
569 # check that all expected items are in the queue
570 for i in range(20):
571 self.assertEqual(queue.get(), i)
572 self.assertRaises(pyqueue.Empty, queue.get, False)
573
574 p.join()
575
576 def test_qsize(self):
577 q = self.Queue()
578 try:
579 self.assertEqual(q.qsize(), 0)
580 except NotImplementedError:
581 return
582 q.put(1)
583 self.assertEqual(q.qsize(), 1)
584 q.put(5)
585 self.assertEqual(q.qsize(), 2)
586 q.get()
587 self.assertEqual(q.qsize(), 1)
588 q.get()
589 self.assertEqual(q.qsize(), 0)
590
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +0000591 @classmethod
592 def _test_task_done(cls, q):
Benjamin Petersone711caf2008-06-11 16:44:04 +0000593 for obj in iter(q.get, None):
594 time.sleep(DELTA)
595 q.task_done()
596
597 def test_task_done(self):
598 queue = self.JoinableQueue()
599
600 if sys.version_info < (2, 5) and not hasattr(queue, 'task_done'):
Florent Xiclunafd1b0932010-03-28 00:25:02 +0000601 self.skipTest("requires 'queue.task_done()' method")
Benjamin Petersone711caf2008-06-11 16:44:04 +0000602
603 workers = [self.Process(target=self._test_task_done, args=(queue,))
604 for i in range(4)]
605
606 for p in workers:
607 p.start()
608
609 for i in range(10):
610 queue.put(i)
611
612 queue.join()
613
614 for p in workers:
615 queue.put(None)
616
617 for p in workers:
618 p.join()
619
620#
621#
622#
623
624class _TestLock(BaseTestCase):
625
626 def test_lock(self):
627 lock = self.Lock()
628 self.assertEqual(lock.acquire(), True)
629 self.assertEqual(lock.acquire(False), False)
630 self.assertEqual(lock.release(), None)
631 self.assertRaises((ValueError, threading.ThreadError), lock.release)
632
633 def test_rlock(self):
634 lock = self.RLock()
635 self.assertEqual(lock.acquire(), True)
636 self.assertEqual(lock.acquire(), True)
637 self.assertEqual(lock.acquire(), True)
638 self.assertEqual(lock.release(), None)
639 self.assertEqual(lock.release(), None)
640 self.assertEqual(lock.release(), None)
641 self.assertRaises((AssertionError, RuntimeError), lock.release)
642
Jesse Nollerf8d00852009-03-31 03:25:07 +0000643 def test_lock_context(self):
644 with self.Lock():
645 pass
646
Benjamin Petersone711caf2008-06-11 16:44:04 +0000647
648class _TestSemaphore(BaseTestCase):
649
650 def _test_semaphore(self, sem):
651 self.assertReturnsIfImplemented(2, get_value, sem)
652 self.assertEqual(sem.acquire(), True)
653 self.assertReturnsIfImplemented(1, get_value, sem)
654 self.assertEqual(sem.acquire(), True)
655 self.assertReturnsIfImplemented(0, get_value, sem)
656 self.assertEqual(sem.acquire(False), False)
657 self.assertReturnsIfImplemented(0, get_value, sem)
658 self.assertEqual(sem.release(), None)
659 self.assertReturnsIfImplemented(1, get_value, sem)
660 self.assertEqual(sem.release(), None)
661 self.assertReturnsIfImplemented(2, get_value, sem)
662
663 def test_semaphore(self):
664 sem = self.Semaphore(2)
665 self._test_semaphore(sem)
666 self.assertEqual(sem.release(), None)
667 self.assertReturnsIfImplemented(3, get_value, sem)
668 self.assertEqual(sem.release(), None)
669 self.assertReturnsIfImplemented(4, get_value, sem)
670
671 def test_bounded_semaphore(self):
672 sem = self.BoundedSemaphore(2)
673 self._test_semaphore(sem)
674 # Currently fails on OS/X
675 #if HAVE_GETVALUE:
676 # self.assertRaises(ValueError, sem.release)
677 # self.assertReturnsIfImplemented(2, get_value, sem)
678
679 def test_timeout(self):
680 if self.TYPE != 'processes':
681 return
682
683 sem = self.Semaphore(0)
684 acquire = TimingWrapper(sem.acquire)
685
686 self.assertEqual(acquire(False), False)
687 self.assertTimingAlmostEqual(acquire.elapsed, 0.0)
688
689 self.assertEqual(acquire(False, None), False)
690 self.assertTimingAlmostEqual(acquire.elapsed, 0.0)
691
692 self.assertEqual(acquire(False, TIMEOUT1), False)
693 self.assertTimingAlmostEqual(acquire.elapsed, 0)
694
695 self.assertEqual(acquire(True, TIMEOUT2), False)
696 self.assertTimingAlmostEqual(acquire.elapsed, TIMEOUT2)
697
698 self.assertEqual(acquire(timeout=TIMEOUT3), False)
699 self.assertTimingAlmostEqual(acquire.elapsed, TIMEOUT3)
700
701
702class _TestCondition(BaseTestCase):
703
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +0000704 @classmethod
705 def f(cls, cond, sleeping, woken, timeout=None):
Benjamin Petersone711caf2008-06-11 16:44:04 +0000706 cond.acquire()
707 sleeping.release()
708 cond.wait(timeout)
709 woken.release()
710 cond.release()
711
712 def check_invariant(self, cond):
713 # this is only supposed to succeed when there are no sleepers
714 if self.TYPE == 'processes':
715 try:
716 sleepers = (cond._sleeping_count.get_value() -
717 cond._woken_count.get_value())
718 self.assertEqual(sleepers, 0)
719 self.assertEqual(cond._wait_semaphore.get_value(), 0)
720 except NotImplementedError:
721 pass
722
723 def test_notify(self):
724 cond = self.Condition()
725 sleeping = self.Semaphore(0)
726 woken = self.Semaphore(0)
727
728 p = self.Process(target=self.f, args=(cond, sleeping, woken))
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000729 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000730 p.start()
731
732 p = threading.Thread(target=self.f, args=(cond, sleeping, woken))
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000733 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000734 p.start()
735
736 # wait for both children to start sleeping
737 sleeping.acquire()
738 sleeping.acquire()
739
740 # check no process/thread has woken up
741 time.sleep(DELTA)
742 self.assertReturnsIfImplemented(0, get_value, woken)
743
744 # wake up one process/thread
745 cond.acquire()
746 cond.notify()
747 cond.release()
748
749 # check one process/thread has woken up
750 time.sleep(DELTA)
751 self.assertReturnsIfImplemented(1, get_value, woken)
752
753 # wake up another
754 cond.acquire()
755 cond.notify()
756 cond.release()
757
758 # check other has woken up
759 time.sleep(DELTA)
760 self.assertReturnsIfImplemented(2, get_value, woken)
761
762 # check state is not mucked up
763 self.check_invariant(cond)
764 p.join()
765
766 def test_notify_all(self):
767 cond = self.Condition()
768 sleeping = self.Semaphore(0)
769 woken = self.Semaphore(0)
770
771 # start some threads/processes which will timeout
772 for i in range(3):
773 p = self.Process(target=self.f,
774 args=(cond, sleeping, woken, TIMEOUT1))
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000775 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000776 p.start()
777
778 t = threading.Thread(target=self.f,
779 args=(cond, sleeping, woken, TIMEOUT1))
Benjamin Peterson72753702008-08-18 18:09:21 +0000780 t.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000781 t.start()
782
783 # wait for them all to sleep
784 for i in range(6):
785 sleeping.acquire()
786
787 # check they have all timed out
788 for i in range(6):
789 woken.acquire()
790 self.assertReturnsIfImplemented(0, get_value, woken)
791
792 # check state is not mucked up
793 self.check_invariant(cond)
794
795 # start some more threads/processes
796 for i in range(3):
797 p = self.Process(target=self.f, args=(cond, sleeping, woken))
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000798 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000799 p.start()
800
801 t = threading.Thread(target=self.f, args=(cond, sleeping, woken))
Benjamin Peterson72753702008-08-18 18:09:21 +0000802 t.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000803 t.start()
804
805 # wait for them to all sleep
806 for i in range(6):
807 sleeping.acquire()
808
809 # check no process/thread has woken up
810 time.sleep(DELTA)
811 self.assertReturnsIfImplemented(0, get_value, woken)
812
813 # wake them all up
814 cond.acquire()
815 cond.notify_all()
816 cond.release()
817
818 # check they have all woken
Antoine Pitrouf25a8de2011-04-16 21:02:01 +0200819 for i in range(10):
820 try:
821 if get_value(woken) == 6:
822 break
823 except NotImplementedError:
824 break
825 time.sleep(DELTA)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000826 self.assertReturnsIfImplemented(6, get_value, woken)
827
828 # check state is not mucked up
829 self.check_invariant(cond)
830
831 def test_timeout(self):
832 cond = self.Condition()
833 wait = TimingWrapper(cond.wait)
834 cond.acquire()
835 res = wait(TIMEOUT1)
836 cond.release()
Georg Brandl65ffae02010-10-28 09:24:56 +0000837 self.assertEqual(res, False)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000838 self.assertTimingAlmostEqual(wait.elapsed, TIMEOUT1)
839
840
841class _TestEvent(BaseTestCase):
842
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +0000843 @classmethod
844 def _test_event(cls, event):
Benjamin Petersone711caf2008-06-11 16:44:04 +0000845 time.sleep(TIMEOUT2)
846 event.set()
847
848 def test_event(self):
849 event = self.Event()
850 wait = TimingWrapper(event.wait)
851
Ezio Melotti13925002011-03-16 11:05:33 +0200852 # Removed temporarily, due to API shear, this does not
Benjamin Petersone711caf2008-06-11 16:44:04 +0000853 # work with threading._Event objects. is_set == isSet
Benjamin Peterson965ce872009-04-05 21:24:58 +0000854 self.assertEqual(event.is_set(), False)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000855
Benjamin Peterson965ce872009-04-05 21:24:58 +0000856 # Removed, threading.Event.wait() will return the value of the __flag
857 # instead of None. API Shear with the semaphore backed mp.Event
858 self.assertEqual(wait(0.0), False)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000859 self.assertTimingAlmostEqual(wait.elapsed, 0.0)
Benjamin Peterson965ce872009-04-05 21:24:58 +0000860 self.assertEqual(wait(TIMEOUT1), False)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000861 self.assertTimingAlmostEqual(wait.elapsed, TIMEOUT1)
862
863 event.set()
864
865 # See note above on the API differences
Benjamin Peterson965ce872009-04-05 21:24:58 +0000866 self.assertEqual(event.is_set(), True)
867 self.assertEqual(wait(), True)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000868 self.assertTimingAlmostEqual(wait.elapsed, 0.0)
Benjamin Peterson965ce872009-04-05 21:24:58 +0000869 self.assertEqual(wait(TIMEOUT1), True)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000870 self.assertTimingAlmostEqual(wait.elapsed, 0.0)
871 # self.assertEqual(event.is_set(), True)
872
873 event.clear()
874
875 #self.assertEqual(event.is_set(), False)
876
877 self.Process(target=self._test_event, args=(event,)).start()
Benjamin Peterson965ce872009-04-05 21:24:58 +0000878 self.assertEqual(wait(), True)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000879
880#
881#
882#
883
884class _TestValue(BaseTestCase):
885
Florent Xiclunafd1b0932010-03-28 00:25:02 +0000886 ALLOWED_TYPES = ('processes',)
887
Benjamin Petersone711caf2008-06-11 16:44:04 +0000888 codes_values = [
889 ('i', 4343, 24234),
890 ('d', 3.625, -4.25),
891 ('h', -232, 234),
892 ('c', latin('x'), latin('y'))
893 ]
894
Antoine Pitrou7744e2a2010-11-22 16:26:21 +0000895 def setUp(self):
896 if not HAS_SHAREDCTYPES:
897 self.skipTest("requires multiprocessing.sharedctypes")
898
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +0000899 @classmethod
900 def _test(cls, values):
901 for sv, cv in zip(values, cls.codes_values):
Benjamin Petersone711caf2008-06-11 16:44:04 +0000902 sv.value = cv[2]
903
904
905 def test_value(self, raw=False):
Benjamin Petersone711caf2008-06-11 16:44:04 +0000906 if raw:
907 values = [self.RawValue(code, value)
908 for code, value, _ in self.codes_values]
909 else:
910 values = [self.Value(code, value)
911 for code, value, _ in self.codes_values]
912
913 for sv, cv in zip(values, self.codes_values):
914 self.assertEqual(sv.value, cv[1])
915
916 proc = self.Process(target=self._test, args=(values,))
917 proc.start()
918 proc.join()
919
920 for sv, cv in zip(values, self.codes_values):
921 self.assertEqual(sv.value, cv[2])
922
923 def test_rawvalue(self):
924 self.test_value(raw=True)
925
926 def test_getobj_getlock(self):
Benjamin Petersone711caf2008-06-11 16:44:04 +0000927 val1 = self.Value('i', 5)
928 lock1 = val1.get_lock()
929 obj1 = val1.get_obj()
930
931 val2 = self.Value('i', 5, lock=None)
932 lock2 = val2.get_lock()
933 obj2 = val2.get_obj()
934
935 lock = self.Lock()
936 val3 = self.Value('i', 5, lock=lock)
937 lock3 = val3.get_lock()
938 obj3 = val3.get_obj()
939 self.assertEqual(lock, lock3)
940
Jesse Nollerb0516a62009-01-18 03:11:38 +0000941 arr4 = self.Value('i', 5, lock=False)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000942 self.assertFalse(hasattr(arr4, 'get_lock'))
943 self.assertFalse(hasattr(arr4, 'get_obj'))
944
Jesse Nollerb0516a62009-01-18 03:11:38 +0000945 self.assertRaises(AttributeError, self.Value, 'i', 5, lock='navalue')
946
947 arr5 = self.RawValue('i', 5)
948 self.assertFalse(hasattr(arr5, 'get_lock'))
949 self.assertFalse(hasattr(arr5, 'get_obj'))
950
Benjamin Petersone711caf2008-06-11 16:44:04 +0000951
952class _TestArray(BaseTestCase):
953
Florent Xiclunafd1b0932010-03-28 00:25:02 +0000954 ALLOWED_TYPES = ('processes',)
955
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +0000956 @classmethod
957 def f(cls, seq):
Benjamin Petersone711caf2008-06-11 16:44:04 +0000958 for i in range(1, len(seq)):
959 seq[i] += seq[i-1]
960
Florent Xiclunafd1b0932010-03-28 00:25:02 +0000961 @unittest.skipIf(c_int is None, "requires _ctypes")
Benjamin Petersone711caf2008-06-11 16:44:04 +0000962 def test_array(self, raw=False):
Benjamin Petersone711caf2008-06-11 16:44:04 +0000963 seq = [680, 626, 934, 821, 150, 233, 548, 982, 714, 831]
964 if raw:
965 arr = self.RawArray('i', seq)
966 else:
967 arr = self.Array('i', seq)
968
969 self.assertEqual(len(arr), len(seq))
970 self.assertEqual(arr[3], seq[3])
971 self.assertEqual(list(arr[2:7]), list(seq[2:7]))
972
973 arr[4:8] = seq[4:8] = array.array('i', [1, 2, 3, 4])
974
975 self.assertEqual(list(arr[:]), seq)
976
977 self.f(seq)
978
979 p = self.Process(target=self.f, args=(arr,))
980 p.start()
981 p.join()
982
983 self.assertEqual(list(arr[:]), seq)
984
Florent Xiclunafd1b0932010-03-28 00:25:02 +0000985 @unittest.skipIf(c_int is None, "requires _ctypes")
Mark Dickinson89461ef2011-03-26 10:19:03 +0000986 def test_array_from_size(self):
987 size = 10
988 # Test for zeroing (see issue #11675).
989 # The repetition below strengthens the test by increasing the chances
990 # of previously allocated non-zero memory being used for the new array
991 # on the 2nd and 3rd loops.
992 for _ in range(3):
993 arr = self.Array('i', size)
994 self.assertEqual(len(arr), size)
995 self.assertEqual(list(arr), [0] * size)
996 arr[:] = range(10)
997 self.assertEqual(list(arr), list(range(10)))
998 del arr
999
1000 @unittest.skipIf(c_int is None, "requires _ctypes")
Benjamin Petersone711caf2008-06-11 16:44:04 +00001001 def test_rawarray(self):
1002 self.test_array(raw=True)
1003
Florent Xiclunafd1b0932010-03-28 00:25:02 +00001004 @unittest.skipIf(c_int is None, "requires _ctypes")
Benjamin Petersone711caf2008-06-11 16:44:04 +00001005 def test_getobj_getlock_obj(self):
Benjamin Petersone711caf2008-06-11 16:44:04 +00001006 arr1 = self.Array('i', list(range(10)))
1007 lock1 = arr1.get_lock()
1008 obj1 = arr1.get_obj()
1009
1010 arr2 = self.Array('i', list(range(10)), lock=None)
1011 lock2 = arr2.get_lock()
1012 obj2 = arr2.get_obj()
1013
1014 lock = self.Lock()
1015 arr3 = self.Array('i', list(range(10)), lock=lock)
1016 lock3 = arr3.get_lock()
1017 obj3 = arr3.get_obj()
1018 self.assertEqual(lock, lock3)
1019
Jesse Nollerb0516a62009-01-18 03:11:38 +00001020 arr4 = self.Array('i', range(10), lock=False)
Benjamin Petersone711caf2008-06-11 16:44:04 +00001021 self.assertFalse(hasattr(arr4, 'get_lock'))
1022 self.assertFalse(hasattr(arr4, 'get_obj'))
Jesse Nollerb0516a62009-01-18 03:11:38 +00001023 self.assertRaises(AttributeError,
1024 self.Array, 'i', range(10), lock='notalock')
1025
1026 arr5 = self.RawArray('i', range(10))
1027 self.assertFalse(hasattr(arr5, 'get_lock'))
1028 self.assertFalse(hasattr(arr5, 'get_obj'))
Benjamin Petersone711caf2008-06-11 16:44:04 +00001029
1030#
1031#
1032#
1033
1034class _TestContainers(BaseTestCase):
1035
1036 ALLOWED_TYPES = ('manager',)
1037
1038 def test_list(self):
1039 a = self.list(list(range(10)))
1040 self.assertEqual(a[:], list(range(10)))
1041
1042 b = self.list()
1043 self.assertEqual(b[:], [])
1044
1045 b.extend(list(range(5)))
1046 self.assertEqual(b[:], list(range(5)))
1047
1048 self.assertEqual(b[2], 2)
1049 self.assertEqual(b[2:10], [2,3,4])
1050
1051 b *= 2
1052 self.assertEqual(b[:], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4])
1053
1054 self.assertEqual(b + [5, 6], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4, 5, 6])
1055
1056 self.assertEqual(a[:], list(range(10)))
1057
1058 d = [a, b]
1059 e = self.list(d)
1060 self.assertEqual(
1061 e[:],
1062 [[0, 1, 2, 3, 4, 5, 6, 7, 8, 9], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4]]
1063 )
1064
1065 f = self.list([a])
1066 a.append('hello')
1067 self.assertEqual(f[:], [[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 'hello']])
1068
1069 def test_dict(self):
1070 d = self.dict()
1071 indices = list(range(65, 70))
1072 for i in indices:
1073 d[i] = chr(i)
1074 self.assertEqual(d.copy(), dict((i, chr(i)) for i in indices))
1075 self.assertEqual(sorted(d.keys()), indices)
1076 self.assertEqual(sorted(d.values()), [chr(i) for i in indices])
1077 self.assertEqual(sorted(d.items()), [(i, chr(i)) for i in indices])
1078
1079 def test_namespace(self):
1080 n = self.Namespace()
1081 n.name = 'Bob'
1082 n.job = 'Builder'
1083 n._hidden = 'hidden'
1084 self.assertEqual((n.name, n.job), ('Bob', 'Builder'))
1085 del n.job
1086 self.assertEqual(str(n), "Namespace(name='Bob')")
1087 self.assertTrue(hasattr(n, 'name'))
1088 self.assertTrue(not hasattr(n, 'job'))
1089
1090#
1091#
1092#
1093
1094def sqr(x, wait=0.0):
1095 time.sleep(wait)
1096 return x*x
Ask Solem2afcbf22010-11-09 20:55:52 +00001097
Benjamin Petersone711caf2008-06-11 16:44:04 +00001098class _TestPool(BaseTestCase):
1099
1100 def test_apply(self):
1101 papply = self.pool.apply
1102 self.assertEqual(papply(sqr, (5,)), sqr(5))
1103 self.assertEqual(papply(sqr, (), {'x':3}), sqr(x=3))
1104
1105 def test_map(self):
1106 pmap = self.pool.map
1107 self.assertEqual(pmap(sqr, list(range(10))), list(map(sqr, list(range(10)))))
1108 self.assertEqual(pmap(sqr, list(range(100)), chunksize=20),
1109 list(map(sqr, list(range(100)))))
1110
Alexandre Vassalottie52e3782009-07-17 09:18:18 +00001111 def test_map_chunksize(self):
1112 try:
1113 self.pool.map_async(sqr, [], chunksize=1).get(timeout=TIMEOUT1)
1114 except multiprocessing.TimeoutError:
1115 self.fail("pool.map_async with chunksize stalled on null list")
1116
Benjamin Petersone711caf2008-06-11 16:44:04 +00001117 def test_async(self):
1118 res = self.pool.apply_async(sqr, (7, TIMEOUT1,))
1119 get = TimingWrapper(res.get)
1120 self.assertEqual(get(), 49)
1121 self.assertTimingAlmostEqual(get.elapsed, TIMEOUT1)
1122
1123 def test_async_timeout(self):
1124 res = self.pool.apply_async(sqr, (6, TIMEOUT2 + 0.2))
1125 get = TimingWrapper(res.get)
1126 self.assertRaises(multiprocessing.TimeoutError, get, timeout=TIMEOUT2)
1127 self.assertTimingAlmostEqual(get.elapsed, TIMEOUT2)
1128
1129 def test_imap(self):
1130 it = self.pool.imap(sqr, list(range(10)))
1131 self.assertEqual(list(it), list(map(sqr, list(range(10)))))
1132
1133 it = self.pool.imap(sqr, list(range(10)))
1134 for i in range(10):
1135 self.assertEqual(next(it), i*i)
1136 self.assertRaises(StopIteration, it.__next__)
1137
1138 it = self.pool.imap(sqr, list(range(1000)), chunksize=100)
1139 for i in range(1000):
1140 self.assertEqual(next(it), i*i)
1141 self.assertRaises(StopIteration, it.__next__)
1142
1143 def test_imap_unordered(self):
1144 it = self.pool.imap_unordered(sqr, list(range(1000)))
1145 self.assertEqual(sorted(it), list(map(sqr, list(range(1000)))))
1146
1147 it = self.pool.imap_unordered(sqr, list(range(1000)), chunksize=53)
1148 self.assertEqual(sorted(it), list(map(sqr, list(range(1000)))))
1149
1150 def test_make_pool(self):
Victor Stinner2fae27b2011-06-20 17:53:35 +02001151 self.assertRaises(ValueError, multiprocessing.Pool, -1)
1152 self.assertRaises(ValueError, multiprocessing.Pool, 0)
1153
Benjamin Petersone711caf2008-06-11 16:44:04 +00001154 p = multiprocessing.Pool(3)
1155 self.assertEqual(3, len(p._pool))
1156 p.close()
1157 p.join()
1158
1159 def test_terminate(self):
1160 if self.TYPE == 'manager':
1161 # On Unix a forked process increfs each shared object to
1162 # which its parent process held a reference. If the
1163 # forked process gets terminated then there is likely to
1164 # be a reference leak. So to prevent
1165 # _TestZZZNumberOfObjects from failing we skip this test
1166 # when using a manager.
1167 return
1168
1169 result = self.pool.map_async(
1170 time.sleep, [0.1 for i in range(10000)], chunksize=1
1171 )
1172 self.pool.terminate()
1173 join = TimingWrapper(self.pool.join)
1174 join()
Victor Stinner900189b2011-03-24 16:39:07 +01001175 self.assertLess(join.elapsed, 0.5)
Jesse Noller1f0b6582010-01-27 03:36:01 +00001176
Ask Solem2afcbf22010-11-09 20:55:52 +00001177def raising():
1178 raise KeyError("key")
Jesse Noller1f0b6582010-01-27 03:36:01 +00001179
Ask Solem2afcbf22010-11-09 20:55:52 +00001180def unpickleable_result():
1181 return lambda: 42
1182
1183class _TestPoolWorkerErrors(BaseTestCase):
Jesse Noller1f0b6582010-01-27 03:36:01 +00001184 ALLOWED_TYPES = ('processes', )
Ask Solem2afcbf22010-11-09 20:55:52 +00001185
1186 def test_async_error_callback(self):
1187 p = multiprocessing.Pool(2)
1188
1189 scratchpad = [None]
1190 def errback(exc):
1191 scratchpad[0] = exc
1192
1193 res = p.apply_async(raising, error_callback=errback)
1194 self.assertRaises(KeyError, res.get)
1195 self.assertTrue(scratchpad[0])
1196 self.assertIsInstance(scratchpad[0], KeyError)
1197
1198 p.close()
1199 p.join()
1200
1201 def test_unpickleable_result(self):
1202 from multiprocessing.pool import MaybeEncodingError
1203 p = multiprocessing.Pool(2)
1204
1205 # Make sure we don't lose pool processes because of encoding errors.
1206 for iteration in range(20):
1207
1208 scratchpad = [None]
1209 def errback(exc):
1210 scratchpad[0] = exc
1211
1212 res = p.apply_async(unpickleable_result, error_callback=errback)
1213 self.assertRaises(MaybeEncodingError, res.get)
1214 wrapped = scratchpad[0]
1215 self.assertTrue(wrapped)
1216 self.assertIsInstance(scratchpad[0], MaybeEncodingError)
1217 self.assertIsNotNone(wrapped.exc)
1218 self.assertIsNotNone(wrapped.value)
1219
1220 p.close()
1221 p.join()
1222
1223class _TestPoolWorkerLifetime(BaseTestCase):
1224 ALLOWED_TYPES = ('processes', )
1225
Jesse Noller1f0b6582010-01-27 03:36:01 +00001226 def test_pool_worker_lifetime(self):
1227 p = multiprocessing.Pool(3, maxtasksperchild=10)
1228 self.assertEqual(3, len(p._pool))
1229 origworkerpids = [w.pid for w in p._pool]
1230 # Run many tasks so each worker gets replaced (hopefully)
1231 results = []
1232 for i in range(100):
1233 results.append(p.apply_async(sqr, (i, )))
1234 # Fetch the results and verify we got the right answers,
1235 # also ensuring all the tasks have completed.
1236 for (j, res) in enumerate(results):
1237 self.assertEqual(res.get(), sqr(j))
1238 # Refill the pool
1239 p._repopulate_pool()
Florent Xiclunafb190f62010-03-04 16:10:10 +00001240 # Wait until all workers are alive
Antoine Pitrou540ab062011-04-06 22:51:17 +02001241 # (countdown * DELTA = 5 seconds max startup process time)
1242 countdown = 50
Florent Xiclunafb190f62010-03-04 16:10:10 +00001243 while countdown and not all(w.is_alive() for w in p._pool):
1244 countdown -= 1
1245 time.sleep(DELTA)
Jesse Noller1f0b6582010-01-27 03:36:01 +00001246 finalworkerpids = [w.pid for w in p._pool]
Florent Xiclunafb190f62010-03-04 16:10:10 +00001247 # All pids should be assigned. See issue #7805.
1248 self.assertNotIn(None, origworkerpids)
1249 self.assertNotIn(None, finalworkerpids)
1250 # Finally, check that the worker pids have changed
Jesse Noller1f0b6582010-01-27 03:36:01 +00001251 self.assertNotEqual(sorted(origworkerpids), sorted(finalworkerpids))
1252 p.close()
1253 p.join()
1254
Benjamin Petersone711caf2008-06-11 16:44:04 +00001255#
1256# Test that manager has expected number of shared objects left
1257#
1258
1259class _TestZZZNumberOfObjects(BaseTestCase):
1260 # Because test cases are sorted alphabetically, this one will get
1261 # run after all the other tests for the manager. It tests that
1262 # there have been no "reference leaks" for the manager's shared
1263 # objects. Note the comment in _TestPool.test_terminate().
1264 ALLOWED_TYPES = ('manager',)
1265
1266 def test_number_of_objects(self):
1267 EXPECTED_NUMBER = 1 # the pool object is still alive
1268 multiprocessing.active_children() # discard dead process objs
1269 gc.collect() # do garbage collection
1270 refs = self.manager._number_of_objects()
Jesse Noller63b3a972009-01-21 02:15:48 +00001271 debug_info = self.manager._debug_info()
Benjamin Petersone711caf2008-06-11 16:44:04 +00001272 if refs != EXPECTED_NUMBER:
Georg Brandl3dbca812008-07-23 16:10:53 +00001273 print(self.manager._debug_info())
Jesse Noller63b3a972009-01-21 02:15:48 +00001274 print(debug_info)
Benjamin Petersone711caf2008-06-11 16:44:04 +00001275
1276 self.assertEqual(refs, EXPECTED_NUMBER)
1277
1278#
1279# Test of creating a customized manager class
1280#
1281
1282from multiprocessing.managers import BaseManager, BaseProxy, RemoteError
1283
1284class FooBar(object):
1285 def f(self):
1286 return 'f()'
1287 def g(self):
1288 raise ValueError
1289 def _h(self):
1290 return '_h()'
1291
1292def baz():
1293 for i in range(10):
1294 yield i*i
1295
1296class IteratorProxy(BaseProxy):
Florent Xiclunaaa171062010-08-14 15:56:42 +00001297 _exposed_ = ('__next__',)
Benjamin Petersone711caf2008-06-11 16:44:04 +00001298 def __iter__(self):
1299 return self
1300 def __next__(self):
Benjamin Petersone711caf2008-06-11 16:44:04 +00001301 return self._callmethod('__next__')
1302
1303class MyManager(BaseManager):
1304 pass
1305
1306MyManager.register('Foo', callable=FooBar)
1307MyManager.register('Bar', callable=FooBar, exposed=('f', '_h'))
1308MyManager.register('baz', callable=baz, proxytype=IteratorProxy)
1309
1310
1311class _TestMyManager(BaseTestCase):
1312
1313 ALLOWED_TYPES = ('manager',)
1314
1315 def test_mymanager(self):
1316 manager = MyManager()
1317 manager.start()
1318
1319 foo = manager.Foo()
1320 bar = manager.Bar()
1321 baz = manager.baz()
1322
1323 foo_methods = [name for name in ('f', 'g', '_h') if hasattr(foo, name)]
1324 bar_methods = [name for name in ('f', 'g', '_h') if hasattr(bar, name)]
1325
1326 self.assertEqual(foo_methods, ['f', 'g'])
1327 self.assertEqual(bar_methods, ['f', '_h'])
1328
1329 self.assertEqual(foo.f(), 'f()')
1330 self.assertRaises(ValueError, foo.g)
1331 self.assertEqual(foo._callmethod('f'), 'f()')
1332 self.assertRaises(RemoteError, foo._callmethod, '_h')
1333
1334 self.assertEqual(bar.f(), 'f()')
1335 self.assertEqual(bar._h(), '_h()')
1336 self.assertEqual(bar._callmethod('f'), 'f()')
1337 self.assertEqual(bar._callmethod('_h'), '_h()')
1338
1339 self.assertEqual(list(baz), [i*i for i in range(10)])
1340
1341 manager.shutdown()
1342
1343#
1344# Test of connecting to a remote server and using xmlrpclib for serialization
1345#
1346
1347_queue = pyqueue.Queue()
1348def get_queue():
1349 return _queue
1350
1351class QueueManager(BaseManager):
1352 '''manager class used by server process'''
1353QueueManager.register('get_queue', callable=get_queue)
1354
1355class QueueManager2(BaseManager):
1356 '''manager class which specifies the same interface as QueueManager'''
1357QueueManager2.register('get_queue')
1358
1359
1360SERIALIZER = 'xmlrpclib'
1361
1362class _TestRemoteManager(BaseTestCase):
1363
1364 ALLOWED_TYPES = ('manager',)
1365
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +00001366 @classmethod
1367 def _putter(cls, address, authkey):
Benjamin Petersone711caf2008-06-11 16:44:04 +00001368 manager = QueueManager2(
1369 address=address, authkey=authkey, serializer=SERIALIZER
1370 )
1371 manager.connect()
1372 queue = manager.get_queue()
1373 queue.put(('hello world', None, True, 2.25))
1374
1375 def test_remote(self):
1376 authkey = os.urandom(32)
1377
1378 manager = QueueManager(
1379 address=('localhost', 0), authkey=authkey, serializer=SERIALIZER
1380 )
1381 manager.start()
1382
1383 p = self.Process(target=self._putter, args=(manager.address, authkey))
1384 p.start()
1385
1386 manager2 = QueueManager2(
1387 address=manager.address, authkey=authkey, serializer=SERIALIZER
1388 )
1389 manager2.connect()
1390 queue = manager2.get_queue()
1391
1392 # Note that xmlrpclib will deserialize object as a list not a tuple
1393 self.assertEqual(queue.get(), ['hello world', None, True, 2.25])
1394
1395 # Because we are using xmlrpclib for serialization instead of
1396 # pickle this will cause a serialization error.
1397 self.assertRaises(Exception, queue.put, time.sleep)
1398
1399 # Make queue finalizer run before the server is stopped
1400 del queue
1401 manager.shutdown()
1402
Jesse Nollerc5d28a02009-03-30 16:37:36 +00001403class _TestManagerRestart(BaseTestCase):
1404
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +00001405 @classmethod
1406 def _putter(cls, address, authkey):
Jesse Nollerc5d28a02009-03-30 16:37:36 +00001407 manager = QueueManager(
1408 address=address, authkey=authkey, serializer=SERIALIZER)
1409 manager.connect()
1410 queue = manager.get_queue()
1411 queue.put('hello world')
1412
1413 def test_rapid_restart(self):
1414 authkey = os.urandom(32)
1415 manager = QueueManager(
Antoine Pitrou043bad02010-04-30 23:20:15 +00001416 address=('localhost', 0), authkey=authkey, serializer=SERIALIZER)
Brian Curtin50be1ca2010-11-01 05:10:44 +00001417 srvr = manager.get_server()
1418 addr = srvr.address
1419 # Close the connection.Listener socket which gets opened as a part
1420 # of manager.get_server(). It's not needed for the test.
1421 srvr.listener.close()
Jesse Nollerc5d28a02009-03-30 16:37:36 +00001422 manager.start()
1423
1424 p = self.Process(target=self._putter, args=(manager.address, authkey))
1425 p.start()
1426 queue = manager.get_queue()
1427 self.assertEqual(queue.get(), 'hello world')
Jesse Noller35d1f002009-03-30 22:59:27 +00001428 del queue
Jesse Nollerc5d28a02009-03-30 16:37:36 +00001429 manager.shutdown()
1430 manager = QueueManager(
Antoine Pitrou043bad02010-04-30 23:20:15 +00001431 address=addr, authkey=authkey, serializer=SERIALIZER)
Antoine Pitrouc824e9a2011-04-05 18:11:33 +02001432 try:
1433 manager.start()
1434 except IOError as e:
1435 if e.errno != errno.EADDRINUSE:
1436 raise
1437 # Retry after some time, in case the old socket was lingering
1438 # (sporadic failure on buildbots)
1439 time.sleep(1.0)
1440 manager = QueueManager(
1441 address=addr, authkey=authkey, serializer=SERIALIZER)
Jesse Noller35d1f002009-03-30 22:59:27 +00001442 manager.shutdown()
Jesse Nollerc5d28a02009-03-30 16:37:36 +00001443
Benjamin Petersone711caf2008-06-11 16:44:04 +00001444#
1445#
1446#
1447
1448SENTINEL = latin('')
1449
1450class _TestConnection(BaseTestCase):
1451
1452 ALLOWED_TYPES = ('processes', 'threads')
1453
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +00001454 @classmethod
1455 def _echo(cls, conn):
Benjamin Petersone711caf2008-06-11 16:44:04 +00001456 for msg in iter(conn.recv_bytes, SENTINEL):
1457 conn.send_bytes(msg)
1458 conn.close()
1459
1460 def test_connection(self):
1461 conn, child_conn = self.Pipe()
1462
1463 p = self.Process(target=self._echo, args=(child_conn,))
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +00001464 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +00001465 p.start()
1466
1467 seq = [1, 2.25, None]
1468 msg = latin('hello world')
1469 longmsg = msg * 10
1470 arr = array.array('i', list(range(4)))
1471
1472 if self.TYPE == 'processes':
1473 self.assertEqual(type(conn.fileno()), int)
1474
1475 self.assertEqual(conn.send(seq), None)
1476 self.assertEqual(conn.recv(), seq)
1477
1478 self.assertEqual(conn.send_bytes(msg), None)
1479 self.assertEqual(conn.recv_bytes(), msg)
1480
1481 if self.TYPE == 'processes':
1482 buffer = array.array('i', [0]*10)
1483 expected = list(arr) + [0] * (10 - len(arr))
1484 self.assertEqual(conn.send_bytes(arr), None)
1485 self.assertEqual(conn.recv_bytes_into(buffer),
1486 len(arr) * buffer.itemsize)
1487 self.assertEqual(list(buffer), expected)
1488
1489 buffer = array.array('i', [0]*10)
1490 expected = [0] * 3 + list(arr) + [0] * (10 - 3 - len(arr))
1491 self.assertEqual(conn.send_bytes(arr), None)
1492 self.assertEqual(conn.recv_bytes_into(buffer, 3 * buffer.itemsize),
1493 len(arr) * buffer.itemsize)
1494 self.assertEqual(list(buffer), expected)
1495
1496 buffer = bytearray(latin(' ' * 40))
1497 self.assertEqual(conn.send_bytes(longmsg), None)
1498 try:
1499 res = conn.recv_bytes_into(buffer)
1500 except multiprocessing.BufferTooShort as e:
1501 self.assertEqual(e.args, (longmsg,))
1502 else:
1503 self.fail('expected BufferTooShort, got %s' % res)
1504
1505 poll = TimingWrapper(conn.poll)
1506
1507 self.assertEqual(poll(), False)
1508 self.assertTimingAlmostEqual(poll.elapsed, 0)
1509
1510 self.assertEqual(poll(TIMEOUT1), False)
1511 self.assertTimingAlmostEqual(poll.elapsed, TIMEOUT1)
1512
1513 conn.send(None)
1514
1515 self.assertEqual(poll(TIMEOUT1), True)
1516 self.assertTimingAlmostEqual(poll.elapsed, 0)
1517
1518 self.assertEqual(conn.recv(), None)
1519
1520 really_big_msg = latin('X') * (1024 * 1024 * 16) # 16Mb
1521 conn.send_bytes(really_big_msg)
1522 self.assertEqual(conn.recv_bytes(), really_big_msg)
1523
1524 conn.send_bytes(SENTINEL) # tell child to quit
1525 child_conn.close()
1526
1527 if self.TYPE == 'processes':
1528 self.assertEqual(conn.readable, True)
1529 self.assertEqual(conn.writable, True)
1530 self.assertRaises(EOFError, conn.recv)
1531 self.assertRaises(EOFError, conn.recv_bytes)
1532
1533 p.join()
1534
1535 def test_duplex_false(self):
1536 reader, writer = self.Pipe(duplex=False)
1537 self.assertEqual(writer.send(1), None)
1538 self.assertEqual(reader.recv(), 1)
1539 if self.TYPE == 'processes':
1540 self.assertEqual(reader.readable, True)
1541 self.assertEqual(reader.writable, False)
1542 self.assertEqual(writer.readable, False)
1543 self.assertEqual(writer.writable, True)
1544 self.assertRaises(IOError, reader.send, 2)
1545 self.assertRaises(IOError, writer.recv)
1546 self.assertRaises(IOError, writer.poll)
1547
1548 def test_spawn_close(self):
1549 # We test that a pipe connection can be closed by parent
1550 # process immediately after child is spawned. On Windows this
1551 # would have sometimes failed on old versions because
1552 # child_conn would be closed before the child got a chance to
1553 # duplicate it.
1554 conn, child_conn = self.Pipe()
1555
1556 p = self.Process(target=self._echo, args=(child_conn,))
1557 p.start()
1558 child_conn.close() # this might complete before child initializes
1559
1560 msg = latin('hello')
1561 conn.send_bytes(msg)
1562 self.assertEqual(conn.recv_bytes(), msg)
1563
1564 conn.send_bytes(SENTINEL)
1565 conn.close()
1566 p.join()
1567
1568 def test_sendbytes(self):
1569 if self.TYPE != 'processes':
1570 return
1571
1572 msg = latin('abcdefghijklmnopqrstuvwxyz')
1573 a, b = self.Pipe()
1574
1575 a.send_bytes(msg)
1576 self.assertEqual(b.recv_bytes(), msg)
1577
1578 a.send_bytes(msg, 5)
1579 self.assertEqual(b.recv_bytes(), msg[5:])
1580
1581 a.send_bytes(msg, 7, 8)
1582 self.assertEqual(b.recv_bytes(), msg[7:7+8])
1583
1584 a.send_bytes(msg, 26)
1585 self.assertEqual(b.recv_bytes(), latin(''))
1586
1587 a.send_bytes(msg, 26, 0)
1588 self.assertEqual(b.recv_bytes(), latin(''))
1589
1590 self.assertRaises(ValueError, a.send_bytes, msg, 27)
1591
1592 self.assertRaises(ValueError, a.send_bytes, msg, 22, 5)
1593
1594 self.assertRaises(ValueError, a.send_bytes, msg, 26, 1)
1595
1596 self.assertRaises(ValueError, a.send_bytes, msg, -1)
1597
1598 self.assertRaises(ValueError, a.send_bytes, msg, 4, -1)
1599
Antoine Pitroubcb39d42011-08-23 19:46:22 +02001600 @classmethod
1601 def _is_fd_assigned(cls, fd):
1602 try:
1603 os.fstat(fd)
1604 except OSError as e:
1605 if e.errno == errno.EBADF:
1606 return False
1607 raise
1608 else:
1609 return True
1610
1611 @classmethod
1612 def _writefd(cls, conn, data, create_dummy_fds=False):
1613 if create_dummy_fds:
1614 for i in range(0, 256):
1615 if not cls._is_fd_assigned(i):
1616 os.dup2(conn.fileno(), i)
1617 fd = reduction.recv_handle(conn)
1618 if msvcrt:
1619 fd = msvcrt.open_osfhandle(fd, os.O_WRONLY)
1620 os.write(fd, data)
1621 os.close(fd)
1622
1623 def test_fd_transfer(self):
1624 if self.TYPE != 'processes':
1625 self.skipTest("only makes sense with processes")
1626 conn, child_conn = self.Pipe(duplex=True)
1627
1628 p = self.Process(target=self._writefd, args=(child_conn, b"foo"))
1629 p.start()
1630 with open(test.support.TESTFN, "wb") as f:
1631 fd = f.fileno()
1632 if msvcrt:
1633 fd = msvcrt.get_osfhandle(fd)
1634 reduction.send_handle(conn, fd, p.pid)
1635 p.join()
1636 with open(test.support.TESTFN, "rb") as f:
1637 self.assertEqual(f.read(), b"foo")
1638
1639 @unittest.skipIf(sys.platform == "win32",
1640 "test semantics don't make sense on Windows")
1641 @unittest.skipIf(MAXFD <= 256,
1642 "largest assignable fd number is too small")
1643 @unittest.skipUnless(hasattr(os, "dup2"),
1644 "test needs os.dup2()")
1645 def test_large_fd_transfer(self):
1646 # With fd > 256 (issue #11657)
1647 if self.TYPE != 'processes':
1648 self.skipTest("only makes sense with processes")
1649 conn, child_conn = self.Pipe(duplex=True)
1650
1651 p = self.Process(target=self._writefd, args=(child_conn, b"bar", True))
1652 p.start()
1653 with open(test.support.TESTFN, "wb") as f:
1654 fd = f.fileno()
1655 for newfd in range(256, MAXFD):
1656 if not self._is_fd_assigned(newfd):
1657 break
1658 else:
1659 self.fail("could not find an unassigned large file descriptor")
1660 os.dup2(fd, newfd)
1661 try:
1662 reduction.send_handle(conn, newfd, p.pid)
1663 finally:
1664 os.close(newfd)
1665 p.join()
1666 with open(test.support.TESTFN, "rb") as f:
1667 self.assertEqual(f.read(), b"bar")
1668
1669
Benjamin Petersone711caf2008-06-11 16:44:04 +00001670class _TestListenerClient(BaseTestCase):
1671
1672 ALLOWED_TYPES = ('processes', 'threads')
1673
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +00001674 @classmethod
1675 def _test(cls, address):
1676 conn = cls.connection.Client(address)
Benjamin Petersone711caf2008-06-11 16:44:04 +00001677 conn.send('hello')
1678 conn.close()
1679
1680 def test_listener_client(self):
1681 for family in self.connection.families:
1682 l = self.connection.Listener(family=family)
1683 p = self.Process(target=self._test, args=(l.address,))
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +00001684 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +00001685 p.start()
1686 conn = l.accept()
1687 self.assertEqual(conn.recv(), 'hello')
1688 p.join()
1689 l.close()
Benjamin Petersone711caf2008-06-11 16:44:04 +00001690#
1691# Test of sending connection and socket objects between processes
1692#
Benjamin Petersonb29cbbc2008-06-16 20:57:14 +00001693"""
Benjamin Petersone711caf2008-06-11 16:44:04 +00001694class _TestPicklingConnections(BaseTestCase):
1695
1696 ALLOWED_TYPES = ('processes',)
1697
1698 def _listener(self, conn, families):
1699 for fam in families:
1700 l = self.connection.Listener(family=fam)
1701 conn.send(l.address)
1702 new_conn = l.accept()
1703 conn.send(new_conn)
1704
1705 if self.TYPE == 'processes':
1706 l = socket.socket()
1707 l.bind(('localhost', 0))
1708 conn.send(l.getsockname())
1709 l.listen(1)
1710 new_conn, addr = l.accept()
1711 conn.send(new_conn)
1712
1713 conn.recv()
1714
1715 def _remote(self, conn):
1716 for (address, msg) in iter(conn.recv, None):
1717 client = self.connection.Client(address)
1718 client.send(msg.upper())
1719 client.close()
1720
1721 if self.TYPE == 'processes':
1722 address, msg = conn.recv()
1723 client = socket.socket()
1724 client.connect(address)
1725 client.sendall(msg.upper())
1726 client.close()
1727
1728 conn.close()
1729
1730 def test_pickling(self):
1731 try:
1732 multiprocessing.allow_connection_pickling()
1733 except ImportError:
1734 return
1735
1736 families = self.connection.families
1737
1738 lconn, lconn0 = self.Pipe()
1739 lp = self.Process(target=self._listener, args=(lconn0, families))
1740 lp.start()
1741 lconn0.close()
1742
1743 rconn, rconn0 = self.Pipe()
1744 rp = self.Process(target=self._remote, args=(rconn0,))
1745 rp.start()
1746 rconn0.close()
1747
1748 for fam in families:
1749 msg = ('This connection uses family %s' % fam).encode('ascii')
1750 address = lconn.recv()
1751 rconn.send((address, msg))
1752 new_conn = lconn.recv()
1753 self.assertEqual(new_conn.recv(), msg.upper())
1754
1755 rconn.send(None)
1756
1757 if self.TYPE == 'processes':
1758 msg = latin('This connection uses a normal socket')
1759 address = lconn.recv()
1760 rconn.send((address, msg))
1761 if hasattr(socket, 'fromfd'):
1762 new_conn = lconn.recv()
1763 self.assertEqual(new_conn.recv(100), msg.upper())
1764 else:
1765 # XXX On Windows with Py2.6 need to backport fromfd()
1766 discard = lconn.recv_bytes()
1767
1768 lconn.send(None)
1769
1770 rconn.close()
1771 lconn.close()
1772
1773 lp.join()
1774 rp.join()
Benjamin Petersonb29cbbc2008-06-16 20:57:14 +00001775"""
Benjamin Petersone711caf2008-06-11 16:44:04 +00001776#
1777#
1778#
1779
1780class _TestHeap(BaseTestCase):
1781
1782 ALLOWED_TYPES = ('processes',)
1783
1784 def test_heap(self):
1785 iterations = 5000
1786 maxblocks = 50
1787 blocks = []
1788
1789 # create and destroy lots of blocks of different sizes
1790 for i in range(iterations):
1791 size = int(random.lognormvariate(0, 1) * 1000)
1792 b = multiprocessing.heap.BufferWrapper(size)
1793 blocks.append(b)
1794 if len(blocks) > maxblocks:
1795 i = random.randrange(maxblocks)
1796 del blocks[i]
1797
1798 # get the heap object
1799 heap = multiprocessing.heap.BufferWrapper._heap
1800
1801 # verify the state of the heap
1802 all = []
1803 occupied = 0
Charles-François Natali778db492011-07-02 14:35:49 +02001804 heap._lock.acquire()
1805 self.addCleanup(heap._lock.release)
Benjamin Petersone711caf2008-06-11 16:44:04 +00001806 for L in list(heap._len_to_seq.values()):
1807 for arena, start, stop in L:
1808 all.append((heap._arenas.index(arena), start, stop,
1809 stop-start, 'free'))
1810 for arena, start, stop in heap._allocated_blocks:
1811 all.append((heap._arenas.index(arena), start, stop,
1812 stop-start, 'occupied'))
1813 occupied += (stop-start)
1814
1815 all.sort()
1816
1817 for i in range(len(all)-1):
1818 (arena, start, stop) = all[i][:3]
1819 (narena, nstart, nstop) = all[i+1][:3]
1820 self.assertTrue((arena != narena and nstart == 0) or
1821 (stop == nstart))
1822
Charles-François Natali778db492011-07-02 14:35:49 +02001823 def test_free_from_gc(self):
1824 # Check that freeing of blocks by the garbage collector doesn't deadlock
1825 # (issue #12352).
1826 # Make sure the GC is enabled, and set lower collection thresholds to
1827 # make collections more frequent (and increase the probability of
1828 # deadlock).
1829 if not gc.isenabled():
1830 gc.enable()
1831 self.addCleanup(gc.disable)
1832 thresholds = gc.get_threshold()
1833 self.addCleanup(gc.set_threshold, *thresholds)
1834 gc.set_threshold(10)
1835
1836 # perform numerous block allocations, with cyclic references to make
1837 # sure objects are collected asynchronously by the gc
1838 for i in range(5000):
1839 a = multiprocessing.heap.BufferWrapper(1)
1840 b = multiprocessing.heap.BufferWrapper(1)
1841 # circular references
1842 a.buddy = b
1843 b.buddy = a
1844
Benjamin Petersone711caf2008-06-11 16:44:04 +00001845#
1846#
1847#
1848
Benjamin Petersone711caf2008-06-11 16:44:04 +00001849class _Foo(Structure):
1850 _fields_ = [
1851 ('x', c_int),
1852 ('y', c_double)
1853 ]
1854
1855class _TestSharedCTypes(BaseTestCase):
1856
1857 ALLOWED_TYPES = ('processes',)
1858
Antoine Pitrou7744e2a2010-11-22 16:26:21 +00001859 def setUp(self):
1860 if not HAS_SHAREDCTYPES:
1861 self.skipTest("requires multiprocessing.sharedctypes")
1862
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +00001863 @classmethod
1864 def _double(cls, x, y, foo, arr, string):
Benjamin Petersone711caf2008-06-11 16:44:04 +00001865 x.value *= 2
1866 y.value *= 2
1867 foo.x *= 2
1868 foo.y *= 2
1869 string.value *= 2
1870 for i in range(len(arr)):
1871 arr[i] *= 2
1872
1873 def test_sharedctypes(self, lock=False):
Benjamin Petersone711caf2008-06-11 16:44:04 +00001874 x = Value('i', 7, lock=lock)
Georg Brandl89fad142010-03-14 10:23:39 +00001875 y = Value(c_double, 1.0/3.0, lock=lock)
Benjamin Petersone711caf2008-06-11 16:44:04 +00001876 foo = Value(_Foo, 3, 2, lock=lock)
Georg Brandl89fad142010-03-14 10:23:39 +00001877 arr = self.Array('d', list(range(10)), lock=lock)
1878 string = self.Array('c', 20, lock=lock)
Brian Curtinafa88b52010-10-07 01:12:19 +00001879 string.value = latin('hello')
Benjamin Petersone711caf2008-06-11 16:44:04 +00001880
1881 p = self.Process(target=self._double, args=(x, y, foo, arr, string))
1882 p.start()
1883 p.join()
1884
1885 self.assertEqual(x.value, 14)
1886 self.assertAlmostEqual(y.value, 2.0/3.0)
1887 self.assertEqual(foo.x, 6)
1888 self.assertAlmostEqual(foo.y, 4.0)
1889 for i in range(10):
1890 self.assertAlmostEqual(arr[i], i*2)
1891 self.assertEqual(string.value, latin('hellohello'))
1892
1893 def test_synchronize(self):
1894 self.test_sharedctypes(lock=True)
1895
1896 def test_copy(self):
Benjamin Petersone711caf2008-06-11 16:44:04 +00001897 foo = _Foo(2, 5.0)
Brian Curtinafa88b52010-10-07 01:12:19 +00001898 bar = copy(foo)
Benjamin Petersone711caf2008-06-11 16:44:04 +00001899 foo.x = 0
1900 foo.y = 0
1901 self.assertEqual(bar.x, 2)
1902 self.assertAlmostEqual(bar.y, 5.0)
1903
1904#
1905#
1906#
1907
1908class _TestFinalize(BaseTestCase):
1909
1910 ALLOWED_TYPES = ('processes',)
1911
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +00001912 @classmethod
1913 def _test_finalize(cls, conn):
Benjamin Petersone711caf2008-06-11 16:44:04 +00001914 class Foo(object):
1915 pass
1916
1917 a = Foo()
1918 util.Finalize(a, conn.send, args=('a',))
1919 del a # triggers callback for a
1920
1921 b = Foo()
1922 close_b = util.Finalize(b, conn.send, args=('b',))
1923 close_b() # triggers callback for b
1924 close_b() # does nothing because callback has already been called
1925 del b # does nothing because callback has already been called
1926
1927 c = Foo()
1928 util.Finalize(c, conn.send, args=('c',))
1929
1930 d10 = Foo()
1931 util.Finalize(d10, conn.send, args=('d10',), exitpriority=1)
1932
1933 d01 = Foo()
1934 util.Finalize(d01, conn.send, args=('d01',), exitpriority=0)
1935 d02 = Foo()
1936 util.Finalize(d02, conn.send, args=('d02',), exitpriority=0)
1937 d03 = Foo()
1938 util.Finalize(d03, conn.send, args=('d03',), exitpriority=0)
1939
1940 util.Finalize(None, conn.send, args=('e',), exitpriority=-10)
1941
1942 util.Finalize(None, conn.send, args=('STOP',), exitpriority=-100)
1943
Ezio Melotti13925002011-03-16 11:05:33 +02001944 # call multiprocessing's cleanup function then exit process without
Benjamin Petersone711caf2008-06-11 16:44:04 +00001945 # garbage collecting locals
1946 util._exit_function()
1947 conn.close()
1948 os._exit(0)
1949
1950 def test_finalize(self):
1951 conn, child_conn = self.Pipe()
1952
1953 p = self.Process(target=self._test_finalize, args=(child_conn,))
1954 p.start()
1955 p.join()
1956
1957 result = [obj for obj in iter(conn.recv, 'STOP')]
1958 self.assertEqual(result, ['a', 'b', 'd10', 'd03', 'd02', 'd01', 'e'])
1959
1960#
1961# Test that from ... import * works for each module
1962#
1963
1964class _TestImportStar(BaseTestCase):
1965
1966 ALLOWED_TYPES = ('processes',)
1967
1968 def test_import(self):
Florent Xiclunafd1b0932010-03-28 00:25:02 +00001969 modules = [
Benjamin Petersone711caf2008-06-11 16:44:04 +00001970 'multiprocessing', 'multiprocessing.connection',
1971 'multiprocessing.heap', 'multiprocessing.managers',
1972 'multiprocessing.pool', 'multiprocessing.process',
Florent Xiclunafd1b0932010-03-28 00:25:02 +00001973 'multiprocessing.reduction',
Benjamin Petersone711caf2008-06-11 16:44:04 +00001974 'multiprocessing.synchronize', 'multiprocessing.util'
Florent Xiclunafd1b0932010-03-28 00:25:02 +00001975 ]
1976
1977 if c_int is not None:
1978 # This module requires _ctypes
1979 modules.append('multiprocessing.sharedctypes')
Benjamin Petersone711caf2008-06-11 16:44:04 +00001980
1981 for name in modules:
1982 __import__(name)
1983 mod = sys.modules[name]
1984
1985 for attr in getattr(mod, '__all__', ()):
1986 self.assertTrue(
1987 hasattr(mod, attr),
1988 '%r does not have attribute %r' % (mod, attr)
1989 )
1990
1991#
1992# Quick test that logging works -- does not test logging output
1993#
1994
1995class _TestLogging(BaseTestCase):
1996
1997 ALLOWED_TYPES = ('processes',)
1998
1999 def test_enable_logging(self):
2000 logger = multiprocessing.get_logger()
2001 logger.setLevel(util.SUBWARNING)
2002 self.assertTrue(logger is not None)
2003 logger.debug('this will not be printed')
2004 logger.info('nor will this')
2005 logger.setLevel(LOG_LEVEL)
2006
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +00002007 @classmethod
2008 def _test_level(cls, conn):
Benjamin Petersone711caf2008-06-11 16:44:04 +00002009 logger = multiprocessing.get_logger()
2010 conn.send(logger.getEffectiveLevel())
2011
2012 def test_level(self):
2013 LEVEL1 = 32
2014 LEVEL2 = 37
2015
2016 logger = multiprocessing.get_logger()
2017 root_logger = logging.getLogger()
2018 root_level = root_logger.level
2019
2020 reader, writer = multiprocessing.Pipe(duplex=False)
2021
2022 logger.setLevel(LEVEL1)
2023 self.Process(target=self._test_level, args=(writer,)).start()
2024 self.assertEqual(LEVEL1, reader.recv())
2025
2026 logger.setLevel(logging.NOTSET)
2027 root_logger.setLevel(LEVEL2)
2028 self.Process(target=self._test_level, args=(writer,)).start()
2029 self.assertEqual(LEVEL2, reader.recv())
2030
2031 root_logger.setLevel(root_level)
2032 logger.setLevel(level=LOG_LEVEL)
2033
Jesse Nollerb9a49b72009-11-21 18:09:38 +00002034
Jesse Nollerf4ae35f2009-11-24 14:22:24 +00002035# class _TestLoggingProcessName(BaseTestCase):
2036#
2037# def handle(self, record):
2038# assert record.processName == multiprocessing.current_process().name
2039# self.__handled = True
2040#
2041# def test_logging(self):
2042# handler = logging.Handler()
2043# handler.handle = self.handle
2044# self.__handled = False
2045# # Bypass getLogger() and side-effects
2046# logger = logging.getLoggerClass()(
2047# 'multiprocessing.test.TestLoggingProcessName')
2048# logger.addHandler(handler)
2049# logger.propagate = False
2050#
2051# logger.warn('foo')
2052# assert self.__handled
Jesse Nollerb9a49b72009-11-21 18:09:38 +00002053
Benjamin Petersone711caf2008-06-11 16:44:04 +00002054#
Jesse Noller6214edd2009-01-19 16:23:53 +00002055# Test to verify handle verification, see issue 3321
2056#
2057
2058class TestInvalidHandle(unittest.TestCase):
2059
Florent Xiclunafd1b0932010-03-28 00:25:02 +00002060 @unittest.skipIf(WIN32, "skipped on Windows")
Jesse Noller6214edd2009-01-19 16:23:53 +00002061 def test_invalid_handles(self):
Antoine Pitrou87cf2202011-05-09 17:04:27 +02002062 conn = multiprocessing.connection.Connection(44977608)
2063 try:
2064 self.assertRaises((ValueError, IOError), conn.poll)
2065 finally:
2066 # Hack private attribute _handle to avoid printing an error
2067 # in conn.__del__
2068 conn._handle = None
2069 self.assertRaises((ValueError, IOError),
2070 multiprocessing.connection.Connection, -1)
Florent Xiclunafd1b0932010-03-28 00:25:02 +00002071
Jesse Noller6214edd2009-01-19 16:23:53 +00002072#
Benjamin Petersone711caf2008-06-11 16:44:04 +00002073# Functions used to create test cases from the base ones in this module
2074#
2075
2076def get_attributes(Source, names):
2077 d = {}
2078 for name in names:
2079 obj = getattr(Source, name)
2080 if type(obj) == type(get_attributes):
2081 obj = staticmethod(obj)
2082 d[name] = obj
2083 return d
2084
2085def create_test_cases(Mixin, type):
2086 result = {}
2087 glob = globals()
Florent Xiclunafd1b0932010-03-28 00:25:02 +00002088 Type = type.capitalize()
Benjamin Petersone711caf2008-06-11 16:44:04 +00002089
2090 for name in list(glob.keys()):
2091 if name.startswith('_Test'):
2092 base = glob[name]
2093 if type in base.ALLOWED_TYPES:
2094 newname = 'With' + Type + name[1:]
2095 class Temp(base, unittest.TestCase, Mixin):
2096 pass
2097 result[newname] = Temp
2098 Temp.__name__ = newname
2099 Temp.__module__ = Mixin.__module__
2100 return result
2101
2102#
2103# Create test cases
2104#
2105
2106class ProcessesMixin(object):
2107 TYPE = 'processes'
2108 Process = multiprocessing.Process
2109 locals().update(get_attributes(multiprocessing, (
2110 'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
2111 'Condition', 'Event', 'Value', 'Array', 'RawValue',
2112 'RawArray', 'current_process', 'active_children', 'Pipe',
2113 'connection', 'JoinableQueue'
2114 )))
2115
2116testcases_processes = create_test_cases(ProcessesMixin, type='processes')
2117globals().update(testcases_processes)
2118
2119
2120class ManagerMixin(object):
2121 TYPE = 'manager'
2122 Process = multiprocessing.Process
2123 manager = object.__new__(multiprocessing.managers.SyncManager)
2124 locals().update(get_attributes(manager, (
2125 'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
2126 'Condition', 'Event', 'Value', 'Array', 'list', 'dict',
2127 'Namespace', 'JoinableQueue'
2128 )))
2129
2130testcases_manager = create_test_cases(ManagerMixin, type='manager')
2131globals().update(testcases_manager)
2132
2133
2134class ThreadsMixin(object):
2135 TYPE = 'threads'
2136 Process = multiprocessing.dummy.Process
2137 locals().update(get_attributes(multiprocessing.dummy, (
2138 'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
2139 'Condition', 'Event', 'Value', 'Array', 'current_process',
2140 'active_children', 'Pipe', 'connection', 'dict', 'list',
2141 'Namespace', 'JoinableQueue'
2142 )))
2143
2144testcases_threads = create_test_cases(ThreadsMixin, type='threads')
2145globals().update(testcases_threads)
2146
Neal Norwitz5d6415e2008-08-25 01:53:32 +00002147class OtherTest(unittest.TestCase):
2148 # TODO: add more tests for deliver/answer challenge.
2149 def test_deliver_challenge_auth_failure(self):
2150 class _FakeConnection(object):
2151 def recv_bytes(self, size):
Neal Norwitzec105ad2008-08-25 03:05:54 +00002152 return b'something bogus'
Neal Norwitz5d6415e2008-08-25 01:53:32 +00002153 def send_bytes(self, data):
2154 pass
2155 self.assertRaises(multiprocessing.AuthenticationError,
2156 multiprocessing.connection.deliver_challenge,
2157 _FakeConnection(), b'abc')
2158
2159 def test_answer_challenge_auth_failure(self):
2160 class _FakeConnection(object):
2161 def __init__(self):
2162 self.count = 0
2163 def recv_bytes(self, size):
2164 self.count += 1
2165 if self.count == 1:
2166 return multiprocessing.connection.CHALLENGE
2167 elif self.count == 2:
Neal Norwitzec105ad2008-08-25 03:05:54 +00002168 return b'something bogus'
2169 return b''
Neal Norwitz5d6415e2008-08-25 01:53:32 +00002170 def send_bytes(self, data):
2171 pass
2172 self.assertRaises(multiprocessing.AuthenticationError,
2173 multiprocessing.connection.answer_challenge,
2174 _FakeConnection(), b'abc')
2175
Benjamin Petersonf47ed4a2009-04-11 20:45:40 +00002176#
2177# Test Manager.start()/Pool.__init__() initializer feature - see issue 5585
2178#
2179
2180def initializer(ns):
2181 ns.test += 1
2182
2183class TestInitializers(unittest.TestCase):
2184 def setUp(self):
2185 self.mgr = multiprocessing.Manager()
2186 self.ns = self.mgr.Namespace()
2187 self.ns.test = 0
2188
2189 def tearDown(self):
2190 self.mgr.shutdown()
2191
2192 def test_manager_initializer(self):
2193 m = multiprocessing.managers.SyncManager()
2194 self.assertRaises(TypeError, m.start, 1)
2195 m.start(initializer, (self.ns,))
2196 self.assertEqual(self.ns.test, 1)
2197 m.shutdown()
2198
2199 def test_pool_initializer(self):
2200 self.assertRaises(TypeError, multiprocessing.Pool, initializer=1)
2201 p = multiprocessing.Pool(1, initializer, (self.ns,))
2202 p.close()
2203 p.join()
2204 self.assertEqual(self.ns.test, 1)
2205
Alexandre Vassalottic57a84f2009-07-17 12:07:01 +00002206#
2207# Issue 5155, 5313, 5331: Test process in processes
2208# Verifies os.close(sys.stdin.fileno) vs. sys.stdin.close() behavior
2209#
2210
2211def _ThisSubProcess(q):
2212 try:
2213 item = q.get(block=False)
2214 except pyqueue.Empty:
2215 pass
2216
2217def _TestProcess(q):
2218 queue = multiprocessing.Queue()
2219 subProc = multiprocessing.Process(target=_ThisSubProcess, args=(queue,))
2220 subProc.start()
2221 subProc.join()
2222
2223def _afunc(x):
2224 return x*x
2225
2226def pool_in_process():
2227 pool = multiprocessing.Pool(processes=4)
2228 x = pool.map(_afunc, [1, 2, 3, 4, 5, 6, 7])
2229
2230class _file_like(object):
2231 def __init__(self, delegate):
2232 self._delegate = delegate
2233 self._pid = None
2234
2235 @property
2236 def cache(self):
2237 pid = os.getpid()
2238 # There are no race conditions since fork keeps only the running thread
2239 if pid != self._pid:
2240 self._pid = pid
2241 self._cache = []
2242 return self._cache
2243
2244 def write(self, data):
2245 self.cache.append(data)
2246
2247 def flush(self):
2248 self._delegate.write(''.join(self.cache))
2249 self._cache = []
2250
2251class TestStdinBadfiledescriptor(unittest.TestCase):
2252
2253 def test_queue_in_process(self):
2254 queue = multiprocessing.Queue()
2255 proc = multiprocessing.Process(target=_TestProcess, args=(queue,))
2256 proc.start()
2257 proc.join()
2258
2259 def test_pool_in_process(self):
2260 p = multiprocessing.Process(target=pool_in_process)
2261 p.start()
2262 p.join()
2263
2264 def test_flushing(self):
2265 sio = io.StringIO()
2266 flike = _file_like(sio)
2267 flike.write('foo')
2268 proc = multiprocessing.Process(target=lambda: flike.flush())
2269 flike.flush()
2270 assert sio.getvalue() == 'foo'
2271
2272testcases_other = [OtherTest, TestInvalidHandle, TestInitializers,
2273 TestStdinBadfiledescriptor]
Neal Norwitz5d6415e2008-08-25 01:53:32 +00002274
Benjamin Petersone711caf2008-06-11 16:44:04 +00002275#
2276#
2277#
2278
2279def test_main(run=None):
Jesse Nollerd00df3c2008-06-18 14:22:48 +00002280 if sys.platform.startswith("linux"):
2281 try:
2282 lock = multiprocessing.RLock()
2283 except OSError:
Benjamin Petersone549ead2009-03-28 21:42:05 +00002284 raise unittest.SkipTest("OSError raises on RLock creation, see issue 3111!")
Benjamin Peterson3c0dd062008-06-17 22:43:48 +00002285
Benjamin Petersone711caf2008-06-11 16:44:04 +00002286 if run is None:
2287 from test.support import run_unittest as run
2288
2289 util.get_temp_dir() # creates temp directory for use by all processes
2290
2291 multiprocessing.get_logger().setLevel(LOG_LEVEL)
2292
Benjamin Peterson41181742008-07-02 20:22:54 +00002293 ProcessesMixin.pool = multiprocessing.Pool(4)
2294 ThreadsMixin.pool = multiprocessing.dummy.Pool(4)
2295 ManagerMixin.manager.__init__()
2296 ManagerMixin.manager.start()
2297 ManagerMixin.pool = ManagerMixin.manager.Pool(4)
Benjamin Petersone711caf2008-06-11 16:44:04 +00002298
2299 testcases = (
Benjamin Peterson41181742008-07-02 20:22:54 +00002300 sorted(testcases_processes.values(), key=lambda tc:tc.__name__) +
2301 sorted(testcases_threads.values(), key=lambda tc:tc.__name__) +
Neal Norwitz5d6415e2008-08-25 01:53:32 +00002302 sorted(testcases_manager.values(), key=lambda tc:tc.__name__) +
2303 testcases_other
Benjamin Petersone711caf2008-06-11 16:44:04 +00002304 )
2305
2306 loadTestsFromTestCase = unittest.defaultTestLoader.loadTestsFromTestCase
2307 suite = unittest.TestSuite(loadTestsFromTestCase(tc) for tc in testcases)
2308 run(suite)
2309
Benjamin Peterson41181742008-07-02 20:22:54 +00002310 ThreadsMixin.pool.terminate()
2311 ProcessesMixin.pool.terminate()
2312 ManagerMixin.pool.terminate()
2313 ManagerMixin.manager.shutdown()
Benjamin Petersone711caf2008-06-11 16:44:04 +00002314
Benjamin Peterson41181742008-07-02 20:22:54 +00002315 del ProcessesMixin.pool, ThreadsMixin.pool, ManagerMixin.pool
Benjamin Petersone711caf2008-06-11 16:44:04 +00002316
2317def main():
2318 test_main(unittest.TextTestRunner(verbosity=2).run)
2319
2320if __name__ == '__main__':
2321 main()