blob: ca20396616c85c9a0ff2e153bc551a096e4280cc [file] [log] [blame]
Benjamin Peterson90f5ba52010-03-11 22:53:45 +00001#!/usr/bin/env python3
Georg Brandl86b2fb92008-07-16 03:43:04 +00002
Benjamin Petersone711caf2008-06-11 16:44:04 +00003#
4# Unit tests for the multiprocessing package
5#
6
7import unittest
Benjamin Petersone711caf2008-06-11 16:44:04 +00008import queue as pyqueue
9import time
Alexandre Vassalottic57a84f2009-07-17 12:07:01 +000010import io
Benjamin Petersone711caf2008-06-11 16:44:04 +000011import sys
12import os
13import gc
Antoine Pitrouc824e9a2011-04-05 18:11:33 +020014import errno
Benjamin Petersone711caf2008-06-11 16:44:04 +000015import signal
16import array
Benjamin Petersone711caf2008-06-11 16:44:04 +000017import socket
18import random
19import logging
R. David Murraya21e4ca2009-03-31 23:16:50 +000020import test.support
Benjamin Petersone711caf2008-06-11 16:44:04 +000021
Benjamin Petersone5384b02008-10-04 22:00:42 +000022
R. David Murraya21e4ca2009-03-31 23:16:50 +000023# Skip tests if _multiprocessing wasn't built.
24_multiprocessing = test.support.import_module('_multiprocessing')
25# Skip tests if sem_open implementation is broken.
26test.support.import_module('multiprocessing.synchronize')
# import threading after _multiprocessing to raise a more relevant error
28# message: "No module named _multiprocessing". _multiprocessing is not compiled
29# without thread support.
30import threading
Benjamin Petersone5384b02008-10-04 22:00:42 +000031
Benjamin Petersone711caf2008-06-11 16:44:04 +000032import multiprocessing.dummy
33import multiprocessing.connection
34import multiprocessing.managers
35import multiprocessing.heap
Benjamin Petersone711caf2008-06-11 16:44:04 +000036import multiprocessing.pool
Benjamin Petersone711caf2008-06-11 16:44:04 +000037
38from multiprocessing import util
39
Brian Curtinafa88b52010-10-07 01:12:19 +000040try:
41 from multiprocessing.sharedctypes import Value, copy
42 HAS_SHAREDCTYPES = True
43except ImportError:
44 HAS_SHAREDCTYPES = False
45
Benjamin Petersone711caf2008-06-11 16:44:04 +000046#
47#
48#
49
def latin(s):
    """Return the string *s* encoded to bytes with the 'latin' codec."""
    encoded = s.encode('latin')
    return encoded
Benjamin Petersone711caf2008-06-11 16:44:04 +000052
Benjamin Petersone711caf2008-06-11 16:44:04 +000053#
54# Constants
55#
56
LOG_LEVEL = util.SUBWARNING
#LOG_LEVEL = logging.DEBUG

# Short interval (in seconds) the tests sleep for when they need to give
# another process or thread a chance to make progress.
DELTA = 0.1

# Making CHECK_TIMINGS true makes tests take a lot longer and can sometimes
# cause some non-serious failures because some calls block a bit longer
# than expected.
CHECK_TIMINGS = False

# Longer, distinct timeouts are only worthwhile when timings are checked.
TIMEOUT1, TIMEOUT2, TIMEOUT3 = (
    (0.82, 0.35, 1.4) if CHECK_TIMINGS else (0.1, 0.1, 0.1)
)

# True unless _multiprocessing flags its semaphore get_value() as broken.
_BROKEN_GETVALUE = getattr(_multiprocessing, 'HAVE_BROKEN_SEM_GETVALUE', False)
HAVE_GETVALUE = not _BROKEN_GETVALUE

WIN32 = (sys.platform == "win32")
74
Benjamin Petersone711caf2008-06-11 16:44:04 +000075#
Florent Xiclunafd1b0932010-03-28 00:25:02 +000076# Some tests require ctypes
77#
78
79try:
Florent Xiclunaaa171062010-08-14 15:56:42 +000080 from ctypes import Structure, c_int, c_double
Florent Xiclunafd1b0932010-03-28 00:25:02 +000081except ImportError:
82 Structure = object
83 c_int = c_double = None
84
85#
Benjamin Petersone711caf2008-06-11 16:44:04 +000086# Creates a wrapper for a function which records the time it takes to finish
87#
88
class TimingWrapper(object):
    """Callable wrapper that records how long each call to *func* takes.

    ``elapsed`` is None until the wrapper has been called; afterwards it
    holds the duration of the most recent call in seconds, measured with
    time.time().  It is updated whether the call returned or raised.
    """

    def __init__(self, func):
        self.elapsed = None
        self.func = func

    def __call__(self, *args, **kwds):
        started = time.time()
        try:
            result = self.func(*args, **kwds)
        finally:
            # Record the duration even when func() raises.
            self.elapsed = time.time() - started
        return result
101
102#
103# Base class for test cases
104#
105
class BaseTestCase(object):
    """Mixin holding helpers shared by the test case classes in this file.

    It relies on the assert* methods of the class it is mixed into.
    """

    ALLOWED_TYPES = ('processes', 'manager', 'threads')

    def assertTimingAlmostEqual(self, a, b):
        # Timing comparisons are only enforced when CHECK_TIMINGS is set.
        if not CHECK_TIMINGS:
            return
        self.assertAlmostEqual(a, b, 1)

    def assertReturnsIfImplemented(self, value, func, *args):
        # Check that func(*args) equals value, unless func signals that it
        # is unavailable by raising NotImplementedError.
        try:
            actual = func(*args)
        except NotImplementedError:
            return None
        return self.assertEqual(value, actual)

    # For the sanity of Windows users, rather than crashing or freezing in
    # multiple ways.
    def __reduce__(self, *args):
        raise NotImplementedError("shouldn't try to pickle a test case")

    __reduce_ex__ = __reduce__
128
Benjamin Petersone711caf2008-06-11 16:44:04 +0000129#
130# Return the value of a semaphore
131#
132
def get_value(self):
    """Return the current counter value of the semaphore-like object *self*.

    Tries, in order, a get_value() method, a _Semaphore__value attribute
    and a _value attribute.  Raises NotImplementedError when none of them
    is available.
    """
    try:
        return self.get_value()
    except AttributeError:
        pass
    try:
        return self._Semaphore__value
    except AttributeError:
        pass
    try:
        return self._value
    except AttributeError:
        raise NotImplementedError
144
145#
146# Testcases
147#
148
class _TestProcess(BaseTestCase):
    """Tests for Process objects: attributes, start/join, terminate, nesting."""

    ALLOWED_TYPES = ('processes', 'threads')

    def test_current(self):
        # Report a skip instead of silently passing when not applicable.
        if self.TYPE == 'threads':
            self.skipTest('test not appropriate for {}'.format(self.TYPE))

        current = self.current_process()
        authkey = current.authkey

        self.assertTrue(current.is_alive())
        self.assertTrue(not current.daemon)
        self.assertIsInstance(authkey, bytes)
        self.assertTrue(len(authkey) > 0)
        self.assertEqual(current.ident, os.getpid())
        self.assertEqual(current.exitcode, None)

    @classmethod
    def _test(cls, q, *args, **kwds):
        # Child-side body for test_process: report what the child was given
        # and how it sees itself back to the parent through the queue.
        current = cls.current_process()
        q.put(args)
        q.put(kwds)
        q.put(current.name)
        if cls.TYPE != 'threads':
            q.put(bytes(current.authkey))
            q.put(current.pid)

    def test_process(self):
        q = self.Queue(1)
        args = (q, 1, 2)
        kwargs = {'hello':23, 'bye':2.54}
        name = 'SomeProcess'
        p = self.Process(
            target=self._test, args=args, kwargs=kwargs, name=name
            )
        p.daemon = True
        current = self.current_process()

        # State before the process has been started.
        if self.TYPE != 'threads':
            self.assertEqual(p.authkey, current.authkey)
        self.assertEqual(p.is_alive(), False)
        self.assertEqual(p.daemon, True)
        self.assertNotIn(p, self.active_children())
        self.assertTrue(type(self.active_children()) is list)
        self.assertEqual(p.exitcode, None)

        p.start()

        # State while the process is running.
        self.assertEqual(p.exitcode, None)
        self.assertEqual(p.is_alive(), True)
        self.assertIn(p, self.active_children())

        # The child echoes everything after the queue argument.
        self.assertEqual(q.get(), args[1:])
        self.assertEqual(q.get(), kwargs)
        self.assertEqual(q.get(), p.name)
        if self.TYPE != 'threads':
            self.assertEqual(q.get(), current.authkey)
            self.assertEqual(q.get(), p.pid)

        p.join()

        # State after the process has finished.
        self.assertEqual(p.exitcode, 0)
        self.assertEqual(p.is_alive(), False)
        self.assertNotIn(p, self.active_children())

    @classmethod
    def _test_terminate(cls):
        # Sleep long enough that the parent has to terminate this process.
        time.sleep(1000)

    def test_terminate(self):
        # Report a skip instead of silently passing when not applicable.
        if self.TYPE == 'threads':
            self.skipTest('test not appropriate for {}'.format(self.TYPE))

        p = self.Process(target=self._test_terminate)
        p.daemon = True
        p.start()

        self.assertEqual(p.is_alive(), True)
        self.assertIn(p, self.active_children())
        self.assertEqual(p.exitcode, None)

        p.terminate()

        join = TimingWrapper(p.join)
        self.assertEqual(join(), None)
        self.assertTimingAlmostEqual(join.elapsed, 0.0)

        self.assertEqual(p.is_alive(), False)
        self.assertNotIn(p, self.active_children())

        p.join()

        # XXX sometimes get p.exitcode == 0 on Windows ...
        #self.assertEqual(p.exitcode, -signal.SIGTERM)

    def test_cpu_count(self):
        try:
            cpus = multiprocessing.cpu_count()
        except NotImplementedError:
            cpus = 1
        self.assertTrue(type(cpus) is int)
        self.assertTrue(cpus >= 1)

    def test_active_children(self):
        self.assertEqual(type(self.active_children()), list)

        p = self.Process(target=time.sleep, args=(DELTA,))
        self.assertNotIn(p, self.active_children())

        p.start()
        self.assertIn(p, self.active_children())

        p.join()
        self.assertNotIn(p, self.active_children())

    @classmethod
    def _test_recursion(cls, wconn, id):
        # Send our own id, then run two children one after the other (each
        # is joined before the next starts) until ids are two elements long.
        wconn.send(id)
        if len(id) < 2:
            for i in range(2):
                p = cls.Process(
                    target=cls._test_recursion, args=(wconn, id+[i])
                    )
                p.start()
                p.join()

    def test_recursion(self):
        rconn, wconn = self.Pipe(duplex=False)
        self._test_recursion(wconn, [])

        # Give the pipe a moment, then drain everything that was sent.
        time.sleep(DELTA)
        result = []
        while rconn.poll():
            result.append(rconn.recv())

        expected = [
            [],
              [0],
                [0, 0],
                [0, 1],
              [1],
                [1, 0],
                [1, 1]
            ]
        self.assertEqual(result, expected)
