blob: 4e48944f706b149453e26a61b862163d5fb22616 [file] [log] [blame]
Benjamin Peterson90f5ba52010-03-11 22:53:45 +00001#!/usr/bin/env python3
Georg Brandl86b2fb92008-07-16 03:43:04 +00002
Benjamin Petersone711caf2008-06-11 16:44:04 +00003#
4# Unit tests for the multiprocessing package
5#
6
7import unittest
Benjamin Petersone711caf2008-06-11 16:44:04 +00008import queue as pyqueue
9import time
Alexandre Vassalottic57a84f2009-07-17 12:07:01 +000010import io
Benjamin Petersone711caf2008-06-11 16:44:04 +000011import sys
12import os
13import gc
14import signal
15import array
Benjamin Petersone711caf2008-06-11 16:44:04 +000016import socket
17import random
18import logging
R. David Murraya21e4ca2009-03-31 23:16:50 +000019import test.support
Benjamin Petersone711caf2008-06-11 16:44:04 +000020
Benjamin Petersone5384b02008-10-04 22:00:42 +000021
R. David Murraya21e4ca2009-03-31 23:16:50 +000022# Skip tests if _multiprocessing wasn't built.
23_multiprocessing = test.support.import_module('_multiprocessing')
24# Skip tests if sem_open implementation is broken.
25test.support.import_module('multiprocessing.synchronize')
# import threading after _multiprocessing to raise a more relevant error
# message: "No module named _multiprocessing". _multiprocessing is not compiled
# without thread support.
29import threading
Benjamin Petersone5384b02008-10-04 22:00:42 +000030
Benjamin Petersone711caf2008-06-11 16:44:04 +000031import multiprocessing.dummy
32import multiprocessing.connection
33import multiprocessing.managers
34import multiprocessing.heap
Benjamin Petersone711caf2008-06-11 16:44:04 +000035import multiprocessing.pool
Benjamin Petersone711caf2008-06-11 16:44:04 +000036
37from multiprocessing import util
38
Brian Curtinafa88b52010-10-07 01:12:19 +000039try:
40 from multiprocessing.sharedctypes import Value, copy
41 HAS_SHAREDCTYPES = True
42except ImportError:
43 HAS_SHAREDCTYPES = False
44
Benjamin Petersone711caf2008-06-11 16:44:04 +000045#
46#
47#
48
def latin(s):
    """Return the string *s* encoded to bytes with the 'latin' codec."""
    encoded = s.encode('latin')
    return encoded
Benjamin Petersone711caf2008-06-11 16:44:04 +000051
Benjamin Petersone711caf2008-06-11 16:44:04 +000052#
53# Constants
54#
55
LOG_LEVEL = util.SUBWARNING
#LOG_LEVEL = logging.DEBUG

# Short pause, in seconds, that tests hand to time.sleep().
DELTA = 0.1

# Setting this to True makes the tests take a lot longer and can sometimes
# cause non-serious failures because some calls block a bit longer than
# expected.
CHECK_TIMINGS = False

# Longer, distinct timeouts are only used when timings are actually checked.
TIMEOUT1, TIMEOUT2, TIMEOUT3 = (
    (0.82, 0.35, 1.4) if CHECK_TIMINGS else (0.1, 0.1, 0.1)
)

# True unless _multiprocessing sets a truthy HAVE_BROKEN_SEM_GETVALUE flag.
HAVE_GETVALUE = not getattr(
    _multiprocessing, 'HAVE_BROKEN_SEM_GETVALUE', False
)

WIN32 = sys.platform == "win32"
73
Benjamin Petersone711caf2008-06-11 16:44:04 +000074#
Florent Xiclunafd1b0932010-03-28 00:25:02 +000075# Some tests require ctypes
76#
77
78try:
Florent Xiclunaaa171062010-08-14 15:56:42 +000079 from ctypes import Structure, c_int, c_double
Florent Xiclunafd1b0932010-03-28 00:25:02 +000080except ImportError:
81 Structure = object
82 c_int = c_double = None
83
84#
Benjamin Petersone711caf2008-06-11 16:44:04 +000085# Creates a wrapper for a function which records the time it takes to finish
86#
87
class TimingWrapper(object):
    """Callable wrapper that records how long each call to *func* takes.

    ``elapsed`` is ``None`` until the first call.  After every call, whether
    it returns normally or raises, ``elapsed`` holds the duration of that
    call in seconds.
    """

    # Prefer a monotonic clock so that wall-clock adjustments cannot produce
    # a negative or inflated duration; fall back to time.time() on
    # interpreters that do not provide time.monotonic().
    _clock = staticmethod(getattr(time, 'monotonic', time.time))

    def __init__(self, func):
        self.func = func
        self.elapsed = None

    def __call__(self, *args, **kwds):
        """Call the wrapped function, passing through its result or exception."""
        started = self._clock()
        try:
            return self.func(*args, **kwds)
        finally:
            # Runs on both the return and the exception path.
            self.elapsed = self._clock() - started
100
101#
102# Base class for test cases
103#
104
class BaseTestCase(object):
    """Mixin providing assertion helpers shared by the test cases below."""

    # NOTE(review): presumably the test flavours a case may be run under;
    # the code that consumes this tuple is not visible here.
    ALLOWED_TYPES = ('processes', 'manager', 'threads')

    def assertTimingAlmostEqual(self, a, b):
        """Compare *a* and *b* to one decimal place, only if CHECK_TIMINGS."""
        if not CHECK_TIMINGS:
            return
        self.assertAlmostEqual(a, b, 1)

    def assertReturnsIfImplemented(self, value, func, *args):
        """Assert ``func(*args) == value`` unless it raises NotImplementedError."""
        try:
            outcome = func(*args)
        except NotImplementedError:
            return None
        return self.assertEqual(value, outcome)

    # For the sanity of Windows users, rather than crashing or freezing in
    # multiple ways.
    def __reduce__(self, *args):
        raise NotImplementedError("shouldn't try to pickle a test case")

    __reduce_ex__ = __reduce__
127
Benjamin Petersone711caf2008-06-11 16:44:04 +0000128#
129# Return the value of a semaphore
130#
131
def get_value(self):
    """Return the current value of a semaphore-like object.

    Tries ``self.get_value()`` first, then the ``_Semaphore__value`` and
    ``_value`` attributes in that order.  Raises NotImplementedError when
    none of them is available.
    """
    try:
        return self.get_value()
    except AttributeError:
        pass
    for attr_name in ('_Semaphore__value', '_value'):
        try:
            return getattr(self, attr_name)
        except AttributeError:
            continue
    raise NotImplementedError
143
144#
145# Testcases
146#
147
class _TestProcess(BaseTestCase):
    """Tests for the basic Process API: identity, start/join and terminate."""

    ALLOWED_TYPES = ('processes', 'threads')

    def test_current(self):
        """Check the attributes reported by current_process()."""
        if self.TYPE == 'threads':
            # Report a skip rather than silently passing without running.
            self.skipTest('test not appropriate for {}'.format(self.TYPE))

        current = self.current_process()
        authkey = current.authkey

        self.assertTrue(current.is_alive())
        self.assertFalse(current.daemon)
        self.assertIsInstance(authkey, bytes)
        self.assertGreater(len(authkey), 0)
        self.assertEqual(current.ident, os.getpid())
        self.assertEqual(current.exitcode, None)

    @classmethod
    def _test(cls, q, *args, **kwds):
        """Child body: put args, kwargs and name (plus authkey/pid) on *q*."""
        current = cls.current_process()
        q.put(args)
        q.put(kwds)
        q.put(current.name)
        if cls.TYPE != 'threads':
            q.put(bytes(current.authkey))
            q.put(current.pid)

    def test_process(self):
        """Check a child's state before start, while running and after join."""
        q = self.Queue(1)
        e = self.Event()
        args = (q, 1, 2)
        kwargs = {'hello': 23, 'bye': 2.54}
        name = 'SomeProcess'
        p = self.Process(
            target=self._test, args=args, kwargs=kwargs, name=name
            )
        p.daemon = True
        current = self.current_process()

        if self.TYPE != 'threads':
            self.assertEqual(p.authkey, current.authkey)
        self.assertEqual(p.is_alive(), False)
        self.assertEqual(p.daemon, True)
        self.assertNotIn(p, self.active_children())
        self.assertIs(type(self.active_children()), list)
        self.assertEqual(p.exitcode, None)

        p.start()

        self.assertEqual(p.exitcode, None)
        self.assertEqual(p.is_alive(), True)
        self.assertIn(p, self.active_children())

        # The child receives q as its first positional argument, so it only
        # echoes back the remaining ones.
        self.assertEqual(q.get(), args[1:])
        self.assertEqual(q.get(), kwargs)
        self.assertEqual(q.get(), p.name)
        if self.TYPE != 'threads':
            self.assertEqual(q.get(), current.authkey)
            self.assertEqual(q.get(), p.pid)

        p.join()

        self.assertEqual(p.exitcode, 0)
        self.assertEqual(p.is_alive(), False)
        self.assertNotIn(p, self.active_children())

    @classmethod
    def _test_terminate(cls):
        """Child body: sleep long enough that the parent must terminate it."""
        time.sleep(1000)

    def test_terminate(self):
        """Check that terminate() stops a running child promptly."""
        if self.TYPE == 'threads':
            # Report a skip rather than silently passing without running.
            self.skipTest('test not appropriate for {}'.format(self.TYPE))

        p = self.Process(target=self._test_terminate)
        p.daemon = True
        p.start()

        self.assertEqual(p.is_alive(), True)
        self.assertIn(p, self.active_children())
        self.assertEqual(p.exitcode, None)

        p.terminate()

        join = TimingWrapper(p.join)
        self.assertEqual(join(), None)
        self.assertTimingAlmostEqual(join.elapsed, 0.0)

        self.assertEqual(p.is_alive(), False)
        self.assertNotIn(p, self.active_children())

        p.join()

        # XXX sometimes get p.exitcode == 0 on Windows ...
        #self.assertEqual(p.exitcode, -signal.SIGTERM)

    def test_cpu_count(self):
        """cpu_count() returns a positive int, or raises NotImplementedError."""
        try:
            cpus = multiprocessing.cpu_count()
        except NotImplementedError:
            cpus = 1
        self.assertIs(type(cpus), int)
        self.assertGreaterEqual(cpus, 1)

    def test_active_children(self):
        """A child is listed by active_children() only while it is running."""
        self.assertEqual(type(self.active_children()), list)

        p = self.Process(target=time.sleep, args=(DELTA,))
        self.assertNotIn(p, self.active_children())

        p.start()
        self.assertIn(p, self.active_children())

        p.join()
        self.assertNotIn(p, self.active_children())

    @classmethod
    def _test_recursion(cls, wconn, id):
        """Send *id*, then spawn two children in turn until depth 2 is reached."""
        wconn.send(id)
        if len(id) < 2:
            for i in range(2):
                p = cls.Process(
                    target=cls._test_recursion, args=(wconn, id+[i])
                    )
                p.start()
                # Joining before starting the next child keeps the ids
                # arriving in depth-first order.
                p.join()

    def test_recursion(self):
        """Processes started from within child processes report in order."""
        rconn, wconn = self.Pipe(duplex=False)
        self._test_recursion(wconn, [])

        time.sleep(DELTA)
        result = []
        while rconn.poll():
            result.append(rconn.recv())

        expected = [
            [],
              [0],
                [0, 0],
                [0, 1],
              [1],
                [1, 0],
                [1, 1]
            ]
        self.assertEqual(result, expected)
296
297#
298#
299#
300
class _UpperCaser(multiprocessing.Process):
    """Process subclass that upper-cases the strings sent to it over a pipe."""

    def __init__(self):
        multiprocessing.Process.__init__(self)
        child_end, parent_end = multiprocessing.Pipe()
        self.child_conn = child_end
        self.parent_conn = parent_end

    def run(self):
        """Answer each received string with its upper-cased form until None."""
        self.parent_conn.close()
        conn = self.child_conn
        for s in iter(conn.recv, None):
            conn.send(s.upper())
        conn.close()

    def submit(self, s):
        """Send the str *s* over the parent end and return the reply."""
        assert type(s) is str
        conn = self.parent_conn
        conn.send(s)
        return conn.recv()

    def stop(self):
        """Send the None sentinel, then close both pipe ends on this side."""
        conn = self.parent_conn
        conn.send(None)
        conn.close()
        self.child_conn.close()
322
class _TestSubclassingProcess(BaseTestCase):
    """Test that a Process subclass with its own run() works end to end."""

    ALLOWED_TYPES = ('processes',)

    def test_subclassing(self):
        """Start an _UpperCaser, submit two words, then stop and join it."""
        worker = _UpperCaser()
        worker.start()
        for text, shouted in (('hello', 'HELLO'), ('world', 'WORLD')):
            self.assertEqual(worker.submit(text), shouted)
        worker.stop()
        worker.join()
334
335#
336#
337#
338
def queue_empty(q):
    """Return whether *q* is empty, using qsize() if it has no empty()."""
    if not hasattr(q, 'empty'):
        return q.qsize() == 0
    return q.empty()
344
def queue_full(q, maxsize):
    """Return whether *q* is full, comparing qsize() to *maxsize* if needed."""
    if not hasattr(q, 'full'):
        return q.qsize() == maxsize
    return q.full()
350
351
class _TestQueue(BaseTestCase):
    """Tests for Queue and JoinableQueue: put, get, fork, qsize, task_done."""

    @classmethod
    def _test_put(cls, queue, child_can_start, parent_can_continue):
        """Child body: once released, drain the six items put by test_put."""
        child_can_start.wait()
        for _ in range(6):
            queue.get()
        parent_can_continue.set()

    def test_put(self):
        """Fill a bounded queue and check every put() variant raises Full."""
        MAXSIZE = 6
        queue = self.Queue(maxsize=MAXSIZE)
        child_can_start = self.Event()
        parent_can_continue = self.Event()

        proc = self.Process(
            target=self._test_put,
            args=(queue, child_can_start, parent_can_continue)
            )
        proc.daemon = True
        proc.start()

        self.assertEqual(queue_empty(queue), True)
        self.assertEqual(queue_full(queue, MAXSIZE), False)

        queue.put(1)
        queue.put(2, True)
        queue.put(3, True, None)
        queue.put(4, False)
        queue.put(5, False, None)
        queue.put_nowait(6)

        # the values may be in buffer but not yet in pipe so sleep a bit
        time.sleep(DELTA)

        self.assertEqual(queue_empty(queue), False)
        self.assertEqual(queue_full(queue, MAXSIZE), True)

        put = TimingWrapper(queue.put)
        put_nowait = TimingWrapper(queue.put_nowait)

        self.assertRaises(pyqueue.Full, put, 7, False)
        self.assertTimingAlmostEqual(put.elapsed, 0)

        self.assertRaises(pyqueue.Full, put, 7, False, None)
        self.assertTimingAlmostEqual(put.elapsed, 0)

        self.assertRaises(pyqueue.Full, put_nowait, 7)
        self.assertTimingAlmostEqual(put_nowait.elapsed, 0)

        self.assertRaises(pyqueue.Full, put, 7, True, TIMEOUT1)
        self.assertTimingAlmostEqual(put.elapsed, TIMEOUT1)

        self.assertRaises(pyqueue.Full, put, 7, False, TIMEOUT2)
        self.assertTimingAlmostEqual(put.elapsed, 0)

        self.assertRaises(pyqueue.Full, put, 7, True, timeout=TIMEOUT3)
        self.assertTimingAlmostEqual(put.elapsed, TIMEOUT3)

        child_can_start.set()
        parent_can_continue.wait()

        self.assertEqual(queue_empty(queue), True)
        self.assertEqual(queue_full(queue, MAXSIZE), False)

        proc.join()

    @classmethod
    def _test_get(cls, queue, child_can_start, parent_can_continue):
        """Child body: once released, put 2..5 on the queue."""
        child_can_start.wait()
        #queue.put(1)
        queue.put(2)
        queue.put(3)
        queue.put(4)
        queue.put(5)
        parent_can_continue.set()

    def test_get(self):
        """Read the child's items, then check every get() variant raises Empty."""
        queue = self.Queue()
        child_can_start = self.Event()
        parent_can_continue = self.Event()

        proc = self.Process(
            target=self._test_get,
            args=(queue, child_can_start, parent_can_continue)
            )
        proc.daemon = True
        proc.start()

        self.assertEqual(queue_empty(queue), True)

        child_can_start.set()
        parent_can_continue.wait()

        time.sleep(DELTA)
        self.assertEqual(queue_empty(queue), False)

        # Hangs unexpectedly, remove for now
        #self.assertEqual(queue.get(), 1)
        self.assertEqual(queue.get(True, None), 2)
        self.assertEqual(queue.get(True), 3)
        self.assertEqual(queue.get(timeout=1), 4)
        self.assertEqual(queue.get_nowait(), 5)

        self.assertEqual(queue_empty(queue), True)

        get = TimingWrapper(queue.get)
        get_nowait = TimingWrapper(queue.get_nowait)

        self.assertRaises(pyqueue.Empty, get, False)
        self.assertTimingAlmostEqual(get.elapsed, 0)

        self.assertRaises(pyqueue.Empty, get, False, None)
        self.assertTimingAlmostEqual(get.elapsed, 0)

        self.assertRaises(pyqueue.Empty, get_nowait)
        self.assertTimingAlmostEqual(get_nowait.elapsed, 0)

        self.assertRaises(pyqueue.Empty, get, True, TIMEOUT1)
        self.assertTimingAlmostEqual(get.elapsed, TIMEOUT1)

        self.assertRaises(pyqueue.Empty, get, False, TIMEOUT2)
        self.assertTimingAlmostEqual(get.elapsed, 0)

        self.assertRaises(pyqueue.Empty, get, timeout=TIMEOUT3)
        self.assertTimingAlmostEqual(get.elapsed, TIMEOUT3)

        proc.join()

    @classmethod
    def _test_fork(cls, queue):
        """Child body: put 10..19 on the queue."""
        for i in range(10, 20):
            queue.put(i)
        # note that at this point the items may only be buffered, so the
        # process cannot shutdown until the feeder thread has finished
        # pushing items onto the pipe.

    def test_fork(self):
        # Old versions of Queue would fail to create a new feeder
        # thread for a forked process if the original process had its
        # own feeder thread.  This test checks that this no longer
        # happens.

        queue = self.Queue()

        # put items on queue so that main process starts a feeder thread
        for i in range(10):
            queue.put(i)

        # wait to make sure thread starts before we fork a new process
        time.sleep(DELTA)

        # fork process
        p = self.Process(target=self._test_fork, args=(queue,))
        p.start()

        # check that all expected items are in the queue
        for i in range(20):
            self.assertEqual(queue.get(), i)
        self.assertRaises(pyqueue.Empty, queue.get, False)

        p.join()

    def test_qsize(self):
        """qsize() tracks puts and gets; quietly ends if it is unimplemented."""
        q = self.Queue()
        try:
            self.assertEqual(q.qsize(), 0)
        except NotImplementedError:
            return
        q.put(1)
        self.assertEqual(q.qsize(), 1)
        q.put(5)
        self.assertEqual(q.qsize(), 2)
        q.get()
        self.assertEqual(q.qsize(), 1)
        q.get()
        self.assertEqual(q.qsize(), 0)

    @classmethod
    def _test_task_done(cls, q):
        """Worker body: mark each item done after a short sleep, until None."""
        for obj in iter(q.get, None):
            time.sleep(DELTA)
            q.task_done()

    def test_task_done(self):
        """JoinableQueue.join() returns once four workers finish ten items."""
        queue = self.JoinableQueue()

        workers = [self.Process(target=self._test_task_done, args=(queue,))
                   for _ in range(4)]

        for p in workers:
            p.start()

        for i in range(10):
            queue.put(i)

        queue.join()

        # One None sentinel per worker so that each of them exits its loop.
        for _ in workers:
            queue.put(None)

        for p in workers:
            p.join()
559
560#
561#
562#
563
class _TestLock(BaseTestCase):
    """Tests for Lock and RLock acquire/release semantics."""

    def test_lock(self):
        """A held Lock refuses a non-blocking acquire; over-release raises."""
        lock = self.Lock()
        self.assertEqual(lock.acquire(), True)
        self.assertEqual(lock.acquire(False), False)
        self.assertEqual(lock.release(), None)
        with self.assertRaises((ValueError, threading.ThreadError)):
            lock.release()

    def test_rlock(self):
        """An RLock can be taken three times and must be released three times."""
        lock = self.RLock()
        for _ in range(3):
            self.assertEqual(lock.acquire(), True)
        for _ in range(3):
            self.assertEqual(lock.release(), None)
        with self.assertRaises((AssertionError, RuntimeError)):
            lock.release()

    def test_lock_context(self):
        """A Lock can be used as a context manager."""
        with self.Lock():
            pass