297
298#
299#
300#
301
302class _UpperCaser(multiprocessing.Process):
303
304 def __init__(self):
305 multiprocessing.Process.__init__(self)
306 self.child_conn, self.parent_conn = multiprocessing.Pipe()
307
308 def run(self):
309 self.parent_conn.close()
310 for s in iter(self.child_conn.recv, None):
311 self.child_conn.send(s.upper())
312 self.child_conn.close()
313
314 def submit(self, s):
315 assert type(s) is str
316 self.parent_conn.send(s)
317 return self.parent_conn.recv()
318
319 def stop(self):
320 self.parent_conn.send(None)
321 self.parent_conn.close()
322 self.child_conn.close()
323
class _TestSubclassingProcess(BaseTestCase):
    """Checks that a user-defined Process subclass can be started and used."""

    ALLOWED_TYPES = ('processes',)

    def test_subclassing(self):
        worker = _UpperCaser()
        worker.start()
        # Each submitted string must come back upper-cased.
        for text, expected in (('hello', 'HELLO'), ('world', 'WORLD')):
            self.assertEqual(worker.submit(text), expected)
        worker.stop()
        worker.join()
335
336#
337#
338#
339
def queue_empty(q):
    """Return whether *q* is empty, using qsize() if it has no empty()."""
    if not hasattr(q, 'empty'):
        return q.qsize() == 0
    return q.empty()
345
def queue_full(q, maxsize):
    """Return whether *q* is full.

    Uses q.full() when available; otherwise compares q.qsize() with
    *maxsize*.
    """
    if not hasattr(q, 'full'):
        return q.qsize() == maxsize
    return q.full()
351
352
class _TestQueue(BaseTestCase):
    """Tests for Queue and JoinableQueue: put/get variants, fork, qsize."""

    @classmethod
    def _test_put(cls, queue, child_can_start, parent_can_continue):
        # Child-side body for test_put: once released by the parent, drain
        # the six items it queued and then signal the parent.
        child_can_start.wait()
        for i in range(6):
            queue.get()
        parent_can_continue.set()

    def test_put(self):
        MAXSIZE = 6
        queue = self.Queue(maxsize=MAXSIZE)
        child_can_start = self.Event()
        parent_can_continue = self.Event()

        proc = self.Process(
            target=self._test_put,
            args=(queue, child_can_start, parent_can_continue)
            )
        proc.daemon = True
        proc.start()

        self.assertEqual(queue_empty(queue), True)
        self.assertEqual(queue_full(queue, MAXSIZE), False)

        # Fill the queue using every calling convention of put().
        queue.put(1)
        queue.put(2, True)
        queue.put(3, True, None)
        queue.put(4, False)
        queue.put(5, False, None)
        queue.put_nowait(6)

        # the values may be in buffer but not yet in pipe so sleep a bit
        time.sleep(DELTA)

        self.assertEqual(queue_empty(queue), False)
        self.assertEqual(queue_full(queue, MAXSIZE), True)

        put = TimingWrapper(queue.put)
        put_nowait = TimingWrapper(queue.put_nowait)

        # Every further put must raise Full: immediately when non-blocking,
        # after the timeout when blocking.
        self.assertRaises(pyqueue.Full, put, 7, False)
        self.assertTimingAlmostEqual(put.elapsed, 0)

        self.assertRaises(pyqueue.Full, put, 7, False, None)
        self.assertTimingAlmostEqual(put.elapsed, 0)

        self.assertRaises(pyqueue.Full, put_nowait, 7)
        self.assertTimingAlmostEqual(put_nowait.elapsed, 0)

        self.assertRaises(pyqueue.Full, put, 7, True, TIMEOUT1)
        self.assertTimingAlmostEqual(put.elapsed, TIMEOUT1)

        self.assertRaises(pyqueue.Full, put, 7, False, TIMEOUT2)
        self.assertTimingAlmostEqual(put.elapsed, 0)

        self.assertRaises(pyqueue.Full, put, 7, True, timeout=TIMEOUT3)
        self.assertTimingAlmostEqual(put.elapsed, TIMEOUT3)

        # Let the child drain the queue, then check it is empty again.
        child_can_start.set()
        parent_can_continue.wait()

        self.assertEqual(queue_empty(queue), True)
        self.assertEqual(queue_full(queue, MAXSIZE), False)

        proc.join()

    @classmethod
    def _test_get(cls, queue, child_can_start, parent_can_continue):
        # Child-side body for test_get: once released by the parent, queue
        # the items the parent expects and then signal it.
        child_can_start.wait()
        #queue.put(1)
        queue.put(2)
        queue.put(3)
        queue.put(4)
        queue.put(5)
        parent_can_continue.set()

    def test_get(self):
        queue = self.Queue()
        child_can_start = self.Event()
        parent_can_continue = self.Event()

        proc = self.Process(
            target=self._test_get,
            args=(queue, child_can_start, parent_can_continue)
            )
        proc.daemon = True
        proc.start()

        self.assertEqual(queue_empty(queue), True)

        child_can_start.set()
        parent_can_continue.wait()

        time.sleep(DELTA)
        self.assertEqual(queue_empty(queue), False)

        # Hangs unexpectedly, remove for now
        #self.assertEqual(queue.get(), 1)
        # Fetch the items using every calling convention of get().
        self.assertEqual(queue.get(True, None), 2)
        self.assertEqual(queue.get(True), 3)
        self.assertEqual(queue.get(timeout=1), 4)
        self.assertEqual(queue.get_nowait(), 5)

        self.assertEqual(queue_empty(queue), True)

        get = TimingWrapper(queue.get)
        get_nowait = TimingWrapper(queue.get_nowait)

        # Every further get must raise Empty: immediately when non-blocking,
        # after the timeout when blocking.
        self.assertRaises(pyqueue.Empty, get, False)
        self.assertTimingAlmostEqual(get.elapsed, 0)

        self.assertRaises(pyqueue.Empty, get, False, None)
        self.assertTimingAlmostEqual(get.elapsed, 0)

        self.assertRaises(pyqueue.Empty, get_nowait)
        self.assertTimingAlmostEqual(get_nowait.elapsed, 0)

        self.assertRaises(pyqueue.Empty, get, True, TIMEOUT1)
        self.assertTimingAlmostEqual(get.elapsed, TIMEOUT1)

        self.assertRaises(pyqueue.Empty, get, False, TIMEOUT2)
        self.assertTimingAlmostEqual(get.elapsed, 0)

        self.assertRaises(pyqueue.Empty, get, timeout=TIMEOUT3)
        self.assertTimingAlmostEqual(get.elapsed, TIMEOUT3)

        proc.join()

    @classmethod
    def _test_fork(cls, queue):
        for i in range(10, 20):
            queue.put(i)
        # note that at this point the items may only be buffered, so the
        # process cannot shutdown until the feeder thread has finished
        # pushing items onto the pipe.

    def test_fork(self):
        # Old versions of Queue would fail to create a new feeder
        # thread for a forked process if the original process had its
        # own feeder thread.  This test checks that this no longer
        # happens.

        queue = self.Queue()

        # put items on queue so that main process starts a feeder thread
        for i in range(10):
            queue.put(i)

        # wait to make sure thread starts before we fork a new process
        time.sleep(DELTA)

        # fork process
        p = self.Process(target=self._test_fork, args=(queue,))
        p.start()

        # check that all expected items are in the queue
        for i in range(20):
            self.assertEqual(queue.get(), i)
        self.assertRaises(pyqueue.Empty, queue.get, False)

        p.join()

    def test_qsize(self):
        q = self.Queue()
        try:
            self.assertEqual(q.qsize(), 0)
        except NotImplementedError:
            # qsize() is not available for this queue; nothing to check.
            return
        q.put(1)
        self.assertEqual(q.qsize(), 1)
        q.put(5)
        self.assertEqual(q.qsize(), 2)
        q.get()
        self.assertEqual(q.qsize(), 1)
        q.get()
        self.assertEqual(q.qsize(), 0)

    @classmethod
    def _test_task_done(cls, q):
        # Worker body for test_task_done: acknowledge each item after a
        # short delay; a None item ends the loop.
        for obj in iter(q.get, None):
            time.sleep(DELTA)
            q.task_done()

    def test_task_done(self):
        queue = self.JoinableQueue()

        workers = [self.Process(target=self._test_task_done, args=(queue,))
                   for i in range(4)]

        for p in workers:
            p.start()

        for i in range(10):
            queue.put(i)

        # Blocks until task_done() has been called for every item.
        queue.join()

        # One None sentinel per worker so that each of them exits.
        for p in workers:
            queue.put(None)

        for p in workers:
            p.join()
560
561#
562#
563#
564
class _TestLock(BaseTestCase):
    """Tests for Lock and RLock acquire/release behaviour."""

    def test_lock(self):
        mutex = self.Lock()
        first_acquire = mutex.acquire()
        self.assertEqual(first_acquire, True)
        # A second, non-blocking acquire must fail while the lock is held.
        second_acquire = mutex.acquire(False)
        self.assertEqual(second_acquire, False)
        self.assertEqual(mutex.release(), None)
        # Releasing a lock that is not held is an error.
        with self.assertRaises((ValueError, threading.ThreadError)):
            mutex.release()

    def test_rlock(self):
        rlock = self.RLock()
        depth = 3
        # Acquire recursively, then unwind the same number of levels.
        for _ in range(depth):
            self.assertEqual(rlock.acquire(), True)
        for _ in range(depth):
            self.assertEqual(rlock.release(), None)
        # One release too many is an error.
        with self.assertRaises((AssertionError, RuntimeError)):
            rlock.release()

    def test_lock_context(self):
        mutex = self.Lock()
        with mutex:
            pass