586
Benjamin Petersone711caf2008-06-11 16:44:04 +0000587
class _TestSemaphore(BaseTestCase):
    """Tests for Semaphore and BoundedSemaphore counting and timeouts."""

    def _test_semaphore(self, sem):
        """Walk a semaphore of initial value 2 down to 0 and back up to 2.

        The value checks are only made where get_value() is implemented.
        """
        self.assertReturnsIfImplemented(2, get_value, sem)
        self.assertEqual(sem.acquire(), True)
        self.assertReturnsIfImplemented(1, get_value, sem)
        self.assertEqual(sem.acquire(), True)
        self.assertReturnsIfImplemented(0, get_value, sem)
        self.assertEqual(sem.acquire(False), False)
        self.assertReturnsIfImplemented(0, get_value, sem)
        self.assertEqual(sem.release(), None)
        self.assertReturnsIfImplemented(1, get_value, sem)
        self.assertEqual(sem.release(), None)
        self.assertReturnsIfImplemented(2, get_value, sem)

    def test_semaphore(self):
        """A plain Semaphore may be released past its initial value."""
        sem = self.Semaphore(2)
        self._test_semaphore(sem)
        self.assertEqual(sem.release(), None)
        self.assertReturnsIfImplemented(3, get_value, sem)
        self.assertEqual(sem.release(), None)
        self.assertReturnsIfImplemented(4, get_value, sem)

    def test_bounded_semaphore(self):
        """A BoundedSemaphore passes the common acquire/release walk."""
        sem = self.BoundedSemaphore(2)
        self._test_semaphore(sem)
        # Currently fails on OS/X
        #if HAVE_GETVALUE:
        #    self.assertRaises(ValueError, sem.release)
        #    self.assertReturnsIfImplemented(2, get_value, sem)

    def test_timeout(self):
        """acquire() on an empty semaphore honours blocking and timeout."""
        if self.TYPE != 'processes':
            # Report a skip rather than silently passing without running.
            self.skipTest('test not appropriate for {}'.format(self.TYPE))

        sem = self.Semaphore(0)
        acquire = TimingWrapper(sem.acquire)

        self.assertEqual(acquire(False), False)
        self.assertTimingAlmostEqual(acquire.elapsed, 0.0)

        self.assertEqual(acquire(False, None), False)
        self.assertTimingAlmostEqual(acquire.elapsed, 0.0)

        self.assertEqual(acquire(False, TIMEOUT1), False)
        self.assertTimingAlmostEqual(acquire.elapsed, 0)

        self.assertEqual(acquire(True, TIMEOUT2), False)
        self.assertTimingAlmostEqual(acquire.elapsed, TIMEOUT2)

        self.assertEqual(acquire(timeout=TIMEOUT3), False)
        self.assertTimingAlmostEqual(acquire.elapsed, TIMEOUT3)
640
641
class _TestCondition(BaseTestCase):
    """Tests for Condition: notify(), notify_all() and wait() timeouts."""

    @classmethod
    def f(cls, cond, sleeping, woken, timeout=None):
        """Child body: signal *sleeping*, wait on *cond*, then signal *woken*."""
        cond.acquire()
        sleeping.release()
        cond.wait(timeout)
        woken.release()
        cond.release()

    def check_invariant(self, cond):
        """Check the condition's internal counters show no remaining sleepers."""
        # this is only supposed to succeed when there are no sleepers
        if self.TYPE == 'processes':
            try:
                sleepers = (cond._sleeping_count.get_value() -
                            cond._woken_count.get_value())
                self.assertEqual(sleepers, 0)
                self.assertEqual(cond._wait_semaphore.get_value(), 0)
            except NotImplementedError:
                pass

    def test_notify(self):
        """Each notify() wakes exactly one of the two waiting children."""
        cond = self.Condition()
        sleeping = self.Semaphore(0)
        woken = self.Semaphore(0)

        proc = self.Process(target=self.f, args=(cond, sleeping, woken))
        proc.daemon = True
        proc.start()

        thread = threading.Thread(target=self.f, args=(cond, sleeping, woken))
        thread.daemon = True
        thread.start()

        # wait for both children to start sleeping
        sleeping.acquire()
        sleeping.acquire()

        # check no process/thread has woken up
        time.sleep(DELTA)
        self.assertReturnsIfImplemented(0, get_value, woken)

        # wake up one process/thread
        cond.acquire()
        cond.notify()
        cond.release()

        # check one process/thread has woken up
        time.sleep(DELTA)
        self.assertReturnsIfImplemented(1, get_value, woken)

        # wake up another
        cond.acquire()
        cond.notify()
        cond.release()

        # check other has woken up
        time.sleep(DELTA)
        self.assertReturnsIfImplemented(2, get_value, woken)

        # check state is not mucked up
        self.check_invariant(cond)

        # Both children have been notified by now; reap each of them so that
        # neither is left behind when the test finishes.
        proc.join()
        thread.join()

    def test_notify_all(self):
        """Waiters with a timeout all time out; notify_all() wakes the rest."""
        cond = self.Condition()
        sleeping = self.Semaphore(0)
        woken = self.Semaphore(0)

        # start some threads/processes which will timeout
        for i in range(3):
            p = self.Process(target=self.f,
                             args=(cond, sleeping, woken, TIMEOUT1))
            p.daemon = True
            p.start()

            t = threading.Thread(target=self.f,
                                 args=(cond, sleeping, woken, TIMEOUT1))
            t.daemon = True
            t.start()

        # wait for them all to sleep
        for i in range(6):
            sleeping.acquire()

        # check they have all timed out
        for i in range(6):
            woken.acquire()
        self.assertReturnsIfImplemented(0, get_value, woken)

        # check state is not mucked up
        self.check_invariant(cond)

        # start some more threads/processes
        for i in range(3):
            p = self.Process(target=self.f, args=(cond, sleeping, woken))
            p.daemon = True
            p.start()

            t = threading.Thread(target=self.f, args=(cond, sleeping, woken))
            t.daemon = True
            t.start()

        # wait for them to all sleep
        for i in range(6):
            sleeping.acquire()

        # check no process/thread has woken up
        time.sleep(DELTA)
        self.assertReturnsIfImplemented(0, get_value, woken)

        # wake them all up
        cond.acquire()
        cond.notify_all()
        cond.release()

        # check they have all woken
        time.sleep(DELTA)
        self.assertReturnsIfImplemented(6, get_value, woken)

        # check state is not mucked up
        self.check_invariant(cond)

    def test_timeout(self):
        """wait() with a timeout and no notifier returns False."""
        cond = self.Condition()
        wait = TimingWrapper(cond.wait)
        cond.acquire()
        res = wait(TIMEOUT1)
        cond.release()
        self.assertEqual(res, False)
        self.assertTimingAlmostEqual(wait.elapsed, TIMEOUT1)
773
774
class _TestEvent(BaseTestCase):
    """Tests for Event: is_set(), wait(), set() and clear()."""

    @classmethod
    def _test_event(cls, event):
        """Child body: set *event* after sleeping for TIMEOUT2."""
        time.sleep(TIMEOUT2)
        event.set()

    def test_event(self):
        """wait() returns the flag: False on timeout, True once set."""
        event = self.Event()
        wait = TimingWrapper(event.wait)

        # A new event starts out unset.
        self.assertEqual(event.is_set(), False)

        # While the event is unset, wait() with a timeout returns False.
        self.assertEqual(wait(0.0), False)
        self.assertTimingAlmostEqual(wait.elapsed, 0.0)
        self.assertEqual(wait(TIMEOUT1), False)
        self.assertTimingAlmostEqual(wait.elapsed, TIMEOUT1)

        event.set()

        # Once set, wait() returns True straight away, with or without a
        # timeout.
        self.assertEqual(event.is_set(), True)
        self.assertEqual(wait(), True)
        self.assertTimingAlmostEqual(wait.elapsed, 0.0)
        self.assertEqual(wait(TIMEOUT1), True)
        self.assertTimingAlmostEqual(wait.elapsed, 0.0)
        # self.assertEqual(event.is_set(), True)

        event.clear()

        #self.assertEqual(event.is_set(), False)

        # Keep a reference to the child so it can be reaped once it has set
        # the event, instead of being left behind by the test.
        p = self.Process(target=self._test_event, args=(event,))
        p.start()
        self.assertEqual(wait(), True)
        p.join()
Benjamin Petersone711caf2008-06-11 16:44:04 +0000813
814#
815#
816#
817
class _TestValue(BaseTestCase):
    """Tests for shared Value / RawValue objects."""

    ALLOWED_TYPES = ('processes',)

    # (typecode, initial value, value assigned by the child process)
    codes_values = [
        ('i', 4343, 24234),
        ('d', 3.625, -4.25),
        ('h', -232, 234),
        ('c', latin('x'), latin('y'))
        ]

    def setUp(self):
        """Skip every test in this case when sharedctypes is unavailable."""
        if HAS_SHAREDCTYPES:
            return
        self.skipTest("requires multiprocessing.sharedctypes")

    @classmethod
    def _test(cls, values):
        """Child body: assign each shared value its third codes_values entry."""
        for shared, spec in zip(values, cls.codes_values):
            shared.value = spec[2]

    def test_value(self, raw=False):
        """Values written by a child process are visible in the parent."""
        factory = self.RawValue if raw else self.Value
        values = [factory(code, value)
                  for code, value, _ in self.codes_values]

        for shared, spec in zip(values, self.codes_values):
            self.assertEqual(shared.value, spec[1])

        proc = self.Process(target=self._test, args=(values,))
        proc.start()
        proc.join()

        for shared, spec in zip(values, self.codes_values):
            self.assertEqual(shared.value, spec[2])

    def test_rawvalue(self):
        """Run test_value() against RawValue objects."""
        self.test_value(raw=True)

    def test_getobj_getlock(self):
        """get_lock()/get_obj() exist only on values created with a lock."""
        # Default lock.
        with_default = self.Value('i', 5)
        with_default.get_lock()
        with_default.get_obj()

        # Explicit lock=None.
        with_none = self.Value('i', 5, lock=None)
        with_none.get_lock()
        with_none.get_obj()

        # Caller-supplied lock is the one handed back by get_lock().
        lock = self.Lock()
        with_given = self.Value('i', 5, lock=lock)
        given_back = with_given.get_lock()
        with_given.get_obj()
        self.assertEqual(lock, given_back)

        unlocked = self.Value('i', 5, lock=False)
        for attr in ('get_lock', 'get_obj'):
            self.assertFalse(hasattr(unlocked, attr))

        self.assertRaises(AttributeError, self.Value, 'i', 5, lock='navalue')

        raw = self.RawValue('i', 5)
        for attr in ('get_lock', 'get_obj'):
            self.assertFalse(hasattr(raw, attr))
884
Benjamin Petersone711caf2008-06-11 16:44:04 +0000885
class _TestArray(BaseTestCase):
    """Tests for shared Array / RawArray objects."""

    ALLOWED_TYPES = ('processes',)

    @classmethod
    def f(cls, seq):
        """Replace *seq* in place with its running (prefix) sums."""
        for idx in range(1, len(seq)):
            seq[idx] += seq[idx - 1]

    @unittest.skipIf(c_int is None, "requires _ctypes")
    def test_array(self, raw=False):
        """A shared array supports indexing and slicing and child updates."""
        seq = [680, 626, 934, 821, 150, 233, 548, 982, 714, 831]
        make_array = self.RawArray if raw else self.Array
        arr = make_array('i', seq)

        self.assertEqual(len(arr), len(seq))
        self.assertEqual(arr[3], seq[3])
        self.assertEqual(list(arr[2:7]), list(seq[2:7]))

        # Apply the same slice assignment to both sequences.
        patch = array.array('i', [1, 2, 3, 4])
        arr[4:8] = patch
        seq[4:8] = patch

        self.assertEqual(list(arr[:]), seq)

        # Run f locally on the list and in a child process on the shared
        # array; the two must end up equal.
        self.f(seq)

        p = self.Process(target=self.f, args=(arr,))
        p.start()
        p.join()

        self.assertEqual(list(arr[:]), seq)

    @unittest.skipIf(c_int is None, "requires _ctypes")
    def test_array_from_size(self):
        """An Array created from a size starts out zero-filled."""
        size = 10
        # Test for zeroing (see issue #11675).
        # The repetition below strengthens the test by increasing the chances
        # of previously allocated non-zero memory being used for the new array
        # on the 2nd and 3rd loops.
        for _ in range(3):
            arr = self.Array('i', size)
            self.assertEqual(len(arr), size)
            self.assertEqual(list(arr), [0] * size)
            arr[:] = range(10)
            self.assertEqual(list(arr), list(range(10)))
            del arr

    @unittest.skipIf(c_int is None, "requires _ctypes")
    def test_rawarray(self):
        """Run test_array() against a RawArray."""
        self.test_array(raw=True)

    @unittest.skipIf(c_int is None, "requires _ctypes")
    def test_getobj_getlock_obj(self):
        """get_lock()/get_obj() exist only on arrays created with a lock."""
        # Default lock.
        with_default = self.Array('i', list(range(10)))
        with_default.get_lock()
        with_default.get_obj()

        # Explicit lock=None.
        with_none = self.Array('i', list(range(10)), lock=None)
        with_none.get_lock()
        with_none.get_obj()

        # Caller-supplied lock is the one handed back by get_lock().
        lock = self.Lock()
        with_given = self.Array('i', list(range(10)), lock=lock)
        given_back = with_given.get_lock()
        with_given.get_obj()
        self.assertEqual(lock, given_back)

        unlocked = self.Array('i', range(10), lock=False)
        for attr in ('get_lock', 'get_obj'):
            self.assertFalse(hasattr(unlocked, attr))
        self.assertRaises(AttributeError,
                          self.Array, 'i', range(10), lock='notalock')

        raw = self.RawArray('i', range(10))
        for attr in ('get_lock', 'get_obj'):
            self.assertFalse(hasattr(raw, attr))
Benjamin Petersone711caf2008-06-11 16:44:04 +0000963
964#
965#
966#
967
class _TestContainers(BaseTestCase):
    # Manager-provided list, dict and Namespace proxies.

    ALLOWED_TYPES = ('manager',)

    def test_list(self):
        digits = list(range(10))
        a = self.list(digits)
        self.assertEqual(a[:], digits)

        b = self.list()
        self.assertEqual(b[:], [])

        b.extend(list(range(5)))
        self.assertEqual(b[:], [0, 1, 2, 3, 4])

        self.assertEqual(b[2], 2)
        self.assertEqual(b[2:10], [2, 3, 4])

        b *= 2
        doubled = [0, 1, 2, 3, 4, 0, 1, 2, 3, 4]
        self.assertEqual(b[:], doubled)

        self.assertEqual(b + [5, 6], doubled + [5, 6])

        # `a` must not have been touched by the operations on `b`.
        self.assertEqual(a[:], digits)

        # A shared list built from other shared lists.
        nested = self.list([a, b])
        self.assertEqual(nested[:], [digits, doubled])

        # A later change to `a` is visible through the list that holds it.
        holder = self.list([a])
        a.append('hello')
        self.assertEqual(holder[:], [digits + ['hello']])

    def test_dict(self):
        d = self.dict()
        indices = list(range(65, 70))
        for code in indices:
            d[code] = chr(code)
        letters = [chr(code) for code in indices]
        self.assertEqual(d.copy(), dict(zip(indices, letters)))
        self.assertEqual(sorted(d.keys()), indices)
        self.assertEqual(sorted(d.values()), letters)
        self.assertEqual(sorted(d.items()), list(zip(indices, letters)))

    def test_namespace(self):
        n = self.Namespace()
        n.name = 'Bob'
        n.job = 'Builder'
        n._hidden = 'hidden'
        self.assertEqual((n.name, n.job), ('Bob', 'Builder'))
        del n.job
        # The expected str() shows neither the deleted attribute nor the
        # underscore-prefixed one.
        self.assertEqual(str(n), "Namespace(name='Bob')")
        self.assertTrue(hasattr(n, 'name'))
        self.assertFalse(hasattr(n, 'job'))
1023
1024#
1025#
1026#
1027
def sqr(x, wait=0.0):
    """Return ``x`` squared after sleeping for ``wait`` seconds."""
    time.sleep(wait)
    squared = x * x
    return squared
Ask Solem2afcbf22010-11-09 20:55:52 +00001031
class _TestPool(BaseTestCase):
    # Exercises the pool available to the test case as ``self.pool``.

    def test_apply(self):
        apply_sync = self.pool.apply
        self.assertEqual(apply_sync(sqr, (5,)), sqr(5))
        self.assertEqual(apply_sync(sqr, (), {'x': 3}), sqr(x=3))

    def test_map(self):
        pool_map = self.pool.map
        small = list(range(10))
        large = list(range(100))
        self.assertEqual(pool_map(sqr, small), [sqr(v) for v in small])
        self.assertEqual(pool_map(sqr, large, chunksize=20),
                         [sqr(v) for v in large])

    def test_map_chunksize(self):
        # An empty iterable combined with an explicit chunksize must
        # complete instead of stalling until the timeout.
        pending = self.pool.map_async(sqr, [], chunksize=1)
        try:
            pending.get(timeout=TIMEOUT1)
        except multiprocessing.TimeoutError:
            self.fail("pool.map_async with chunksize stalled on null list")

    def test_async(self):
        # The task sleeps TIMEOUT1, so get() should block about that long.
        res = self.pool.apply_async(sqr, (7, TIMEOUT1,))
        get = TimingWrapper(res.get)
        self.assertEqual(get(), 49)
        self.assertTimingAlmostEqual(get.elapsed, TIMEOUT1)

    def test_async_timeout(self):
        # The task outlives the timeout given to get(), which must raise.
        res = self.pool.apply_async(sqr, (6, TIMEOUT2 + 0.2))
        get = TimingWrapper(res.get)
        self.assertRaises(multiprocessing.TimeoutError, get, timeout=TIMEOUT2)
        self.assertTimingAlmostEqual(get.elapsed, TIMEOUT2)

    def test_imap(self):
        # Consume the whole iterator at once.
        it = self.pool.imap(sqr, list(range(10)))
        self.assertEqual(list(it), [sqr(v) for v in range(10)])

        # Step through item by item, with and without chunking; the
        # iterator must be exhausted afterwards.
        for count, extra in ((10, {}), (1000, {'chunksize': 100})):
            it = self.pool.imap(sqr, list(range(count)), **extra)
            for value in range(count):
                self.assertEqual(next(it), value*value)
            self.assertRaises(StopIteration, it.__next__)

    def test_imap_unordered(self):
        expected = [sqr(v) for v in range(1000)]

        it = self.pool.imap_unordered(sqr, list(range(1000)))
        self.assertEqual(sorted(it), expected)

        it = self.pool.imap_unordered(sqr, list(range(1000)), chunksize=53)
        self.assertEqual(sorted(it), expected)

    def test_make_pool(self):
        p = multiprocessing.Pool(3)
        self.assertEqual(3, len(p._pool))
        p.close()
        p.join()

    def test_terminate(self):
        if self.TYPE == 'manager':
            # On Unix a forked process increfs each shared object to
            # which its parent process held a reference.  If the
            # forked process gets terminated then there is likely to
            # be a reference leak.  So to prevent
            # _TestZZZNumberOfObjects from failing we skip this test
            # when using a manager.
            return

        # Queue far more work than could finish quickly, then terminate:
        # join() must return without waiting for the queued tasks.
        result = self.pool.map_async(time.sleep, [0.1] * 10000, chunksize=1)
        self.pool.terminate()
        join = TimingWrapper(self.pool.join)
        join()
        self.assertLess(join.elapsed, 0.5)
Jesse Noller1f0b6582010-01-27 03:36:01 +00001107
def raising():
    """Always raise ``KeyError("key")``; a pool task that is meant to fail."""
    failure = KeyError("key")
    raise failure
Jesse Noller1f0b6582010-01-27 03:36:01 +00001110
def unpickleable_result():
    """Return a lambda evaluating to 42.

    Used as a pool task whose result is expected to fail encoding (see
    _TestPoolWorkerErrors.test_unpickleable_result).
    """
    answer = lambda: 42
    return answer
1113
class _TestPoolWorkerErrors(BaseTestCase):
    # Error reporting from pool workers back to the submitting process.
    ALLOWED_TYPES = ('processes', )

    def test_async_error_callback(self):
        p = multiprocessing.Pool(2)

        # One-slot holder filled in by the error callback.
        seen = [None]
        def errback(exc):
            seen[0] = exc

        res = p.apply_async(raising, error_callback=errback)
        self.assertRaises(KeyError, res.get)
        self.assertTrue(seen[0])
        self.assertIsInstance(seen[0], KeyError)

        p.close()
        p.join()

    def test_unpickleable_result(self):
        from multiprocessing.pool import MaybeEncodingError
        p = multiprocessing.Pool(2)

        # Make sure we don't lose pool processes because of encoding errors.
        for _ in range(20):

            seen = [None]
            def errback(exc):
                seen[0] = exc

            res = p.apply_async(unpickleable_result, error_callback=errback)
            self.assertRaises(MaybeEncodingError, res.get)
            wrapped = seen[0]
            self.assertTrue(wrapped)
            self.assertIsInstance(seen[0], MaybeEncodingError)
            # The wrapper must carry both the original error and the value.
            self.assertIsNotNone(wrapped.exc)
            self.assertIsNotNone(wrapped.value)

        p.close()
        p.join()