587
Benjamin Petersone711caf2008-06-11 16:44:04 +0000588
class _TestSemaphore(BaseTestCase):
    """Tests for Semaphore and BoundedSemaphore counting and timeouts."""

    def _test_semaphore(self, sem):
        # Shared scenario for a semaphore created with an initial value of
        # 2: take it down to 0, check a non-blocking acquire fails, then
        # release back up to 2.  Value checks are skipped where get_value
        # raises NotImplementedError.
        self.assertReturnsIfImplemented(2, get_value, sem)
        self.assertEqual(sem.acquire(), True)
        self.assertReturnsIfImplemented(1, get_value, sem)
        self.assertEqual(sem.acquire(), True)
        self.assertReturnsIfImplemented(0, get_value, sem)
        self.assertEqual(sem.acquire(False), False)
        self.assertReturnsIfImplemented(0, get_value, sem)
        self.assertEqual(sem.release(), None)
        self.assertReturnsIfImplemented(1, get_value, sem)
        self.assertEqual(sem.release(), None)
        self.assertReturnsIfImplemented(2, get_value, sem)

    def test_semaphore(self):
        sem = self.Semaphore(2)
        self._test_semaphore(sem)
        # An unbounded semaphore may be released past its initial value.
        self.assertEqual(sem.release(), None)
        self.assertReturnsIfImplemented(3, get_value, sem)
        self.assertEqual(sem.release(), None)
        self.assertReturnsIfImplemented(4, get_value, sem)

    def test_bounded_semaphore(self):
        sem = self.BoundedSemaphore(2)
        self._test_semaphore(sem)
        # Currently fails on OS/X
        #if HAVE_GETVALUE:
        #    self.assertRaises(ValueError, sem.release)
        #    self.assertReturnsIfImplemented(2, get_value, sem)

    def test_timeout(self):
        # Report a skip instead of silently passing when not applicable.
        if self.TYPE != 'processes':
            self.skipTest('test not appropriate for {}'.format(self.TYPE))

        sem = self.Semaphore(0)
        acquire = TimingWrapper(sem.acquire)

        # Non-blocking acquires fail at once, whatever timeout is passed.
        self.assertEqual(acquire(False), False)
        self.assertTimingAlmostEqual(acquire.elapsed, 0.0)

        self.assertEqual(acquire(False, None), False)
        self.assertTimingAlmostEqual(acquire.elapsed, 0.0)

        self.assertEqual(acquire(False, TIMEOUT1), False)
        self.assertTimingAlmostEqual(acquire.elapsed, 0)

        # Blocking acquires fail once the timeout has expired.
        self.assertEqual(acquire(True, TIMEOUT2), False)
        self.assertTimingAlmostEqual(acquire.elapsed, TIMEOUT2)

        self.assertEqual(acquire(timeout=TIMEOUT3), False)
        self.assertTimingAlmostEqual(acquire.elapsed, TIMEOUT3)
641
642
class _TestCondition(BaseTestCase):
    """Tests for Condition: notify(), notify_all() and wait() timeouts."""

    @classmethod
    def f(cls, cond, sleeping, woken, timeout=None):
        # Waiter body: with the condition held, announce on `sleeping` that
        # we are about to wait, wait (optionally with a timeout), then
        # announce on `woken` that the wait has returned.
        cond.acquire()
        sleeping.release()
        cond.wait(timeout)
        woken.release()
        cond.release()

    def check_invariant(self, cond):
        # this is only supposed to succeed when there are no sleepers
        if self.TYPE != 'processes':
            return
        try:
            sleeping_count = cond._sleeping_count.get_value()
            woken_count = cond._woken_count.get_value()
            self.assertEqual(sleeping_count - woken_count, 0)
            self.assertEqual(cond._wait_semaphore.get_value(), 0)
        except NotImplementedError:
            pass

    def test_notify(self):
        cond = self.Condition()
        sleeping = self.Semaphore(0)
        woken = self.Semaphore(0)
        waiter_args = (cond, sleeping, woken)

        # one waiter of the type under test ...
        proc = self.Process(target=self.f, args=waiter_args)
        proc.daemon = True
        proc.start()

        # ... and one plain thread waiter
        thread = threading.Thread(target=self.f, args=waiter_args)
        thread.daemon = True
        thread.start()

        # wait for both children to start sleeping
        for _ in range(2):
            sleeping.acquire()

        # check no process/thread has woken up
        time.sleep(DELTA)
        self.assertReturnsIfImplemented(0, get_value, woken)

        # wake up the waiters one at a time, checking after each
        # notification that exactly one more process/thread has woken up
        for expected_woken in (1, 2):
            cond.acquire()
            cond.notify()
            cond.release()

            time.sleep(DELTA)
            self.assertReturnsIfImplemented(expected_woken, get_value, woken)

        # check state is not mucked up
        self.check_invariant(cond)
        thread.join()

    def test_notify_all(self):
        cond = self.Condition()
        sleeping = self.Semaphore(0)
        woken = self.Semaphore(0)

        # start some threads/processes which will timeout
        timed_args = (cond, sleeping, woken, TIMEOUT1)
        for _ in range(3):
            proc = self.Process(target=self.f, args=timed_args)
            proc.daemon = True
            proc.start()

            thread = threading.Thread(target=self.f, args=timed_args)
            thread.daemon = True
            thread.start()

        # wait for them all to sleep
        for _ in range(6):
            sleeping.acquire()

        # check they have all timed out
        for _ in range(6):
            woken.acquire()
        self.assertReturnsIfImplemented(0, get_value, woken)

        # check state is not mucked up
        self.check_invariant(cond)

        # start some more threads/processes
        untimed_args = (cond, sleeping, woken)
        for _ in range(3):
            proc = self.Process(target=self.f, args=untimed_args)
            proc.daemon = True
            proc.start()

            thread = threading.Thread(target=self.f, args=untimed_args)
            thread.daemon = True
            thread.start()

        # wait for them to all sleep
        for _ in range(6):
            sleeping.acquire()

        # check no process/thread has woken up
        time.sleep(DELTA)
        self.assertReturnsIfImplemented(0, get_value, woken)

        # wake them all up
        cond.acquire()
        cond.notify_all()
        cond.release()

        # check they have all woken
        time.sleep(DELTA)
        self.assertReturnsIfImplemented(6, get_value, woken)

        # check state is not mucked up
        self.check_invariant(cond)

    def test_timeout(self):
        cond = self.Condition()
        timed_wait = TimingWrapper(cond.wait)
        cond.acquire()
        outcome = timed_wait(TIMEOUT1)
        cond.release()
        # nobody notifies, so the wait is expected to report a timeout
        self.assertEqual(outcome, False)
        self.assertTimingAlmostEqual(timed_wait.elapsed, TIMEOUT1)
774
775
class _TestEvent(BaseTestCase):
    """Tests for Event: is_set(), set(), clear() and wait()."""

    @classmethod
    def _test_event(cls, event):
        # Child-side body for test_event: set the event after a short delay.
        time.sleep(TIMEOUT2)
        event.set()

    def test_event(self):
        event = self.Event()
        wait = TimingWrapper(event.wait)

        # A new event starts out unset.
        self.assertEqual(event.is_set(), False)

        # While the event is unset, wait() with a timeout returns False
        # once the timeout has expired.
        self.assertEqual(wait(0.0), False)
        self.assertTimingAlmostEqual(wait.elapsed, 0.0)
        self.assertEqual(wait(TIMEOUT1), False)
        self.assertTimingAlmostEqual(wait.elapsed, TIMEOUT1)

        event.set()

        # Once the event is set, wait() returns True without blocking.
        self.assertEqual(event.is_set(), True)
        self.assertEqual(wait(), True)
        self.assertTimingAlmostEqual(wait.elapsed, 0.0)
        self.assertEqual(wait(TIMEOUT1), True)
        self.assertTimingAlmostEqual(wait.elapsed, 0.0)
        # self.assertEqual(event.is_set(), True)

        event.clear()

        #self.assertEqual(event.is_set(), False)

        # Keep a reference to the helper so it can be joined rather than
        # left running after the test; daemon=True matches the other tests
        # in this file.
        p = self.Process(target=self._test_event, args=(event,))
        p.daemon = True
        p.start()
        self.assertEqual(wait(), True)
        p.join()
Benjamin Petersone711caf2008-06-11 16:44:04 +0000814
815#
816#
817#
818
class _TestValue(BaseTestCase):
    """Tests for shared Value and RawValue objects."""

    ALLOWED_TYPES = ('processes',)

    # (typecode, initial value, value written by the child process)
    codes_values = [
        ('i', 4343, 24234),
        ('d', 3.625, -4.25),
        ('h', -232, 234),
        ('c', latin('x'), latin('y'))
        ]

    def setUp(self):
        if not HAS_SHAREDCTYPES:
            self.skipTest("requires multiprocessing.sharedctypes")

    @classmethod
    def _test(cls, values):
        # Child-side body for test_value: overwrite each shared value with
        # the third element of its codes_values entry.
        for shared, entry in zip(values, cls.codes_values):
            shared.value = entry[2]

    def test_value(self, raw=False):
        factory = self.RawValue if raw else self.Value
        values = [factory(code, initial)
                  for code, initial, _ in self.codes_values]

        # every object starts with its initial value
        for shared, entry in zip(values, self.codes_values):
            self.assertEqual(shared.value, entry[1])

        child = self.Process(target=self._test, args=(values,))
        child.start()
        child.join()

        # the child's writes must be visible in this process
        for shared, entry in zip(values, self.codes_values):
            self.assertEqual(shared.value, entry[2])

    def test_rawvalue(self):
        self.test_value(raw=True)

    def test_getobj_getlock(self):
        # default locking: get_lock() and get_obj() must be callable
        default_val = self.Value('i', 5)
        default_lock = default_val.get_lock()
        default_obj = default_val.get_obj()

        # lock=None: get_lock() and get_obj() must be callable as well
        none_val = self.Value('i', 5, lock=None)
        none_lock = none_val.get_lock()
        none_obj = none_val.get_obj()

        # an explicitly supplied lock is the one handed back
        lock = self.Lock()
        locked_val = self.Value('i', 5, lock=lock)
        locked_lock = locked_val.get_lock()
        locked_obj = locked_val.get_obj()
        self.assertEqual(lock, locked_lock)

        # lock=False gives an object without the lock accessors
        unlocked_val = self.Value('i', 5, lock=False)
        for accessor in ('get_lock', 'get_obj'):
            self.assertFalse(hasattr(unlocked_val, accessor))

        # anything that is not a lock is rejected
        self.assertRaises(AttributeError, self.Value, 'i', 5, lock='navalue')

        # raw values have no lock accessors either
        raw_val = self.RawValue('i', 5)
        for accessor in ('get_lock', 'get_obj'):
            self.assertFalse(hasattr(raw_val, accessor))
885
Benjamin Petersone711caf2008-06-11 16:44:04 +0000886
class _TestArray(BaseTestCase):
    """Tests for shared Array and RawArray objects."""

    ALLOWED_TYPES = ('processes',)

    @classmethod
    def f(cls, seq):
        # Replace seq in place by its running (cumulative) sums.
        for index in range(1, len(seq)):
            seq[index] += seq[index - 1]

    @unittest.skipIf(c_int is None, "requires _ctypes")
    def test_array(self, raw=False):
        seq = [680, 626, 934, 821, 150, 233, 548, 982, 714, 831]
        factory = self.RawArray if raw else self.Array
        arr = factory('i', seq)

        self.assertEqual(len(arr), len(seq))
        self.assertEqual(arr[3], seq[3])
        self.assertEqual(list(arr[2:7]), list(seq[2:7]))

        # slice assignment from an array.array, mirrored in the plain list
        patch = array.array('i', [1, 2, 3, 4])
        arr[4:8] = patch
        seq[4:8] = patch

        self.assertEqual(list(arr[:]), seq)

        # apply f() to the list here and to the shared array in a child;
        # both must end up with the same contents
        self.f(seq)

        child = self.Process(target=self.f, args=(arr,))
        child.start()
        child.join()

        self.assertEqual(list(arr[:]), seq)

    @unittest.skipIf(c_int is None, "requires _ctypes")
    def test_array_from_size(self):
        size = 10
        # Test for zeroing (see issue #11675).
        # The repetition below strengthens the test by increasing the chances
        # of previously allocated non-zero memory being used for the new array
        # on the 2nd and 3rd loops.
        for _ in range(3):
            arr = self.Array('i', size)
            self.assertEqual(len(arr), size)
            zeros = [0] * size
            self.assertEqual(list(arr), zeros)
            arr[:] = range(10)
            self.assertEqual(list(arr), list(range(10)))
            del arr

    @unittest.skipIf(c_int is None, "requires _ctypes")
    def test_rawarray(self):
        self.test_array(raw=True)

    @unittest.skipIf(c_int is None, "requires _ctypes")
    def test_getobj_getlock_obj(self):
        # default locking: get_lock() and get_obj() must be callable
        default_arr = self.Array('i', list(range(10)))
        default_lock = default_arr.get_lock()
        default_obj = default_arr.get_obj()

        # lock=None: get_lock() and get_obj() must be callable as well
        none_arr = self.Array('i', list(range(10)), lock=None)
        none_lock = none_arr.get_lock()
        none_obj = none_arr.get_obj()

        # an explicitly supplied lock is the one handed back
        lock = self.Lock()
        locked_arr = self.Array('i', list(range(10)), lock=lock)
        locked_lock = locked_arr.get_lock()
        locked_obj = locked_arr.get_obj()
        self.assertEqual(lock, locked_lock)

        # lock=False gives an object without the lock accessors
        unlocked_arr = self.Array('i', range(10), lock=False)
        for accessor in ('get_lock', 'get_obj'):
            self.assertFalse(hasattr(unlocked_arr, accessor))
        # anything that is not a lock is rejected
        self.assertRaises(AttributeError,
                          self.Array, 'i', range(10), lock='notalock')

        # raw arrays have no lock accessors either
        raw_arr = self.RawArray('i', range(10))
        for accessor in ('get_lock', 'get_obj'):
            self.assertFalse(hasattr(raw_arr, accessor))
Benjamin Petersone711caf2008-06-11 16:44:04 +0000964
965#
966#
967#
968
class _TestContainers(BaseTestCase):
    # Tests for the list, dict and Namespace factories supplied by the test
    # mixin.  Only generated for the 'manager' flavour.

    ALLOWED_TYPES = ('manager',)

    def test_list(self):
        first = self.list(list(range(10)))
        self.assertEqual(first[:], list(range(10)))

        second = self.list()
        self.assertEqual(second[:], [])

        second.extend(list(range(5)))
        self.assertEqual(second[:], list(range(5)))

        self.assertEqual(second[2], 2)
        self.assertEqual(second[2:10], [2,3,4])

        second *= 2
        doubled = [0, 1, 2, 3, 4, 0, 1, 2, 3, 4]
        self.assertEqual(second[:], doubled)

        self.assertEqual(second + [5, 6], doubled + [5, 6])

        # None of the operations on `second` may have touched `first`.
        self.assertEqual(first[:], list(range(10)))

        # A shared list built from two shared lists.
        nested = self.list([first, second])
        self.assertEqual(
            nested[:],
            [[0, 1, 2, 3, 4, 5, 6, 7, 8, 9], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4]]
            )

        # A later change to `first` must be visible through the wrapper.
        wrapper = self.list([first])
        first.append('hello')
        self.assertEqual(wrapper[:], [[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 'hello']])

    def test_dict(self):
        shared = self.dict()
        codes = list(range(65, 70))
        for code in codes:
            shared[code] = chr(code)
        letters = [chr(code) for code in codes]
        self.assertEqual(shared.copy(), {code: chr(code) for code in codes})
        self.assertEqual(sorted(shared.keys()), codes)
        self.assertEqual(sorted(shared.values()), letters)
        self.assertEqual(sorted(shared.items()), list(zip(codes, letters)))

    def test_namespace(self):
        ns = self.Namespace()
        ns.name = 'Bob'
        ns.job = 'Builder'
        ns._hidden = 'hidden'
        self.assertEqual((ns.name, ns.job), ('Bob', 'Builder'))
        del ns.job
        # The expected string shows neither the deleted attribute nor the
        # underscore-prefixed one.
        self.assertEqual(str(ns), "Namespace(name='Bob')")
        self.assertTrue(hasattr(ns, 'name'))
        self.assertFalse(hasattr(ns, 'job'))
1024
1025#
1026#
1027#
1028
def sqr(x, wait=0.0):
    """Sleep for `wait` seconds, then return `x` multiplied by itself.

    Kept at module level so it can be handed to pool workers by the tests.
    """
    time.sleep(wait)
    squared = x * x
    return squared
Ask Solem2afcbf22010-11-09 20:55:52 +00001032
class _TestPool(BaseTestCase):
    # Tests for the pool object exposed as self.pool.

    def test_apply(self):
        apply_sync = self.pool.apply
        self.assertEqual(apply_sync(sqr, (5,)), sqr(5))
        self.assertEqual(apply_sync(sqr, (), {'x':3}), sqr(x=3))

    def test_map(self):
        run_map = self.pool.map
        self.assertEqual(run_map(sqr, list(range(10))),
                         [sqr(n) for n in range(10)])
        self.assertEqual(run_map(sqr, list(range(100)), chunksize=20),
                         [sqr(n) for n in range(100)])

    def test_map_chunksize(self):
        # Mapping over an empty list with an explicit chunksize must finish
        # within TIMEOUT1 rather than stall.
        pending = self.pool.map_async(sqr, [], chunksize=1)
        try:
            pending.get(timeout=TIMEOUT1)
        except multiprocessing.TimeoutError:
            self.fail("pool.map_async with chunksize stalled on null list")

    def test_async(self):
        # The task sleeps TIMEOUT1 seconds, so get() should take about that.
        pending = self.pool.apply_async(sqr, (7, TIMEOUT1,))
        timed_get = TimingWrapper(pending.get)
        self.assertEqual(timed_get(), 49)
        self.assertTimingAlmostEqual(timed_get.elapsed, TIMEOUT1)

    def test_async_timeout(self):
        # The task sleeps longer than the timeout passed to get().
        pending = self.pool.apply_async(sqr, (6, TIMEOUT2 + 0.2))
        timed_get = TimingWrapper(pending.get)
        self.assertRaises(multiprocessing.TimeoutError,
                          timed_get, timeout=TIMEOUT2)
        self.assertTimingAlmostEqual(timed_get.elapsed, TIMEOUT2)

    def test_imap(self):
        results = self.pool.imap(sqr, list(range(10)))
        self.assertEqual(list(results), [sqr(n) for n in range(10)])

        # Step through manually and check exhaustion, with and without an
        # explicit chunksize.
        results = self.pool.imap(sqr, list(range(10)))
        for n in range(10):
            self.assertEqual(next(results), n*n)
        self.assertRaises(StopIteration, results.__next__)

        results = self.pool.imap(sqr, list(range(1000)), chunksize=100)
        for n in range(1000):
            self.assertEqual(next(results), n*n)
        self.assertRaises(StopIteration, results.__next__)

    def test_imap_unordered(self):
        expected = [sqr(n) for n in range(1000)]

        results = self.pool.imap_unordered(sqr, list(range(1000)))
        self.assertEqual(sorted(results), expected)

        results = self.pool.imap_unordered(sqr, list(range(1000)),
                                           chunksize=53)
        self.assertEqual(sorted(results), expected)

    def test_make_pool(self):
        pool = multiprocessing.Pool(3)
        self.assertEqual(3, len(pool._pool))
        pool.close()
        pool.join()

    def test_terminate(self):
        if self.TYPE == 'manager':
            # On Unix a forked process increfs each shared object to
            # which its parent process held a reference.  If the
            # forked process gets terminated then there is likely to
            # be a reference leak.  So to prevent
            # _TestZZZNumberOfObjects from failing we skip this test
            # when using a manager.
            return

        # Queue far more work than can finish, then terminate at once.
        self.pool.map_async(time.sleep, [0.1] * 10000, chunksize=1)
        self.pool.terminate()
        timed_join = TimingWrapper(self.pool.join)
        timed_join()
        self.assertLess(timed_join.elapsed, 0.5)
Jesse Noller1f0b6582010-01-27 03:36:01 +00001108
def raising():
    """Always raise KeyError("key"); used as a failing pool task."""
    error = KeyError("key")
    raise error
Jesse Noller1f0b6582010-01-27 03:36:01 +00001111
def unpickleable_result():
    """Return a locally defined callable that yields 42.

    The returned object is created inside this function, so it cannot be
    pickled by reference.
    """
    def _answer():
        return 42
    return _answer
1114
class _TestPoolWorkerErrors(BaseTestCase):
    # Tests for how a pool reports errors raised by, or while returning the
    # result of, a task.
    ALLOWED_TYPES = ('processes', )

    def test_async_error_callback(self):
        pool = multiprocessing.Pool(2)

        # One-slot box filled in by the error callback.
        seen = [None]
        def remember(exc):
            seen[0] = exc

        pending = pool.apply_async(raising, error_callback=remember)
        self.assertRaises(KeyError, pending.get)
        self.assertTrue(seen[0])
        self.assertIsInstance(seen[0], KeyError)

        pool.close()
        pool.join()

    def test_unpickleable_result(self):
        from multiprocessing.pool import MaybeEncodingError
        pool = multiprocessing.Pool(2)

        # Make sure we don't lose pool processes because of encoding errors.
        for _iteration in range(20):

            seen = [None]
            def remember(exc):
                seen[0] = exc

            pending = pool.apply_async(unpickleable_result,
                                       error_callback=remember)
            self.assertRaises(MaybeEncodingError, pending.get)
            wrapped = seen[0]
            self.assertTrue(wrapped)
            self.assertIsInstance(seen[0], MaybeEncodingError)
            self.assertIsNotNone(wrapped.exc)
            self.assertIsNotNone(wrapped.value)

        pool.close()
        pool.join()