1153
class _TestPoolWorkerLifetime(BaseTestCase):
    # Worker replacement when the pool is created with maxtasksperchild.
    ALLOWED_TYPES = ('processes', )

    def test_pool_worker_lifetime(self):
        p = multiprocessing.Pool(3, maxtasksperchild=10)
        self.assertEqual(3, len(p._pool))
        origworkerpids = [worker.pid for worker in p._pool]
        # Run many tasks so each worker gets replaced (hopefully)
        results = [p.apply_async(sqr, (arg, )) for arg in range(100)]
        # Fetch the results and verify we got the right answers,
        # also ensuring all the tasks have completed.
        for arg, res in enumerate(results):
            self.assertEqual(res.get(), sqr(arg))
        # Refill the pool
        p._repopulate_pool()
        # Wait until all workers are alive (at most five polls)
        for _ in range(5):
            if all(worker.is_alive() for worker in p._pool):
                break
            time.sleep(DELTA)
        finalworkerpids = [worker.pid for worker in p._pool]
        # All pids should be assigned.  See issue #7805.
        self.assertNotIn(None, origworkerpids)
        self.assertNotIn(None, finalworkerpids)
        # Finally, check that the worker pids have changed
        self.assertNotEqual(sorted(origworkerpids), sorted(finalworkerpids))
        p.close()
        p.join()
1184
Benjamin Petersone711caf2008-06-11 16:44:04 +00001185#
1186# Test that manager has expected number of shared objects left
1187#
1188
class _TestZZZNumberOfObjects(BaseTestCase):
    # Because test cases are sorted alphabetically, this one will get
    # run after all the other tests for the manager.  It tests that
    # there have been no "reference leaks" for the manager's shared
    # objects.  Note the comment in _TestPool.test_terminate().
    ALLOWED_TYPES = ('manager',)

    def test_number_of_objects(self):
        EXPECTED_NUMBER = 1                # the pool object is still alive
        multiprocessing.active_children()  # discard dead process objs
        gc.collect()                       # do garbage collection
        refs = self.manager._number_of_objects()
        # Snapshot the debug info right after counting so that what gets
        # printed on failure corresponds to the count being asserted.
        debug_info = self.manager._debug_info()
        if refs != EXPECTED_NUMBER:
            # Print the snapshot once; a second _debug_info() call would
            # only repeat the output (or show a later, different state).
            print(debug_info)

        self.assertEqual(refs, EXPECTED_NUMBER)
1207
1208#
1209# Test of creating a customized manager class
1210#
1211
1212from multiprocessing.managers import BaseManager, BaseProxy, RemoteError
1213
class FooBar:
    """Plain object registered with MyManager as both 'Foo' and 'Bar'."""

    def f(self):
        """Return the marker string ``'f()'``."""
        return 'f()'

    def g(self):
        """Always raise ValueError."""
        raise ValueError()

    def _h(self):
        """Underscore-prefixed method; return the marker string ``'_h()'``."""
        return '_h()'
1221
def baz():
    """Generate the squares of 0 through 9."""
    n = 0
    while n < 10:
        yield n * n
        n += 1
1225
class IteratorProxy(BaseProxy):
    """Proxy that behaves as an iterator by forwarding ``__next__``."""

    # Only __next__ is listed as exposed on the referent.
    _exposed_ = ('__next__',)

    def __iter__(self):
        # The proxy is its own iterator.
        return self

    def __next__(self):
        # Fetch the next item from the referent via the proxy machinery.
        item = self._callmethod('__next__')
        return item
1232
# Customized manager used by _TestMyManager; everything it offers is added
# through the register() calls below.
class MyManager(BaseManager):
    pass

# 'Foo': FooBar with no explicit `exposed` list.
MyManager.register('Foo', callable=FooBar)
# 'Bar': FooBar again, but exposing exactly 'f' and '_h'.
MyManager.register('Bar', callable=FooBar, exposed=('f', '_h'))
# 'baz': the baz() generator, accessed through IteratorProxy.
MyManager.register('baz', callable=baz, proxytype=IteratorProxy)
1239
1240
class _TestMyManager(BaseTestCase):
    # Checks the proxies produced by the customized MyManager class.

    ALLOWED_TYPES = ('manager',)

    def test_mymanager(self):
        manager = MyManager()
        manager.start()
        # Shut the server down even when an assertion fails, so a failing
        # run does not leave the manager process behind.
        try:
            foo = manager.Foo()
            bar = manager.Bar()
            # Named `squares` so the module-level baz() is not shadowed.
            squares = manager.baz()

            candidates = ('f', 'g', '_h')
            foo_methods = [name for name in candidates if hasattr(foo, name)]
            bar_methods = [name for name in candidates if hasattr(bar, name)]

            # 'Foo' was registered without `exposed`; the expectation is
            # that only the non-underscore methods appear on the proxy.
            self.assertEqual(foo_methods, ['f', 'g'])
            # 'Bar' was registered with exposed=('f', '_h').
            self.assertEqual(bar_methods, ['f', '_h'])

            self.assertEqual(foo.f(), 'f()')
            self.assertRaises(ValueError, foo.g)
            self.assertEqual(foo._callmethod('f'), 'f()')
            # '_h' is not exposed on Foo, so a direct call is refused.
            self.assertRaises(RemoteError, foo._callmethod, '_h')

            self.assertEqual(bar.f(), 'f()')
            self.assertEqual(bar._h(), '_h()')
            self.assertEqual(bar._callmethod('f'), 'f()')
            self.assertEqual(bar._callmethod('_h'), '_h()')

            self.assertEqual(list(squares), [i*i for i in range(10)])
        finally:
            manager.shutdown()
1272
1273#
1274# Test of connecting to a remote server and using xmlrpclib for serialization
1275#
1276
# Single module-level queue handed out by get_queue().
_queue = pyqueue.Queue()


def get_queue():
    """Return the module-level queue (the same object on every call)."""
    shared = _queue
    return shared
1280
class QueueManager(BaseManager):
    '''manager class used by server process'''
# 'get_queue' is backed by the module-level get_queue() function.
QueueManager.register('get_queue', callable=get_queue)
1284
class QueueManager2(BaseManager):
    '''manager class which specifies the same interface as QueueManager'''
# Registered by name only (no callable); in this file QueueManager2 is only
# ever connect()ed to a server started through QueueManager.
QueueManager2.register('get_queue')
1288
1289
# Serializer name passed to every manager created by _TestRemoteManager and
# _TestManagerRestart.
SERIALIZER = 'xmlrpclib'
1291
class _TestRemoteManager(BaseTestCase):
    # Connects to a running manager server from a child process and from
    # the test itself, using the SERIALIZER serializer.

    ALLOWED_TYPES = ('manager',)

    @classmethod
    def _putter(cls, address, authkey):
        # Runs in the child: connect to the server and put one tuple.
        manager = QueueManager2(
            address=address, authkey=authkey, serializer=SERIALIZER
            )
        manager.connect()
        queue = manager.get_queue()
        queue.put(('hello world', None, True, 2.25))

    def test_remote(self):
        authkey = os.urandom(32)

        manager = QueueManager(
            address=('localhost', 0), authkey=authkey, serializer=SERIALIZER
            )
        manager.start()

        p = self.Process(target=self._putter, args=(manager.address, authkey))
        p.start()

        manager2 = QueueManager2(
            address=manager.address, authkey=authkey, serializer=SERIALIZER
            )
        manager2.connect()
        queue = manager2.get_queue()

        received = queue.get()
        # The child has delivered its item by now; reap it so the test
        # does not leave an un-joined process behind.
        p.join()

        # Note that xmlrpclib will deserialize object as a list not a tuple
        self.assertEqual(received, ['hello world', None, True, 2.25])

        # Because we are using xmlrpclib for serialization instead of
        # pickle this will cause a serialization error.
        self.assertRaises(Exception, queue.put, time.sleep)

        # Make queue finalizer run before the server is stopped
        del queue
        manager.shutdown()
1332
class _TestManagerRestart(BaseTestCase):
    # Starts a second manager on an address right after the first manager
    # using that address has been shut down.

    @classmethod
    def _putter(cls, address, authkey):
        # Runs in the child: connect to the server and put one string.
        manager = QueueManager(
            address=address, authkey=authkey, serializer=SERIALIZER)
        manager.connect()
        queue = manager.get_queue()
        queue.put('hello world')

    def test_rapid_restart(self):
        authkey = os.urandom(32)
        manager = QueueManager(
            address=('localhost', 0), authkey=authkey, serializer=SERIALIZER)
        srvr = manager.get_server()
        addr = srvr.address
        # Close the connection.Listener socket which gets opened as a part
        # of manager.get_server(). It's not needed for the test.
        srvr.listener.close()
        manager.start()

        p = self.Process(target=self._putter, args=(manager.address, authkey))
        p.start()
        queue = manager.get_queue()
        received = queue.get()
        # The child has delivered its item by now; reap it before the
        # server it is connected to gets shut down.
        p.join()
        self.assertEqual(received, 'hello world')
        del queue
        manager.shutdown()

        # Restart immediately on the address recorded above.
        manager = QueueManager(
            address=addr, authkey=authkey, serializer=SERIALIZER)
        manager.start()
        manager.shutdown()