1154
class _TestPoolWorkerLifetime(BaseTestCase):
    # Checks that workers of a pool created with maxtasksperchild get
    # replaced once enough tasks have been run.
    ALLOWED_TYPES = ('processes', )

    def test_pool_worker_lifetime(self):
        pool = multiprocessing.Pool(3, maxtasksperchild=10)
        self.assertEqual(3, len(pool._pool))
        pids_before = [worker.pid for worker in pool._pool]
        # Run many tasks so each worker gets replaced (hopefully)
        pending = [pool.apply_async(sqr, (n, )) for n in range(100)]
        # Fetch the results and verify we got the right answers,
        # also ensuring all the tasks have completed.
        for n, result in enumerate(pending):
            self.assertEqual(result.get(), sqr(n))
        # Refill the pool
        pool._repopulate_pool()
        # Wait until all workers are alive (at most five DELTA-long naps)
        for _attempt in range(5):
            if all(worker.is_alive() for worker in pool._pool):
                break
            time.sleep(DELTA)
        pids_after = [worker.pid for worker in pool._pool]
        # All pids should be assigned.  See issue #7805.
        self.assertNotIn(None, pids_before)
        self.assertNotIn(None, pids_after)
        # Finally, check that the worker pids have changed
        self.assertNotEqual(sorted(pids_before), sorted(pids_after))
        pool.close()
        pool.join()
1185
Benjamin Petersone711caf2008-06-11 16:44:04 +00001186#
1187# Test that manager has expected number of shared objects left
1188#
1189
class _TestZZZNumberOfObjects(BaseTestCase):
    # Because test cases are sorted alphabetically, this one will get
    # run after all the other tests for the manager.  It tests that
    # there have been no "reference leaks" for the manager's shared
    # objects.  Note the comment in _TestPool.test_terminate().
    ALLOWED_TYPES = ('manager',)

    def test_number_of_objects(self):
        EXPECTED_NUMBER = 1                # the pool object is still alive
        multiprocessing.active_children()  # discard dead process objs
        gc.collect()                       # do garbage collection
        refs = self.manager._number_of_objects()
        # Snapshot the debug info right after the count so that what gets
        # printed on failure matches the state that was measured.
        debug_info = self.manager._debug_info()
        if refs != EXPECTED_NUMBER:
            # Print the snapshot once (it used to be dumped twice: one fresh
            # query plus this snapshot).
            print(debug_info)

        self.assertEqual(refs, EXPECTED_NUMBER)
1208
1209#
1210# Test of creating a customized manager class
1211#
1212
1213from multiprocessing.managers import BaseManager, BaseProxy, RemoteError
1214
class FooBar(object):
    """Small class with two public methods and one underscore-prefixed one."""

    def f(self):
        """Return the string 'f()'."""
        return 'f()'

    def g(self):
        """Always raise ValueError."""
        raise ValueError()

    def _h(self):
        """Return the string '_h()'."""
        return '_h()'
1222
def baz():
    """Generate the squares 0, 1, 4, ..., 81 of the integers 0 through 9."""
    n = 0
    while n < 10:
        yield n * n
        n += 1
1226
class IteratorProxy(BaseProxy):
    """Proxy that implements the iterator protocol by forwarding __next__."""

    _exposed_ = ('__next__',)

    def __next__(self):
        # Delegate to the proxied object's '__next__' via _callmethod.
        return self._callmethod('__next__')

    def __iter__(self):
        # The proxy is its own iterator.
        return self
1233
class MyManager(BaseManager):
    """Customized manager class; its types are registered just below."""


# 'Foo' relies on the default exposure rules, 'Bar' names its exposed
# methods explicitly, and 'baz' is served through IteratorProxy.
MyManager.register('Foo', callable=FooBar)
MyManager.register('Bar', callable=FooBar, exposed=('f', '_h'))
MyManager.register('baz', callable=baz, proxytype=IteratorProxy)
1240
1241
class _TestMyManager(BaseTestCase):
    # End-to-end test of the customized MyManager class.

    ALLOWED_TYPES = ('manager',)

    def test_mymanager(self):
        manager = MyManager()
        manager.start()

        foo = manager.Foo()
        bar = manager.Bar()
        squares = manager.baz()

        # Which of the candidate method names does each proxy expose?
        candidates = ('f', 'g', '_h')
        def available(proxy):
            return [name for name in candidates if hasattr(proxy, name)]

        foo_methods = available(foo)
        bar_methods = available(bar)

        self.assertEqual(foo_methods, ['f', 'g'])
        self.assertEqual(bar_methods, ['f', '_h'])

        # Foo: '_h' is not exposed, so calling it by name must fail remotely.
        self.assertEqual(foo.f(), 'f()')
        self.assertRaises(ValueError, foo.g)
        self.assertEqual(foo._callmethod('f'), 'f()')
        self.assertRaises(RemoteError, foo._callmethod, '_h')

        # Bar: both explicitly exposed methods work either way.
        self.assertEqual(bar.f(), 'f()')
        self.assertEqual(bar._h(), '_h()')
        self.assertEqual(bar._callmethod('f'), 'f()')
        self.assertEqual(bar._callmethod('_h'), '_h()')

        self.assertEqual(list(squares), [n*n for n in range(10)])

        manager.shutdown()
1273
1274#
1275# Test of connecting to a remote server and using xmlrpclib for serialization
1276#
1277
# Single module-level queue instance returned by get_queue().
_queue = pyqueue.Queue()

def get_queue():
    """Return the module-level queue (the same object on every call)."""
    return _queue
1281
class QueueManager(BaseManager):
    '''manager class used by server process'''

# Register 'get_queue' together with the callable that provides the queue.
QueueManager.register('get_queue', callable=get_queue)
1285
class QueueManager2(BaseManager):
    '''manager class which specifies the same interface as QueueManager'''

# Only the name is registered here; no callable is attached.
QueueManager2.register('get_queue')
1289
1290
# Serializer name passed to every manager created by the remote-manager and
# manager-restart tests below.
SERIALIZER = 'xmlrpclib'
1292
class _TestRemoteManager(BaseTestCase):
    # Connects to a manager server from a second manager object and from a
    # child process, using SERIALIZER rather than the default serializer.

    ALLOWED_TYPES = ('manager',)

    @classmethod
    def _putter(cls, address, authkey):
        # Runs in the child: connect to the server and put one tuple on the
        # shared queue.
        manager = QueueManager2(
            address=address, authkey=authkey, serializer=SERIALIZER
            )
        manager.connect()
        queue = manager.get_queue()
        queue.put(('hello world', None, True, 2.25))

    def test_remote(self):
        authkey = os.urandom(32)

        manager = QueueManager(
            address=('localhost', 0), authkey=authkey, serializer=SERIALIZER
            )
        manager.start()

        p = self.Process(target=self._putter, args=(manager.address, authkey))
        p.start()

        manager2 = QueueManager2(
            address=manager.address, authkey=authkey, serializer=SERIALIZER
            )
        manager2.connect()
        queue = manager2.get_queue()

        received = queue.get()
        # The child has delivered its only item by now; reap it so the test
        # does not leave an un-joined process behind.
        p.join()

        # Note that xmlrpclib will deserialize object as a list not a tuple
        self.assertEqual(received, ['hello world', None, True, 2.25])

        # Because we are using xmlrpclib for serialization instead of
        # pickle this will cause a serialization error.
        self.assertRaises(Exception, queue.put, time.sleep)

        # Make queue finalizer run before the server is stopped
        del queue
        manager.shutdown()
1333
class _TestManagerRestart(BaseTestCase):
    # Checks that a manager can be shut down and a new one started again
    # straight away on the very same address.

    @classmethod
    def _putter(cls, address, authkey):
        # Runs in the child: connect to the server and put one string on the
        # shared queue.
        manager = QueueManager(
            address=address, authkey=authkey, serializer=SERIALIZER)
        manager.connect()
        queue = manager.get_queue()
        queue.put('hello world')

    def test_rapid_restart(self):
        authkey = os.urandom(32)
        manager = QueueManager(
            address=('localhost', 0), authkey=authkey, serializer=SERIALIZER)
        # get_server() is used only to learn a concrete address that can be
        # reused for the second manager further down.
        srvr = manager.get_server()
        addr = srvr.address
        # Close the connection.Listener socket which gets opened as a part
        # of manager.get_server(). It's not needed for the test.
        srvr.listener.close()
        manager.start()

        p = self.Process(target=self._putter, args=(manager.address, authkey))
        p.start()
        queue = manager.get_queue()
        self.assertEqual(queue.get(), 'hello world')
        # The child has delivered its only item; reap it before the server
        # goes away so no un-joined process is left behind.
        p.join()
        del queue
        manager.shutdown()

        manager = QueueManager(
            address=addr, authkey=authkey, serializer=SERIALIZER)
        try:
            manager.start()
        except IOError as e:
            if e.errno != errno.EADDRINUSE:
                raise
            # Retry after some time, in case the old socket was lingering
            # (sporadic failure on buildbots)
            time.sleep(1.0)
            manager = QueueManager(
                address=addr, authkey=authkey, serializer=SERIALIZER)
            # The fresh manager has to be started as well: otherwise the
            # restart is never exercised and shutdown() below would be
            # called on a manager that was never started.
            manager.start()
        manager.shutdown()
Jesse Nollerc5d28a02009-03-30 16:37:36 +00001374
Benjamin Petersone711caf2008-06-11 16:44:04 +00001375#
1376#
1377#
1378
# Empty message (built with latin()) that tells _TestConnection._echo to stop.
SENTINEL = latin('')
1380
class _TestConnection(BaseTestCase):
    # Tests for the connection objects returned by self.Pipe().

    ALLOWED_TYPES = ('processes', 'threads')

    @classmethod
    def _echo(cls, conn):
        # Send every received byte message straight back until SENTINEL
        # arrives, then close the connection.
        while True:
            data = conn.recv_bytes()
            if SENTINEL == data:
                break
            conn.send_bytes(data)
        conn.close()

    def test_connection(self):
        conn, child_conn = self.Pipe()

        echo_proc = self.Process(target=self._echo, args=(child_conn,))
        echo_proc.daemon = True
        echo_proc.start()

        seq = [1, 2.25, None]
        msg = latin('hello world')
        longmsg = msg * 10
        arr = array.array('i', list(range(4)))

        is_processes = self.TYPE == 'processes'

        if is_processes:
            self.assertEqual(type(conn.fileno()), int)

        # Objects and raw bytes must both come back unchanged.
        self.assertEqual(conn.send(seq), None)
        self.assertEqual(conn.recv(), seq)

        self.assertEqual(conn.send_bytes(msg), None)
        self.assertEqual(conn.recv_bytes(), msg)

        if is_processes:
            # Receive into a zeroed 10-item buffer, first at the start and
            # then three items in; the rest of the buffer must stay zero.
            for skip in (0, 3):
                buffer = array.array('i', [0]*10)
                expected = ([0] * skip + list(arr) +
                            [0] * (10 - skip - len(arr)))
                self.assertEqual(conn.send_bytes(arr), None)
                self.assertEqual(
                    conn.recv_bytes_into(buffer, skip * buffer.itemsize),
                    len(arr) * buffer.itemsize)
                self.assertEqual(list(buffer), expected)

            # A 40-byte buffer cannot hold longmsg: the complete message is
            # expected to travel in the exception's args instead.
            buffer = bytearray(latin(' ' * 40))
            self.assertEqual(conn.send_bytes(longmsg), None)
            try:
                res = conn.recv_bytes_into(buffer)
            except multiprocessing.BufferTooShort as e:
                self.assertEqual(e.args, (longmsg,))
            else:
                self.fail('expected BufferTooShort, got %s' % res)

        poll = TimingWrapper(conn.poll)

        # Nothing is pending: poll() returns False at once, poll(TIMEOUT1)
        # returns False after waiting.
        self.assertEqual(poll(), False)
        self.assertTimingAlmostEqual(poll.elapsed, 0)

        self.assertEqual(poll(TIMEOUT1), False)
        self.assertTimingAlmostEqual(poll.elapsed, TIMEOUT1)

        conn.send(None)

        # Now data is pending, so poll(TIMEOUT1) returns True without waiting.
        self.assertEqual(poll(TIMEOUT1), True)
        self.assertTimingAlmostEqual(poll.elapsed, 0)

        self.assertEqual(conn.recv(), None)

        really_big_msg = latin('X') * (1024 * 1024 * 16)   # 16Mb
        conn.send_bytes(really_big_msg)
        self.assertEqual(conn.recv_bytes(), really_big_msg)

        conn.send_bytes(SENTINEL)                          # tell child to quit
        child_conn.close()

        if is_processes:
            self.assertEqual(conn.readable, True)
            self.assertEqual(conn.writable, True)
            self.assertRaises(EOFError, conn.recv)
            self.assertRaises(EOFError, conn.recv_bytes)

        echo_proc.join()

    def test_duplex_false(self):
        reader, writer = self.Pipe(duplex=False)
        self.assertEqual(writer.send(1), None)
        self.assertEqual(reader.recv(), 1)
        if self.TYPE == 'processes':
            # Each end reports, and enforces, a single direction.
            self.assertEqual(reader.readable, True)
            self.assertEqual(reader.writable, False)
            self.assertEqual(writer.readable, False)
            self.assertEqual(writer.writable, True)
            self.assertRaises(IOError, reader.send, 2)
            self.assertRaises(IOError, writer.recv)
            self.assertRaises(IOError, writer.poll)

    def test_spawn_close(self):
        # We test that a pipe connection can be closed by parent
        # process immediately after child is spawned.  On Windows this
        # would have sometimes failed on old versions because
        # child_conn would be closed before the child got a chance to
        # duplicate it.
        conn, child_conn = self.Pipe()

        echo_proc = self.Process(target=self._echo, args=(child_conn,))
        echo_proc.start()
        child_conn.close()    # this might complete before child initializes

        greeting = latin('hello')
        conn.send_bytes(greeting)
        self.assertEqual(conn.recv_bytes(), greeting)

        conn.send_bytes(SENTINEL)
        conn.close()
        echo_proc.join()

    def test_sendbytes(self):
        if self.TYPE != 'processes':
            return

        msg = latin('abcdefghijklmnopqrstuvwxyz')
        a, b = self.Pipe()

        # (extra positional arguments to send_bytes, expected payload)
        accepted = [
            ((), msg),
            ((5,), msg[5:]),
            ((7, 8), msg[7:7+8]),
            ((26,), latin('')),
            ((26, 0), latin('')),
        ]
        for extra, expected in accepted:
            a.send_bytes(msg, *extra)
            self.assertEqual(b.recv_bytes(), expected)

        # Argument combinations that must be rejected with ValueError.
        rejected = [(27,), (22, 5), (26, 1), (-1,), (4, -1)]
        for extra in rejected:
            self.assertRaises(ValueError, a.send_bytes, msg, *extra)
1530
class _TestListenerClient(BaseTestCase):
    # Round trip between a Listener and a Client for every address family
    # listed by self.connection.families.

    ALLOWED_TYPES = ('processes', 'threads')

    @classmethod
    def _test(cls, address):
        # Runs in the child: connect, send a single greeting, disconnect.
        client = cls.connection.Client(address)
        client.send('hello')
        client.close()

    def test_listener_client(self):
        for family in self.connection.families:
            listener = self.connection.Listener(family=family)
            child = self.Process(target=self._test,
                                 args=(listener.address,))
            child.daemon = True
            child.start()
            accepted = listener.accept()
            self.assertEqual(accepted.recv(), 'hello')
            child.join()
            listener.close()
Benjamin Petersone711caf2008-06-11 16:44:04 +00001551#
1552# Test of sending connection and socket objects between processes
1553#
Benjamin Petersonb29cbbc2008-06-16 20:57:14 +00001554"""
Benjamin Petersone711caf2008-06-11 16:44:04 +00001555class _TestPicklingConnections(BaseTestCase):
1556
1557 ALLOWED_TYPES = ('processes',)
1558
1559 def _listener(self, conn, families):
1560 for fam in families:
1561 l = self.connection.Listener(family=fam)
1562 conn.send(l.address)
1563 new_conn = l.accept()
1564 conn.send(new_conn)
1565
1566 if self.TYPE == 'processes':
1567 l = socket.socket()
1568 l.bind(('localhost', 0))
1569 conn.send(l.getsockname())
1570 l.listen(1)
1571 new_conn, addr = l.accept()
1572 conn.send(new_conn)
1573
1574 conn.recv()
1575
1576 def _remote(self, conn):
1577 for (address, msg) in iter(conn.recv, None):
1578 client = self.connection.Client(address)
1579 client.send(msg.upper())
1580 client.close()
1581
1582 if self.TYPE == 'processes':
1583 address, msg = conn.recv()
1584 client = socket.socket()
1585 client.connect(address)
1586 client.sendall(msg.upper())
1587 client.close()
1588
1589 conn.close()
1590
1591 def test_pickling(self):
1592 try:
1593 multiprocessing.allow_connection_pickling()
1594 except ImportError:
1595 return
1596
1597 families = self.connection.families
1598
1599 lconn, lconn0 = self.Pipe()
1600 lp = self.Process(target=self._listener, args=(lconn0, families))
1601 lp.start()
1602 lconn0.close()
1603
1604 rconn, rconn0 = self.Pipe()
1605 rp = self.Process(target=self._remote, args=(rconn0,))
1606 rp.start()
1607 rconn0.close()
1608
1609 for fam in families:
1610 msg = ('This connection uses family %s' % fam).encode('ascii')
1611 address = lconn.recv()
1612 rconn.send((address, msg))
1613 new_conn = lconn.recv()
1614 self.assertEqual(new_conn.recv(), msg.upper())
1615
1616 rconn.send(None)
1617
1618 if self.TYPE == 'processes':
1619 msg = latin('This connection uses a normal socket')
1620 address = lconn.recv()
1621 rconn.send((address, msg))
1622 if hasattr(socket, 'fromfd'):
1623 new_conn = lconn.recv()
1624 self.assertEqual(new_conn.recv(100), msg.upper())
1625 else:
1626 # XXX On Windows with Py2.6 need to backport fromfd()
1627 discard = lconn.recv_bytes()
1628
1629 lconn.send(None)
1630
1631 rconn.close()
1632 lconn.close()
1633
1634 lp.join()
1635 rp.join()
Benjamin Petersonb29cbbc2008-06-16 20:57:14 +00001636"""
Benjamin Petersone711caf2008-06-11 16:44:04 +00001637#
1638#
1639#
1640
class _TestHeap(BaseTestCase):
    # Stress test for the allocator in multiprocessing.heap.

    ALLOWED_TYPES = ('processes',)

    def test_heap(self):
        iterations = 5000
        maxblocks = 50
        blocks = []

        # create and destroy lots of blocks of different sizes
        for _round in range(iterations):
            size = int(random.lognormvariate(0, 1) * 1000)
            blocks.append(multiprocessing.heap.BufferWrapper(size))
            if len(blocks) > maxblocks:
                # Drop a random block (a separate name is used so the loop
                # variable is not overwritten).
                victim = random.randrange(maxblocks)
                del blocks[victim]

        # get the heap object
        heap = multiprocessing.heap.BufferWrapper._heap

        # verify the state of the heap: gather every free and every
        # allocated block as (arena index, start, stop, length, state)
        segments = []
        for free_list in list(heap._len_to_seq.values()):
            for arena, start, stop in free_list:
                segments.append((heap._arenas.index(arena), start, stop,
                                 stop-start, 'free'))
        for arena, start, stop in heap._allocated_blocks:
            segments.append((heap._arenas.index(arena), start, stop,
                             stop-start, 'occupied'))

        segments.sort()

        # Each block must either end exactly where the next one starts, or
        # the next one must open a different arena at offset 0.
        for current, following in zip(segments, segments[1:]):
            (arena, start, stop) = current[:3]
            (narena, nstart, nstop) = following[:3]
            self.assertTrue((arena != narena and nstart == 0) or
                            (stop == nstart))
1681
1682#
1683#
1684#
1685
class _Foo(Structure):
    # Two-field structure used by the shared ctypes tests: an integer 'x'
    # followed by a double 'y'.
    _fields_ = [('x', c_int), ('y', c_double)]