Jesse Nollerc5d28a02009-03-30 16:37:36 +00001364
Benjamin Petersone711caf2008-06-11 16:44:04 +00001365#
1366#
1367#
1368
# Stop marker: _TestConnection._echo() keeps echoing messages until
# recv_bytes() returns this value (latin() is defined elsewhere in the file).
SENTINEL = latin('')
1370
class _TestConnection(BaseTestCase):
    # Tests for the connection objects returned by self.Pipe().

    ALLOWED_TYPES = ('processes', 'threads')

    @classmethod
    def _echo(cls, conn):
        # Child side: send every received byte message straight back until
        # SENTINEL arrives, then close the connection.
        for msg in iter(conn.recv_bytes, SENTINEL):
            conn.send_bytes(msg)
        conn.close()

    def test_connection(self):
        conn, child_conn = self.Pipe()

        p = self.Process(target=self._echo, args=(child_conn,))
        p.daemon = True
        p.start()

        seq = [1, 2.25, None]
        msg = latin('hello world')
        longmsg = msg * 10
        arr = array.array('i', list(range(4)))

        if self.TYPE == 'processes':
            self.assertEqual(type(conn.fileno()), int)

        # Object round trip through the echo child.
        self.assertEqual(conn.send(seq), None)
        self.assertEqual(conn.recv(), seq)

        # Raw bytes round trip.
        self.assertEqual(conn.send_bytes(msg), None)
        self.assertEqual(conn.recv_bytes(), msg)

        if self.TYPE == 'processes':
            # Receive into a larger buffer: the tail must stay zero.
            buffer = array.array('i', [0]*10)
            expected = list(arr) + [0] * (10 - len(arr))
            self.assertEqual(conn.send_bytes(arr), None)
            self.assertEqual(conn.recv_bytes_into(buffer),
                             len(arr) * buffer.itemsize)
            self.assertEqual(list(buffer), expected)

            # Same again, writing at an offset of three items.
            buffer = array.array('i', [0]*10)
            expected = [0] * 3 + list(arr) + [0] * (10 - 3 - len(arr))
            self.assertEqual(conn.send_bytes(arr), None)
            self.assertEqual(conn.recv_bytes_into(buffer, 3 * buffer.itemsize),
                             len(arr) * buffer.itemsize)
            self.assertEqual(list(buffer), expected)

            # longmsg is ten copies of msg, the buffer only 40 characters:
            # BufferTooShort is expected and must carry the whole message.
            buffer = bytearray(latin(' ' * 40))
            self.assertEqual(conn.send_bytes(longmsg), None)
            try:
                res = conn.recv_bytes_into(buffer)
            except multiprocessing.BufferTooShort as e:
                self.assertEqual(e.args, (longmsg,))
            else:
                self.fail('expected BufferTooShort, got %s' % res)

        poll = TimingWrapper(conn.poll)

        # Nothing is pending: poll() is False, immediately or after the
        # requested timeout.
        self.assertEqual(poll(), False)
        self.assertTimingAlmostEqual(poll.elapsed, 0)

        self.assertEqual(poll(TIMEOUT1), False)
        self.assertTimingAlmostEqual(poll.elapsed, TIMEOUT1)

        conn.send(None)

        # The echo is on its way back: poll() should turn True without
        # using up the timeout.
        self.assertEqual(poll(TIMEOUT1), True)
        self.assertTimingAlmostEqual(poll.elapsed, 0)

        self.assertEqual(conn.recv(), None)

        really_big_msg = latin('X') * (1024 * 1024 * 16)   # 16Mb
        conn.send_bytes(really_big_msg)
        self.assertEqual(conn.recv_bytes(), really_big_msg)

        conn.send_bytes(SENTINEL)                          # tell child to quit
        child_conn.close()

        if self.TYPE == 'processes':
            self.assertEqual(conn.readable, True)
            self.assertEqual(conn.writable, True)
            # Both ends of the child side are closed now, so reads hit EOF.
            self.assertRaises(EOFError, conn.recv)
            self.assertRaises(EOFError, conn.recv_bytes)

        p.join()

    def test_duplex_false(self):
        # With duplex=False the first end is used for reading and the
        # second for writing.
        reader, writer = self.Pipe(duplex=False)
        self.assertEqual(writer.send(1), None)
        self.assertEqual(reader.recv(), 1)
        if self.TYPE == 'processes':
            self.assertEqual(reader.readable, True)
            self.assertEqual(reader.writable, False)
            self.assertEqual(writer.readable, False)
            self.assertEqual(writer.writable, True)
            # Using an end in the wrong direction must fail.
            self.assertRaises(IOError, reader.send, 2)
            self.assertRaises(IOError, writer.recv)
            self.assertRaises(IOError, writer.poll)

    def test_spawn_close(self):
        # We test that a pipe connection can be closed by parent
        # process immediately after child is spawned.  On Windows this
        # would have sometimes failed on old versions because
        # child_conn would be closed before the child got a chance to
        # duplicate it.
        conn, child_conn = self.Pipe()

        p = self.Process(target=self._echo, args=(child_conn,))
        p.start()
        child_conn.close()    # this might complete before child initializes

        msg = latin('hello')
        conn.send_bytes(msg)
        self.assertEqual(conn.recv_bytes(), msg)

        conn.send_bytes(SENTINEL)
        conn.close()
        p.join()

    def test_sendbytes(self):
        # Only meaningful for the 'processes' flavour; silently passes
        # otherwise.
        if self.TYPE != 'processes':
            return

        msg = latin('abcdefghijklmnopqrstuvwxyz')
        a, b = self.Pipe()

        a.send_bytes(msg)
        self.assertEqual(b.recv_bytes(), msg)

        # Offset only: everything from index 5 on.
        a.send_bytes(msg, 5)
        self.assertEqual(b.recv_bytes(), msg[5:])

        # Offset and size.
        a.send_bytes(msg, 7, 8)
        self.assertEqual(b.recv_bytes(), msg[7:7+8])

        # Offset equal to the message length yields an empty message.
        a.send_bytes(msg, 26)
        self.assertEqual(b.recv_bytes(), latin(''))

        a.send_bytes(msg, 26, 0)
        self.assertEqual(b.recv_bytes(), latin(''))

        # Out-of-range or negative offset/size combinations are rejected.
        self.assertRaises(ValueError, a.send_bytes, msg, 27)

        self.assertRaises(ValueError, a.send_bytes, msg, 22, 5)

        self.assertRaises(ValueError, a.send_bytes, msg, 26, 1)

        self.assertRaises(ValueError, a.send_bytes, msg, -1)

        self.assertRaises(ValueError, a.send_bytes, msg, 4, -1)
1520
class _TestListenerClient(BaseTestCase):
    # Listener/Client handshake for every address family offered by
    # self.connection.

    ALLOWED_TYPES = ('processes', 'threads')

    @classmethod
    def _test(cls, address):
        # Child side: connect to the listener, send one greeting, hang up.
        conn = cls.connection.Client(address)
        conn.send('hello')
        conn.close()

    def test_listener_client(self):
        for family in self.connection.families:
            listener = self.connection.Listener(family=family)
            p = self.Process(target=self._test, args=(listener.address,))
            p.daemon = True
            p.start()
            conn = listener.accept()
            self.assertEqual(conn.recv(), 'hello')
            # Close the accepted connection as well; previously only the
            # listener was closed and this end was left open.
            conn.close()
            p.join()
            listener.close()
Benjamin Petersone711caf2008-06-11 16:44:04 +00001541#
1542# Test of sending connection and socket objects between processes
1543#
Benjamin Petersonb29cbbc2008-06-16 20:57:14 +00001544"""
Benjamin Petersone711caf2008-06-11 16:44:04 +00001545class _TestPicklingConnections(BaseTestCase):
1546
1547 ALLOWED_TYPES = ('processes',)
1548
1549 def _listener(self, conn, families):
1550 for fam in families:
1551 l = self.connection.Listener(family=fam)
1552 conn.send(l.address)
1553 new_conn = l.accept()
1554 conn.send(new_conn)
1555
1556 if self.TYPE == 'processes':
1557 l = socket.socket()
1558 l.bind(('localhost', 0))
1559 conn.send(l.getsockname())
1560 l.listen(1)
1561 new_conn, addr = l.accept()
1562 conn.send(new_conn)
1563
1564 conn.recv()
1565
1566 def _remote(self, conn):
1567 for (address, msg) in iter(conn.recv, None):
1568 client = self.connection.Client(address)
1569 client.send(msg.upper())
1570 client.close()
1571
1572 if self.TYPE == 'processes':
1573 address, msg = conn.recv()
1574 client = socket.socket()
1575 client.connect(address)
1576 client.sendall(msg.upper())
1577 client.close()
1578
1579 conn.close()
1580
1581 def test_pickling(self):
1582 try:
1583 multiprocessing.allow_connection_pickling()
1584 except ImportError:
1585 return
1586
1587 families = self.connection.families
1588
1589 lconn, lconn0 = self.Pipe()
1590 lp = self.Process(target=self._listener, args=(lconn0, families))
1591 lp.start()
1592 lconn0.close()
1593
1594 rconn, rconn0 = self.Pipe()
1595 rp = self.Process(target=self._remote, args=(rconn0,))
1596 rp.start()
1597 rconn0.close()
1598
1599 for fam in families:
1600 msg = ('This connection uses family %s' % fam).encode('ascii')
1601 address = lconn.recv()
1602 rconn.send((address, msg))
1603 new_conn = lconn.recv()
1604 self.assertEqual(new_conn.recv(), msg.upper())
1605
1606 rconn.send(None)
1607
1608 if self.TYPE == 'processes':
1609 msg = latin('This connection uses a normal socket')
1610 address = lconn.recv()
1611 rconn.send((address, msg))
1612 if hasattr(socket, 'fromfd'):
1613 new_conn = lconn.recv()
1614 self.assertEqual(new_conn.recv(100), msg.upper())
1615 else:
1616 # XXX On Windows with Py2.6 need to backport fromfd()
1617 discard = lconn.recv_bytes()
1618
1619 lconn.send(None)
1620
1621 rconn.close()
1622 lconn.close()
1623
1624 lp.join()
1625 rp.join()
Benjamin Petersonb29cbbc2008-06-16 20:57:14 +00001626"""
Benjamin Petersone711caf2008-06-11 16:44:04 +00001627#
1628#
1629#
1630
class _TestHeap(BaseTestCase):
    # Consistency check of multiprocessing.heap's block bookkeeping.

    ALLOWED_TYPES = ('processes',)

    def test_heap(self):
        iterations = 5000
        maxblocks = 50
        blocks = []

        # create and destroy lots of blocks of different sizes
        for _ in range(iterations):
            size = int(random.lognormvariate(0, 1) * 1000)
            blocks.append(multiprocessing.heap.BufferWrapper(size))
            if len(blocks) > maxblocks:
                # Drop one of the older blocks at random.
                del blocks[random.randrange(maxblocks)]

        # get the heap object
        heap = multiprocessing.heap.BufferWrapper._heap

        # verify the state of the heap: collect every free and every
        # allocated block as (arena index, start, stop, length, state)
        segments = []
        for free_blocks in list(heap._len_to_seq.values()):
            for arena, start, stop in free_blocks:
                segments.append((heap._arenas.index(arena), start, stop,
                                 stop-start, 'free'))
        for arena, start, stop in heap._allocated_blocks:
            segments.append((heap._arenas.index(arena), start, stop,
                             stop-start, 'occupied'))

        segments.sort()

        # Neighbouring entries must either touch exactly, or the second
        # one must open a different arena at offset 0.
        for current, following in zip(segments, segments[1:]):
            arena, _, stop = current[:3]
            narena, nstart, _ = following[:3]
            self.assertTrue((arena != narena and nstart == 0) or
                            (stop == nstart))
1671
1672#
1673#
1674#
1675
# Two-field structure used by _TestSharedCTypes (shared via Value() and
# duplicated via copy()).
class _Foo(Structure):
    _fields_ = [
        ('x', c_int),       # integer member
        ('y', c_double)     # floating point member
        ]