1691
class _TestSharedCTypes(BaseTestCase):
    # Tests for shared ctypes values, arrays and structures.

    ALLOWED_TYPES = ('processes',)

    def setUp(self):
        if not HAS_SHAREDCTYPES:
            self.skipTest("requires multiprocessing.sharedctypes")

    @classmethod
    def _double(cls, x, y, foo, arr, string):
        # Runs in the child: double every shared object in place.
        x.value *= 2
        y.value *= 2
        foo.x *= 2
        foo.y *= 2
        string.value *= 2
        for pos in range(len(arr)):
            arr[pos] *= 2

    def test_sharedctypes(self, lock=False):
        # `lock` is passed through unchanged to every shared object created.
        x = Value('i', 7, lock=lock)
        y = Value(c_double, 1.0/3.0, lock=lock)
        foo = Value(_Foo, 3, 2, lock=lock)
        arr = self.Array('d', list(range(10)), lock=lock)
        string = self.Array('c', 20, lock=lock)
        string.value = latin('hello')

        child = self.Process(target=self._double,
                             args=(x, y, foo, arr, string))
        child.start()
        child.join()

        # Every change made by the child must be visible here.
        self.assertEqual(x.value, 14)
        self.assertAlmostEqual(y.value, 2.0/3.0)
        self.assertEqual(foo.x, 6)
        self.assertAlmostEqual(foo.y, 4.0)
        for pos in range(10):
            self.assertAlmostEqual(arr[pos], pos*2)
        self.assertEqual(string.value, latin('hellohello'))

    def test_synchronize(self):
        # Same scenario with lock=True.
        self.test_sharedctypes(lock=True)

    def test_copy(self):
        original = _Foo(2, 5.0)
        duplicate = copy(original)
        # Changing the original afterwards must not affect the copy.
        original.x = 0
        original.y = 0
        self.assertEqual(duplicate.x, 2)
        self.assertAlmostEqual(duplicate.y, 5.0)
1740
1741#
1742#
1743#
1744
class _TestFinalize(BaseTestCase):
    # Tests for util.Finalize: callbacks fired on deletion, on explicit call
    # and at exit in priority order.

    ALLOWED_TYPES = ('processes',)

    @classmethod
    def _test_finalize(cls, conn):
        # Runs in the child.  Each finalizer reports its label through conn;
        # the parent checks the order in which the labels arrive.
        class Foo(object):
            pass

        obj_a = Foo()
        util.Finalize(obj_a, conn.send, args=('a',))
        del obj_a           # triggers callback for a

        obj_b = Foo()
        finalize_b = util.Finalize(obj_b, conn.send, args=('b',))
        finalize_b()        # triggers callback for b
        finalize_b()        # does nothing because callback has already been called
        del obj_b           # does nothing because callback has already been called

        # No exitpriority is given for this one.
        obj_c = Foo()
        util.Finalize(obj_c, conn.send, args=('c',))

        obj_d10 = Foo()
        util.Finalize(obj_d10, conn.send, args=('d10',), exitpriority=1)

        # Three finalizers sharing exitpriority 0.
        obj_d01 = Foo()
        util.Finalize(obj_d01, conn.send, args=('d01',), exitpriority=0)
        obj_d02 = Foo()
        util.Finalize(obj_d02, conn.send, args=('d02',), exitpriority=0)
        obj_d03 = Foo()
        util.Finalize(obj_d03, conn.send, args=('d03',), exitpriority=0)

        # Finalizers that are not attached to any object.
        util.Finalize(None, conn.send, args=('e',), exitpriority=-10)

        util.Finalize(None, conn.send, args=('STOP',), exitpriority=-100)

        # call multiprocessing's cleanup function then exit process without
        # garbage collecting locals
        util._exit_function()
        conn.close()
        os._exit(0)

    def test_finalize(self):
        conn, child_conn = self.Pipe()

        child = self.Process(target=self._test_finalize, args=(child_conn,))
        child.start()
        child.join()

        # Read labels until the 'STOP' marker; 'c' is expected to be absent.
        labels = list(iter(conn.recv, 'STOP'))
        self.assertEqual(labels, ['a', 'b', 'd10', 'd03', 'd02', 'd01', 'e'])
1796
1797#
1798# Test that from ... import * works for each module
1799#
1800
class _TestImportStar(BaseTestCase):
    # Checks that every name listed in a module's __all__ really exists on
    # that module.

    ALLOWED_TYPES = ('processes',)

    def test_import(self):
        module_names = [
            'multiprocessing', 'multiprocessing.connection',
            'multiprocessing.heap', 'multiprocessing.managers',
            'multiprocessing.pool', 'multiprocessing.process',
            'multiprocessing.reduction',
            'multiprocessing.synchronize', 'multiprocessing.util'
            ]

        if c_int is not None:
            # This module requires _ctypes
            module_names.append('multiprocessing.sharedctypes')

        for module_name in module_names:
            __import__(module_name)
            module = sys.modules[module_name]

            # Modules without __all__ simply contribute nothing to check.
            exported = getattr(module, '__all__', ())
            for attr in exported:
                self.assertTrue(
                    hasattr(module, attr),
                    '%r does not have attribute %r' % (module, attr)
                    )
1827
1828#
1829# Quick test that logging works -- does not test logging output
1830#
1831
class _TestLogging(BaseTestCase):
    # Quick checks of the multiprocessing logger; log output itself is not
    # inspected.

    ALLOWED_TYPES = ('processes',)

    def test_enable_logging(self):
        logger = multiprocessing.get_logger()
        logger.setLevel(util.SUBWARNING)
        self.assertTrue(logger is not None)
        logger.debug('this will not be printed')
        logger.info('nor will this')
        logger.setLevel(LOG_LEVEL)

    @classmethod
    def _test_level(cls, conn):
        # Runs in the child: report the logger's effective level.
        logger = multiprocessing.get_logger()
        conn.send(logger.getEffectiveLevel())

    def test_level(self):
        LEVEL1 = 32
        LEVEL2 = 37

        logger = multiprocessing.get_logger()
        root_logger = logging.getLogger()
        root_level = root_logger.level

        reader, writer = multiprocessing.Pipe(duplex=False)
        try:
            # A level set directly on the multiprocessing logger must be
            # what the child reports.
            logger.setLevel(LEVEL1)
            p = self.Process(target=self._test_level, args=(writer,))
            p.start()
            level = reader.recv()
            p.join()    # reap the child instead of leaving it un-joined
            self.assertEqual(LEVEL1, level)

            # With the multiprocessing logger reset to NOTSET, the child
            # must report the root logger's level instead.
            logger.setLevel(logging.NOTSET)
            root_logger.setLevel(LEVEL2)
            p = self.Process(target=self._test_level, args=(writer,))
            p.start()
            level = reader.recv()
            p.join()
            self.assertEqual(LEVEL2, level)
        finally:
            # Restore the levels and release the pipe even when an
            # assertion above failed, so later tests are unaffected.
            root_logger.setLevel(root_level)
            logger.setLevel(level=LOG_LEVEL)
            reader.close()
            writer.close()
1870
Jesse Nollerb9a49b72009-11-21 18:09:38 +00001871
Jesse Nollerf4ae35f2009-11-24 14:22:24 +00001872# class _TestLoggingProcessName(BaseTestCase):
1873#
1874# def handle(self, record):
1875# assert record.processName == multiprocessing.current_process().name
1876# self.__handled = True
1877#
1878# def test_logging(self):
1879# handler = logging.Handler()
1880# handler.handle = self.handle
1881# self.__handled = False
1882# # Bypass getLogger() and side-effects
1883# logger = logging.getLoggerClass()(
1884# 'multiprocessing.test.TestLoggingProcessName')
1885# logger.addHandler(handler)
1886# logger.propagate = False
1887#
1888# logger.warn('foo')
1889# assert self.__handled
Jesse Nollerb9a49b72009-11-21 18:09:38 +00001890
Benjamin Petersone711caf2008-06-11 16:44:04 +00001891#
Jesse Noller6214edd2009-01-19 16:23:53 +00001892# Test to verify handle verification, see issue 3321
1893#
1894
class TestInvalidHandle(unittest.TestCase):
    """Connection objects must reject handles that are not valid (issue 3321)."""

    @unittest.skipIf(WIN32, "skipped on Windows")
    def test_invalid_handles(self):
        # NOTE(review): 44977608 looks like an arbitrary number chosen so
        # that it is not an open descriptor -- confirm.
        bogus = _multiprocessing.Connection(44977608)
        with self.assertRaises(IOError):
            bogus.poll()
        # A negative handle has to be refused at construction time.
        with self.assertRaises(IOError):
            _multiprocessing.Connection(-1)
Florent Xiclunafd1b0932010-03-28 00:25:02 +00001902
Jesse Noller6214edd2009-01-19 16:23:53 +00001903#
Benjamin Petersone711caf2008-06-11 16:44:04 +00001904# Functions used to create test cases from the base ones in this module
1905#
1906
def get_attributes(Source, names):
    """Return a dict mapping each entry of *names* to ``getattr(Source, name)``.

    Values that are plain Python functions are wrapped in ``staticmethod``
    so that, once the dict is merged into a class namespace, accessing
    them through an instance does not bind ``self``.  Every other value is
    stored unchanged.

    Raises AttributeError if *Source* lacks one of the requested names.
    """
    function_type = type(get_attributes)
    attributes = {}
    for name in names:
        value = getattr(Source, name)
        if isinstance(value, function_type):
            value = staticmethod(value)
        attributes[name] = value
    return attributes
1915
def create_test_cases(Mixin, type):
    """Build concrete TestCase classes for one flavour of the tests.

    Every module global whose name starts with ``_Test`` and whose
    ``ALLOWED_TYPES`` contains *type* is combined with
    ``unittest.TestCase`` and *Mixin*.  The generated class is named
    ``With<Type><name without the leading underscore>`` and reports
    *Mixin*'s module as its own.

    Returns a dict mapping the generated names to the generated classes.
    """
    namespace = globals()
    prefix = 'With' + type.capitalize()
    generated = {}

    # Iterate over a snapshot of the module globals.
    for name, base in list(namespace.items()):
        if not name.startswith('_Test'):
            continue
        if type not in base.ALLOWED_TYPES:
            continue

        class Temp(base, unittest.TestCase, Mixin):
            pass

        newname = prefix + name[1:]
        generated[newname] = Temp
        Temp.__name__ = newname
        Temp.__module__ = Mixin.__module__
    return generated