1681
class _TestSharedCTypes(BaseTestCase):
    # Shared ctypes values, structures and arrays modified by a child.

    ALLOWED_TYPES = ('processes',)

    def setUp(self):
        if HAS_SHAREDCTYPES:
            return
        self.skipTest("requires multiprocessing.sharedctypes")

    @classmethod
    def _double(cls, x, y, foo, arr, string):
        # Runs in the child: double every shared object in place.
        for shared in (x, y):
            shared.value *= 2
        foo.x *= 2
        foo.y *= 2
        string.value *= 2
        for index in range(len(arr)):
            arr[index] *= 2

    def test_sharedctypes(self, lock=False):
        x = Value('i', 7, lock=lock)
        y = Value(c_double, 1.0/3.0, lock=lock)
        foo = Value(_Foo, 3, 2, lock=lock)
        arr = self.Array('d', list(range(10)), lock=lock)
        string = self.Array('c', 20, lock=lock)
        string.value = latin('hello')

        shared_objects = (x, y, foo, arr, string)
        child = self.Process(target=self._double, args=shared_objects)
        child.start()
        child.join()

        # Every value must now be twice what it was initialised to.
        self.assertEqual(x.value, 14)
        self.assertAlmostEqual(y.value, 2.0/3.0)
        self.assertEqual(foo.x, 6)
        self.assertAlmostEqual(foo.y, 4.0)
        for index in range(10):
            self.assertAlmostEqual(arr[index], index*2)
        self.assertEqual(string.value, latin('hellohello'))

    def test_synchronize(self):
        # Same scenario with lock=True.
        self.test_sharedctypes(lock=True)

    def test_copy(self):
        original = _Foo(2, 5.0)
        duplicate = copy(original)
        # Changing the original afterwards must not affect the copy.
        original.x = 0
        original.y = 0
        self.assertEqual(duplicate.x, 2)
        self.assertAlmostEqual(duplicate.y, 5.0)
1730
1731#
1732#
1733#
1734
class _TestFinalize(BaseTestCase):
    # Checks when, and in which order, util.Finalize callbacks are run.

    ALLOWED_TYPES = ('processes',)

    @classmethod
    def _test_finalize(cls, conn):
        # Runs in the child.  Each callback sends its tag over `conn`, so
        # the parent can read back the order in which callbacks fired.
        class Foo(object):
            pass

        a = Foo()
        util.Finalize(a, conn.send, args=('a',))
        del a           # triggers callback for a

        b = Foo()
        close_b = util.Finalize(b, conn.send, args=('b',))
        close_b()       # triggers callback for b
        close_b()       # does nothing because callback has already been called
        del b           # does nothing because callback has already been called

        # No exitpriority and the object stays referenced: 'c' is absent
        # from the result expected by test_finalize().
        c = Foo()
        util.Finalize(c, conn.send, args=('c',))

        d10 = Foo()
        util.Finalize(d10, conn.send, args=('d10',), exitpriority=1)

        # Three finalizers sharing exitpriority 0, created in the order
        # d01, d02, d03.
        d01 = Foo()
        util.Finalize(d01, conn.send, args=('d01',), exitpriority=0)
        d02 = Foo()
        util.Finalize(d02, conn.send, args=('d02',), exitpriority=0)
        d03 = Foo()
        util.Finalize(d03, conn.send, args=('d03',), exitpriority=0)

        util.Finalize(None, conn.send, args=('e',), exitpriority=-10)

        # Lowest priority of all: its 'STOP' message ends the parent's read.
        util.Finalize(None, conn.send, args=('STOP',), exitpriority=-100)

        # call multiprocessing's cleanup function then exit process without
        # garbage collecting locals
        util._exit_function()
        conn.close()
        os._exit(0)

    def test_finalize(self):
        conn, child_conn = self.Pipe()

        p = self.Process(target=self._test_finalize, args=(child_conn,))
        p.start()
        p.join()

        # Read tags until 'STOP'.  The expected order is: the explicit
        # triggers first ('a', 'b'), then the exit-time callbacks by
        # descending exitpriority, with the equal-priority ones in reverse
        # creation order.
        result = [obj for obj in iter(conn.recv, 'STOP')]
        self.assertEqual(result, ['a', 'b', 'd10', 'd03', 'd02', 'd01', 'e'])
1786
1787#
1788# Test that from ... import * works for each module
1789#
1790
class _TestImportStar(BaseTestCase):
    # Every name a module lists in __all__ must really exist on it.

    ALLOWED_TYPES = ('processes',)

    def test_import(self):
        modules = [
            'multiprocessing',
            'multiprocessing.connection',
            'multiprocessing.heap',
            'multiprocessing.managers',
            'multiprocessing.pool',
            'multiprocessing.process',
            'multiprocessing.reduction',
            'multiprocessing.synchronize',
            'multiprocessing.util',
            ]

        if c_int is not None:
            # This module requires _ctypes
            modules += ['multiprocessing.sharedctypes']

        for modname in modules:
            __import__(modname)
            mod = sys.modules[modname]

            # Modules without __all__ contribute nothing to check.
            exported = getattr(mod, '__all__', ())
            for attr in exported:
                message = '%r does not have attribute %r' % (mod, attr)
                self.assertTrue(hasattr(mod, attr), message)
1817
1818#
1819# Quick test that logging works -- does not test logging output
1820#
1821
class _TestLogging(BaseTestCase):
    # Quick check that multiprocessing's logger works and that its
    # effective level is what a child process sees.

    ALLOWED_TYPES = ('processes',)

    def test_enable_logging(self):
        logger = multiprocessing.get_logger()
        logger.setLevel(util.SUBWARNING)
        self.assertIsNotNone(logger)
        logger.debug('this will not be printed')
        logger.info('nor will this')
        logger.setLevel(LOG_LEVEL)

    @classmethod
    def _test_level(cls, conn):
        # Runs in the child: report the logger's effective level.
        logger = multiprocessing.get_logger()
        conn.send(logger.getEffectiveLevel())

    def test_level(self):
        LEVEL1 = 32
        LEVEL2 = 37

        logger = multiprocessing.get_logger()
        root_logger = logging.getLogger()
        root_level = root_logger.level

        reader, writer = multiprocessing.Pipe(duplex=False)

        # Restore both loggers even if an assertion fails, so a failure
        # here cannot change the log levels seen by later tests.
        try:
            # A level set directly on the multiprocessing logger.
            logger.setLevel(LEVEL1)
            p = self.Process(target=self._test_level, args=(writer,))
            p.start()
            reported = reader.recv()
            p.join()    # reap the child instead of leaving it un-joined
            self.assertEqual(LEVEL1, reported)

            # With NOTSET the effective level comes from the root logger.
            logger.setLevel(logging.NOTSET)
            root_logger.setLevel(LEVEL2)
            p = self.Process(target=self._test_level, args=(writer,))
            p.start()
            reported = reader.recv()
            p.join()
            self.assertEqual(LEVEL2, reported)
        finally:
            root_logger.setLevel(root_level)
            logger.setLevel(level=LOG_LEVEL)
1860
Jesse Nollerb9a49b72009-11-21 18:09:38 +00001861
Jesse Nollerf4ae35f2009-11-24 14:22:24 +00001862# class _TestLoggingProcessName(BaseTestCase):
1863#
1864# def handle(self, record):
1865# assert record.processName == multiprocessing.current_process().name
1866# self.__handled = True
1867#
1868# def test_logging(self):
1869# handler = logging.Handler()
1870# handler.handle = self.handle
1871# self.__handled = False
1872# # Bypass getLogger() and side-effects
1873# logger = logging.getLoggerClass()(
1874# 'multiprocessing.test.TestLoggingProcessName')
1875# logger.addHandler(handler)
1876# logger.propagate = False
1877#
1878# logger.warn('foo')
1879# assert self.__handled
Jesse Nollerb9a49b72009-11-21 18:09:38 +00001880
Benjamin Petersone711caf2008-06-11 16:44:04 +00001881#
Jesse Noller6214edd2009-01-19 16:23:53 +00001882# Test to verify handle verification, see issue 3321
1883#
1884
class TestInvalidHandle(unittest.TestCase):
    # Handle verification, see issue 3321.

    @unittest.skipIf(WIN32, "skipped on Windows")
    def test_invalid_handles(self):
        # An arbitrary handle number: polling the connection must fail.
        bogus_handle = 44977608
        conn = _multiprocessing.Connection(bogus_handle)
        with self.assertRaises(IOError):
            conn.poll()
        # A negative handle is rejected at construction time.
        with self.assertRaises(IOError):
            _multiprocessing.Connection(-1)
Florent Xiclunafd1b0932010-03-28 00:25:02 +00001892
Jesse Noller6214edd2009-01-19 16:23:53 +00001893#
Benjamin Petersone711caf2008-06-11 16:44:04 +00001894# Functions used to create test cases from the base ones in this module
1895#
1896
def get_attributes(Source, names):
    """Return a dict mapping each name in *names* to ``getattr(Source, name)``.

    Plain Python functions are wrapped in ``staticmethod`` so that, when the
    returned mapping is injected into a class namespace, they are not turned
    into bound methods and keep their original call signature.  Any other
    kind of object (classes, bound methods, builtins, values) is stored
    unchanged.

    Raises AttributeError if *Source* lacks one of the requested names.
    """
    # The type of a plain ``def`` function; avoids importing ``types``.
    function_type = type(get_attributes)
    attributes = {}
    for name in names:
        obj = getattr(Source, name)
        if isinstance(obj, function_type):
            obj = staticmethod(obj)
        attributes[name] = obj
    return attributes
1905
def create_test_cases(Mixin, type):
    """Build concrete TestCase classes for one flavour of the test suite.

    Every module-level object whose name starts with ``_Test`` and whose
    ``ALLOWED_TYPES`` contains *type* is combined with ``unittest.TestCase``
    and *Mixin* into a new class named ``With<Type><name without the
    leading underscore>``.

    Returns a dict mapping the new class names to the new classes.
    """
    cases = {}
    prefix = 'With' + type.capitalize()

    # Iterate over a snapshot so the module namespace may be updated with
    # the result afterwards without disturbing this loop.
    for name, base in list(globals().items()):
        if not name.startswith('_Test'):
            continue
        if type not in base.ALLOWED_TYPES:
            continue

        class Temp(base, unittest.TestCase, Mixin):
            pass

        newname = prefix + name[1:]
        Temp.__name__ = newname
        Temp.__module__ = Mixin.__module__
        cases[newname] = Temp
    return cases
1922
1923#
1924# Create test cases
1925#
1926
class ProcessesMixin(object):
    # Mixin that points the generic _Test* cases at the real
    # multiprocessing primitives.
    TYPE = 'processes'
    Process = multiprocessing.Process
    # Inside a class body locals() is the namespace being built, so this
    # publishes each listed multiprocessing attribute as a class attribute
    # (plain functions arrive wrapped in staticmethod by get_attributes).
    locals().update(get_attributes(multiprocessing, (
        'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
        'Condition', 'Event', 'Value', 'Array', 'RawValue',
        'RawArray', 'current_process', 'active_children', 'Pipe',
        'connection', 'JoinableQueue'
        )))

# Generate the 'processes' flavour of every eligible _Test* class and expose
# the generated classes as module-level names.
testcases_processes = create_test_cases(ProcessesMixin, type='processes')
globals().update(testcases_processes)
1939
1940
class ManagerMixin(object):
    # Mixin that points the generic _Test* cases at objects served by a
    # SyncManager.
    TYPE = 'manager'
    Process = multiprocessing.Process
    # Allocate the manager without running __init__; test_main() calls
    # manager.__init__() and manager.start() on this same object later, so
    # the attributes captured below refer to the manager that is eventually
    # started.
    manager = object.__new__(multiprocessing.managers.SyncManager)
    # Inside a class body locals() is the namespace being built, so this
    # publishes each listed manager attribute as a class attribute.
    locals().update(get_attributes(manager, (
        'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
        'Condition', 'Event', 'Value', 'Array', 'list', 'dict',
        'Namespace', 'JoinableQueue'
        )))

# Generate the 'manager' flavour of every eligible _Test* class and expose
# the generated classes as module-level names.
testcases_manager = create_test_cases(ManagerMixin, type='manager')
globals().update(testcases_manager)
1953
1954
class ThreadsMixin(object):
    # Mixin that points the generic _Test* cases at multiprocessing.dummy,
    # the package imported at the top of this file alongside multiprocessing.
    TYPE = 'threads'
    Process = multiprocessing.dummy.Process
    # Inside a class body locals() is the namespace being built, so this
    # publishes each listed multiprocessing.dummy attribute as a class
    # attribute (plain functions arrive wrapped in staticmethod).
    locals().update(get_attributes(multiprocessing.dummy, (
        'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
        'Condition', 'Event', 'Value', 'Array', 'current_process',
        'active_children', 'Pipe', 'connection', 'dict', 'list',
        'Namespace', 'JoinableQueue'
        )))

# Generate the 'threads' flavour of every eligible _Test* class and expose
# the generated classes as module-level names.
testcases_threads = create_test_cases(ThreadsMixin, type='threads')
globals().update(testcases_threads)
1967
class OtherTest(unittest.TestCase):
    """Authentication-handshake failures in multiprocessing.connection."""

    # TODO: add more tests for deliver/answer challenge.
    def test_deliver_challenge_auth_failure(self):
        class _FakeConnection(object):
            """Peer that answers every challenge with garbage."""

            def recv_bytes(self, size):
                return b'something bogus'

            def send_bytes(self, data):
                pass

        fake = _FakeConnection()
        with self.assertRaises(multiprocessing.AuthenticationError):
            multiprocessing.connection.deliver_challenge(fake, b'abc')

    def test_answer_challenge_auth_failure(self):
        class _FakeConnection(object):
            """Peer that sends a challenge, then a bogus verdict."""

            def __init__(self):
                self.count = 0

            def recv_bytes(self, size):
                self.count += 1
                if self.count == 1:
                    # First read: the challenge marker.
                    return multiprocessing.connection.CHALLENGE
                if self.count == 2:
                    # Second read: anything but a welcome message.
                    return b'something bogus'
                return b''

            def send_bytes(self, data):
                pass

        fake = _FakeConnection()
        with self.assertRaises(multiprocessing.AuthenticationError):
            multiprocessing.connection.answer_challenge(fake, b'abc')
1996
Benjamin Petersonf47ed4a2009-04-11 20:45:40 +00001997#
1998# Test Manager.start()/Pool.__init__() initializer feature - see issue 5585
1999#
2000
def initializer(ns):
    """Increment ``ns.test`` by one; records that the initializer ran."""
    ns.test = ns.test + 1
2003
class TestInitializers(unittest.TestCase):
    """Manager.start() / Pool() initializer feature (see issue 5585).

    A shared Namespace owned by ``self.mgr`` carries a counter that the
    module-level ``initializer`` increments, which lets the parent observe
    that the initializer ran in the other process.
    """

    def setUp(self):
        self.mgr = multiprocessing.Manager()
        self.ns = self.mgr.Namespace()
        self.ns.test = 0

    def tearDown(self):
        self.mgr.shutdown()

    def test_manager_initializer(self):
        m = multiprocessing.managers.SyncManager()
        # A non-callable initializer must be rejected.
        self.assertRaises(TypeError, m.start, 1)
        m.start(initializer, (self.ns,))
        try:
            self.assertEqual(self.ns.test, 1)
        finally:
            # Shut the second manager down even when the assertion fails,
            # otherwise its server process is leaked.
            m.shutdown()

    def test_pool_initializer(self):
        # A non-callable initializer must be rejected.
        self.assertRaises(TypeError, multiprocessing.Pool, initializer=1)
        p = multiprocessing.Pool(1, initializer, (self.ns,))
        p.close()
        p.join()
        self.assertEqual(self.ns.test, 1)
2026
Alexandre Vassalottic57a84f2009-07-17 12:07:01 +00002027#
2028# Issue 5155, 5313, 5331: Test process in processes
2029# Verifies os.close(sys.stdin.fileno) vs. sys.stdin.close() behavior
2030#
2031
def _ThisSubProcess(q):
    """Attempt one non-blocking get on *q*; an empty queue is not an error."""
    try:
        q.get_nowait()
    except pyqueue.Empty:
        return
2037
def _TestProcess(q):
    """Start a nested process running _ThisSubProcess and wait for it.

    The *q* argument is accepted but not used; the nested process gets a
    queue of its own.
    """
    inner_queue = multiprocessing.Queue()
    child = multiprocessing.Process(target=_ThisSubProcess,
                                    args=(inner_queue,))
    child.start()
    child.join()
2043
def _afunc(x):
    """Return *x* multiplied by itself."""
    squared = x * x
    return squared
2046
def pool_in_process():
    """Create a worker pool, map _afunc over a few integers, and clean up.

    Intended to be run as the target of a Process (see
    TestStdinBadfiledescriptor.test_pool_in_process).  The result of the map
    is not needed; the point is that the pool can be created and used.
    The pool is closed and joined so its workers do not outlive the call.
    """
    pool = multiprocessing.Pool(processes=4)
    try:
        pool.map(_afunc, [1, 2, 3, 4, 5, 6, 7])
    finally:
        pool.close()
        pool.join()
2050
class _file_like(object):
    """Write buffer in front of *delegate* that is kept per process id.

    Data passed to write() is held in a list until flush() joins it and
    hands it to ``delegate.write``.  The list is replaced by a fresh one
    whenever the current pid differs from the pid recorded at the last
    access.
    """

    def __init__(self, delegate):
        self._pid = None
        self._delegate = delegate

    @property
    def cache(self):
        current_pid = os.getpid()
        # There are no race conditions since fork keeps only the running thread
        if self._pid != current_pid:
            # First access in this process: start with an empty buffer.
            self._cache = []
            self._pid = current_pid
        return self._cache

    def write(self, data):
        buffered = self.cache
        buffered.append(data)

    def flush(self):
        pending = ''.join(self.cache)
        self._delegate.write(pending)
        self._cache = []
2071
class TestStdinBadfiledescriptor(unittest.TestCase):
    """Nested process / pool creation and buffered-writer flushing.

    See issues 5155, 5313 and 5331.
    """

    def test_queue_in_process(self):
        # A process that itself starts another process must run to
        # completion.
        queue = multiprocessing.Queue()
        proc = multiprocessing.Process(target=_TestProcess, args=(queue,))
        proc.start()
        proc.join()

    def test_pool_in_process(self):
        # A process that itself creates a Pool must run to completion.
        p = multiprocessing.Process(target=pool_in_process)
        p.start()
        p.join()

    def test_flushing(self):
        # Data written before flush() must reach the delegate exactly once.
        # Uses assertEqual rather than a bare assert so the check still
        # runs under -O and reports both values on failure.
        sio = io.StringIO()
        flike = _file_like(sio)
        flike.write('foo')
        flike.flush()
        self.assertEqual(sio.getvalue(), 'foo')
2092
# Stand-alone TestCase classes that are not generated from a _Test* base;
# test_main() appends them after the generated test cases.
testcases_other = [OtherTest, TestInvalidHandle, TestInitializers,
                   TestStdinBadfiledescriptor]
Neal Norwitz5d6415e2008-08-25 01:53:32 +00002095
Benjamin Petersone711caf2008-06-11 16:44:04 +00002096#
2097#
2098#
2099
def test_main(run=None):
    """Build the full suite, run it, and tear down the shared pools.

    *run* is a callable that accepts a ``unittest.TestSuite``; when omitted,
    ``test.support.run_unittest`` is used.

    Raises unittest.SkipTest on Linux when an RLock cannot be created
    (issue 3111).
    """
    if sys.platform.startswith("linux"):
        try:
            multiprocessing.RLock()
        except OSError:
            raise unittest.SkipTest("OSError raises on RLock creation, see issue 3111!")

    if run is None:
        from test.support import run_unittest as run

    util.get_temp_dir()     # creates temp directory for use by all processes

    multiprocessing.get_logger().setLevel(LOG_LEVEL)

    # Shared pools (and the manager backing one of them) used by the
    # generated test cases through their mixin classes.
    ProcessesMixin.pool = multiprocessing.Pool(4)
    ThreadsMixin.pool = multiprocessing.dummy.Pool(4)
    ManagerMixin.manager.__init__()
    ManagerMixin.manager.start()
    ManagerMixin.pool = ManagerMixin.manager.Pool(4)

    try:
        testcases = (
            sorted(testcases_processes.values(), key=lambda tc:tc.__name__) +
            sorted(testcases_threads.values(), key=lambda tc:tc.__name__) +
            sorted(testcases_manager.values(), key=lambda tc:tc.__name__) +
            testcases_other
            )

        loadTestsFromTestCase = unittest.defaultTestLoader.loadTestsFromTestCase
        suite = unittest.TestSuite(loadTestsFromTestCase(tc) for tc in testcases)
        run(suite)
    finally:
        # Tear down even if the runner raised, so that no worker or
        # manager process is left behind.
        ThreadsMixin.pool.terminate()
        ProcessesMixin.pool.terminate()
        ManagerMixin.pool.terminate()
        ManagerMixin.manager.shutdown()

        del ProcessesMixin.pool, ThreadsMixin.pool, ManagerMixin.pool
Benjamin Petersone711caf2008-06-11 16:44:04 +00002137
def main():
    """Run the whole suite with a verbose text runner."""
    runner = unittest.TextTestRunner(verbosity=2)
    test_main(runner.run)
2140
# Run the suite only when this file is executed as a script, not on import.
if __name__ == '__main__':
    main()