1932
1933#
1934# Create test cases
1935#
1936
class ProcessesMixin:
    """Mixin that runs the generic tests against the multiprocessing module."""

    TYPE = 'processes'
    Process = multiprocessing.Process
    # Copy the listed multiprocessing attributes straight into the class
    # namespace; get_attributes() wraps plain functions in staticmethod.
    vars().update(get_attributes(multiprocessing, (
        'Queue',
        'Lock', 'RLock',
        'Semaphore', 'BoundedSemaphore',
        'Condition', 'Event',
        'Value', 'Array', 'RawValue', 'RawArray',
        'current_process', 'active_children',
        'Pipe', 'connection',
        'JoinableQueue'
    )))


# Generate the process-flavoured test classes and publish them as module
# globals under their generated names.
testcases_processes = create_test_cases(ProcessesMixin, type='processes')
globals().update(testcases_processes)
1949
1950
class ManagerMixin:
    """Mixin that runs the generic tests against a SyncManager."""

    TYPE = 'manager'
    Process = multiprocessing.Process
    # Allocated without running __init__ so its attributes can be looked
    # up at class-creation time; test_main() calls __init__() and start()
    # on this same object before any test runs.
    manager = object.__new__(multiprocessing.managers.SyncManager)
    vars().update(get_attributes(manager, (
        'Queue',
        'Lock', 'RLock',
        'Semaphore', 'BoundedSemaphore',
        'Condition', 'Event',
        'Value', 'Array',
        'list', 'dict', 'Namespace',
        'JoinableQueue'
    )))


# Generate the manager-flavoured test classes and publish them as module
# globals under their generated names.
testcases_manager = create_test_cases(ManagerMixin, type='manager')
globals().update(testcases_manager)
1963
1964
class ThreadsMixin:
    """Mixin that runs the generic tests against multiprocessing.dummy."""

    TYPE = 'threads'
    Process = multiprocessing.dummy.Process
    # Copy the listed multiprocessing.dummy attributes straight into the
    # class namespace; get_attributes() wraps plain functions in
    # staticmethod.
    vars().update(get_attributes(multiprocessing.dummy, (
        'Queue',
        'Lock', 'RLock',
        'Semaphore', 'BoundedSemaphore',
        'Condition', 'Event',
        'Value', 'Array',
        'current_process', 'active_children',
        'Pipe', 'connection',
        'dict', 'list', 'Namespace',
        'JoinableQueue'
    )))


# Generate the thread-flavoured test classes and publish them as module
# globals under their generated names.
testcases_threads = create_test_cases(ThreadsMixin, type='threads')
globals().update(testcases_threads)
1977
class OtherTest(unittest.TestCase):
    """Authentication-failure paths of the connection challenge helpers."""

    # TODO: add more tests for deliver/answer challenge.
    def test_deliver_challenge_auth_failure(self):
        # The peer answers the challenge with bytes that cannot be a
        # valid digest.
        class BogusPeer(object):
            def recv_bytes(self, size):
                return b'something bogus'

            def send_bytes(self, data):
                pass

        with self.assertRaises(multiprocessing.AuthenticationError):
            multiprocessing.connection.deliver_challenge(BogusPeer(), b'abc')

    def test_answer_challenge_auth_failure(self):
        # First read yields the challenge marker, the second read yields
        # garbage, and any later read yields nothing.
        class BogusPeer(object):
            def __init__(self):
                self.reads = 0

            def recv_bytes(self, size):
                self.reads += 1
                if self.reads == 1:
                    return multiprocessing.connection.CHALLENGE
                return b'something bogus' if self.reads == 2 else b''

            def send_bytes(self, data):
                pass

        with self.assertRaises(multiprocessing.AuthenticationError):
            multiprocessing.connection.answer_challenge(BogusPeer(), b'abc')
2006
Benjamin Petersonf47ed4a2009-04-11 20:45:40 +00002007#
2008# Test Manager.start()/Pool.__init__() initializer feature - see issue 5585
2009#
2010
def initializer(ns):
    """Increment ``ns.test`` by one.

    Passed as the ``initializer`` callable to ``SyncManager.start()`` and
    ``multiprocessing.Pool()`` by TestInitializers, which then checks the
    counter on the shared namespace.
    """
    ns.test += 1
2013
class TestInitializers(unittest.TestCase):
    """Checks the initializer hooks of Manager.start() and Pool() (issue 5585)."""

    def setUp(self):
        # A manager-backed namespace lets the initializer, which runs in
        # another process, report back through ``ns.test``.
        self.mgr = multiprocessing.Manager()
        self.ns = self.mgr.Namespace()
        self.ns.test = 0

    def tearDown(self):
        self.mgr.shutdown()

    def test_manager_initializer(self):
        m = multiprocessing.managers.SyncManager()
        # A non-callable initializer must be rejected.
        self.assertRaises(TypeError, m.start, 1)
        m.start(initializer, (self.ns,))
        try:
            self.assertEqual(self.ns.test, 1)
        finally:
            # Stop the second manager even when the assertion fails so
            # its server process does not outlive the test.
            m.shutdown()

    def test_pool_initializer(self):
        # A non-callable initializer must be rejected.
        self.assertRaises(TypeError, multiprocessing.Pool, initializer=1)
        p = multiprocessing.Pool(1, initializer, (self.ns,))
        p.close()
        p.join()
        # A single worker was requested, so the initializer ran once.
        self.assertEqual(self.ns.test, 1)
2036
Alexandre Vassalottic57a84f2009-07-17 12:07:01 +00002037#
2038# Issue 5155, 5313, 5331: Test process in processes
2039# Verifies os.close(sys.stdin.fileno) vs. sys.stdin.close() behavior
2040#
2041
2042def _ThisSubProcess(q):
2043 try:
2044 item = q.get(block=False)
2045 except pyqueue.Empty:
2046 pass
2047
def _TestProcess(q):
    """Start a nested process that polls a fresh queue, then wait for it.

    *q* is accepted but not used by this function.
    """
    inner_queue = multiprocessing.Queue()
    child = multiprocessing.Process(
        target=_ThisSubProcess,
        args=(inner_queue,),
    )
    child.start()
    child.join()
2053
def _afunc(x):
    """Return *x* multiplied by itself; mapped over a pool by pool_in_process()."""
    return x*x
2056
def pool_in_process():
    """Create a four-worker pool, map _afunc over a few integers, and clean up.

    Used as a Process target to check that a pool can be used from
    inside a child process.  The result of the map is not inspected.
    """
    pool = multiprocessing.Pool(processes=4)
    try:
        pool.map(_afunc, [1, 2, 3, 4, 5, 6, 7])
    finally:
        # Shut the workers down explicitly instead of leaving them to
        # interpreter-exit finalizers.
        pool.close()
        pool.join()
2060
2061class _file_like(object):
2062 def __init__(self, delegate):
2063 self._delegate = delegate
2064 self._pid = None
2065
2066 @property
2067 def cache(self):
2068 pid = os.getpid()
2069 # There are no race conditions since fork keeps only the running thread
2070 if pid != self._pid:
2071 self._pid = pid
2072 self._cache = []
2073 return self._cache
2074
2075 def write(self, data):
2076 self.cache.append(data)
2077
2078 def flush(self):
2079 self._delegate.write(''.join(self.cache))
2080 self._cache = []
2081
class TestStdinBadfiledescriptor(unittest.TestCase):
    """Processes started from within processes (issues 5155, 5313, 5331)."""

    def test_queue_in_process(self):
        # The child itself creates a queue and starts a grandchild.
        queue = multiprocessing.Queue()
        proc = multiprocessing.Process(target=_TestProcess, args=(queue,))
        proc.start()
        proc.join()

    def test_pool_in_process(self):
        # The child creates and uses a worker pool of its own.
        p = multiprocessing.Process(target=pool_in_process)
        p.start()
        p.join()

    def test_flushing(self):
        sio = io.StringIO()
        flike = _file_like(sio)
        flike.write('foo')
        # NOTE(review): this Process is constructed but never started, so
        # only the flush in this process takes effect -- confirm whether
        # the child was meant to run.
        multiprocessing.Process(target=lambda: flike.flush())
        flike.flush()
        # assertEqual instead of a bare assert: the check must survive
        # running under -O.
        self.assertEqual(sio.getvalue(), 'foo')
2102
# Hand-written test classes that are not generated by create_test_cases().
testcases_other = [
    OtherTest,
    TestInvalidHandle,
    TestInitializers,
    TestStdinBadfiledescriptor,
]
Neal Norwitz5d6415e2008-08-25 01:53:32 +00002105
Benjamin Petersone711caf2008-06-11 16:44:04 +00002106#
2107#
2108#
2109
def test_main(run=None):
    """Set up the shared pools and manager, run every test case, tear down.

    *run* is a callable that takes a ``unittest.TestSuite``; when omitted,
    ``test.support.run_unittest`` is used.

    Raises unittest.SkipTest on Linux when an RLock cannot be created
    (issue 3111).
    """
    if sys.platform.startswith("linux"):
        try:
            multiprocessing.RLock()
        except OSError:
            raise unittest.SkipTest("OSError raises on RLock creation, see issue 3111!")

    if run is None:
        from test.support import run_unittest as run

    util.get_temp_dir()     # creates temp directory for use by all processes

    multiprocessing.get_logger().setLevel(LOG_LEVEL)

    # Shared resources used by the generated test classes through their
    # mixin.  ManagerMixin.manager was allocated without __init__ at
    # class-definition time, so it is initialised here before starting.
    ProcessesMixin.pool = multiprocessing.Pool(4)
    ThreadsMixin.pool = multiprocessing.dummy.Pool(4)
    ManagerMixin.manager.__init__()
    ManagerMixin.manager.start()
    ManagerMixin.pool = ManagerMixin.manager.Pool(4)

    try:
        by_name = lambda tc: tc.__name__
        testcases = (
            sorted(testcases_processes.values(), key=by_name) +
            sorted(testcases_threads.values(), key=by_name) +
            sorted(testcases_manager.values(), key=by_name) +
            testcases_other
        )

        loadTestsFromTestCase = unittest.defaultTestLoader.loadTestsFromTestCase
        suite = unittest.TestSuite(loadTestsFromTestCase(tc) for tc in testcases)
        run(suite)
    finally:
        # Tear down even if the runner raised, so that no worker or
        # manager process is left behind.
        ThreadsMixin.pool.terminate()
        ProcessesMixin.pool.terminate()
        ManagerMixin.pool.terminate()
        ManagerMixin.manager.shutdown()

        del ProcessesMixin.pool, ThreadsMixin.pool, ManagerMixin.pool
Benjamin Petersone711caf2008-06-11 16:44:04 +00002147
def main():
    """Run the full suite through a verbose text runner."""
    runner = unittest.TextTestRunner(verbosity=2)
    test_main(runner.run)


if __name__ == '__main__':
    main()