blob: 0b3f937ace9c2884152ec6bc566c32db332dfad8 [file] [log] [blame]
Benjamin Peterson90f5ba52010-03-11 22:53:45 +00001#!/usr/bin/env python3
Georg Brandl86b2fb92008-07-16 03:43:04 +00002
Benjamin Petersone711caf2008-06-11 16:44:04 +00003#
4# Unit tests for the multiprocessing package
5#
6
7import unittest
Benjamin Petersone711caf2008-06-11 16:44:04 +00008import queue as pyqueue
9import time
Alexandre Vassalottic57a84f2009-07-17 12:07:01 +000010import io
Benjamin Petersone711caf2008-06-11 16:44:04 +000011import sys
12import os
13import gc
14import signal
15import array
Benjamin Petersone711caf2008-06-11 16:44:04 +000016import socket
17import random
18import logging
R. David Murraya21e4ca2009-03-31 23:16:50 +000019import test.support
Benjamin Petersone711caf2008-06-11 16:44:04 +000020
Benjamin Petersone5384b02008-10-04 22:00:42 +000021
R. David Murraya21e4ca2009-03-31 23:16:50 +000022# Skip tests if _multiprocessing wasn't built.
23_multiprocessing = test.support.import_module('_multiprocessing')
24# Skip tests if sem_open implementation is broken.
25test.support.import_module('multiprocessing.synchronize')
# Import threading after _multiprocessing so that a more relevant error
# message is raised: "No module named _multiprocessing". _multiprocessing is
# not compiled without thread support.
29import threading
Benjamin Petersone5384b02008-10-04 22:00:42 +000030
Benjamin Petersone711caf2008-06-11 16:44:04 +000031import multiprocessing.dummy
32import multiprocessing.connection
33import multiprocessing.managers
34import multiprocessing.heap
Benjamin Petersone711caf2008-06-11 16:44:04 +000035import multiprocessing.pool
Benjamin Petersone711caf2008-06-11 16:44:04 +000036
37from multiprocessing import util
38
Brian Curtinafa88b52010-10-07 01:12:19 +000039try:
40 from multiprocessing.sharedctypes import Value, copy
41 HAS_SHAREDCTYPES = True
42except ImportError:
43 HAS_SHAREDCTYPES = False
44
Benjamin Petersone711caf2008-06-11 16:44:04 +000045#
46#
47#
48
def latin(s):
    """Return the string *s* encoded to bytes with the 'latin' codec."""
    encoded = s.encode('latin')
    return encoded
Benjamin Petersone711caf2008-06-11 16:44:04 +000051
Benjamin Petersone711caf2008-06-11 16:44:04 +000052#
53# Constants
54#
55
# Logging level used by the tests (switch to logging.DEBUG for verbose output).
LOG_LEVEL = util.SUBWARNING

# Short pause used by tests that must give another process/thread time to act.
DELTA = 0.1

# Making CHECK_TIMINGS true makes the tests take a lot longer and can
# sometimes cause some non-serious failures, because some calls block a bit
# longer than expected.
CHECK_TIMINGS = False

# Longer, distinct timeouts are only worthwhile when timings are verified.
TIMEOUT1, TIMEOUT2, TIMEOUT3 = (
    (0.82, 0.35, 1.4) if CHECK_TIMINGS else (0.1, 0.1, 0.1)
)

# True unless the extension module reports a broken sem_getvalue().
HAVE_GETVALUE = not getattr(
    _multiprocessing, 'HAVE_BROKEN_SEM_GETVALUE', False
)

WIN32 = sys.platform == "win32"
73
Benjamin Petersone711caf2008-06-11 16:44:04 +000074#
Florent Xiclunafd1b0932010-03-28 00:25:02 +000075# Some tests require ctypes
76#
77
78try:
Florent Xiclunaaa171062010-08-14 15:56:42 +000079 from ctypes import Structure, c_int, c_double
Florent Xiclunafd1b0932010-03-28 00:25:02 +000080except ImportError:
81 Structure = object
82 c_int = c_double = None
83
84#
Benjamin Petersone711caf2008-06-11 16:44:04 +000085# Creates a wrapper for a function which records the time it takes to finish
86#
87
class TimingWrapper(object):
    """Callable wrapper that records how long each call to *func* takes.

    After every call (whether it returns or raises) the duration in seconds
    is stored in ``self.elapsed``; it is ``None`` until the first call.
    """

    def __init__(self, func):
        self.func = func
        self.elapsed = None

    def __call__(self, *args, **kwds):
        # Use the monotonic clock: wall-clock time may jump when the system
        # clock is adjusted, which would corrupt the measured duration.
        started = time.monotonic()
        try:
            return self.func(*args, **kwds)
        finally:
            # Recorded in ``finally`` so the timing is kept even on error.
            self.elapsed = time.monotonic() - started
100
101#
102# Base class for test cases
103#
104
class BaseTestCase(object):
    """Common helpers shared by the test-case classes in this file.

    The assertion helpers rely on ``assertEqual``/``assertAlmostEqual`` being
    available on the instance (presumably mixed in from unittest.TestCase by
    the concrete test classes -- not visible in this part of the file).
    """

    ALLOWED_TYPES = ('processes', 'manager', 'threads')

    def assertTimingAlmostEqual(self, a, b):
        """Compare two durations to one decimal place if CHECK_TIMINGS is set."""
        if not CHECK_TIMINGS:
            return
        self.assertAlmostEqual(a, b, 1)

    def assertReturnsIfImplemented(self, value, func, *args):
        """Assert ``func(*args) == value`` unless it raises NotImplementedError."""
        try:
            outcome = func(*args)
        except NotImplementedError:
            # The operation is unsupported here, so there is nothing to check.
            return None
        return self.assertEqual(value, outcome)

    # For the sanity of Windows users, rather than crashing or freezing in
    # multiple ways.
    def __reduce__(self, *args):
        raise NotImplementedError("shouldn't try to pickle a test case")

    __reduce_ex__ = __reduce__
127
Benjamin Petersone711caf2008-06-11 16:44:04 +0000128#
129# Return the value of a semaphore
130#
131
def get_value(self):
    """Return the current value of the semaphore-like object *self*.

    Tries, in order, the ``get_value()`` method, the ``_Semaphore__value``
    attribute and the ``_value`` attribute.  Raises NotImplementedError when
    none of them is available.
    """
    try:
        return self.get_value()
    except AttributeError:
        pass
    # Fall back to the private attributes that may hold the counter.
    for attr_name in ('_Semaphore__value', '_value'):
        try:
            return getattr(self, attr_name)
        except AttributeError:
            continue
    raise NotImplementedError
143
144#
145# Testcases
146#
147
class _TestProcess(BaseTestCase):
    """Tests for process objects (and their thread-based look-alikes).

    Attributes such as ``self.TYPE``, ``self.Process``, ``self.Queue``,
    ``self.Pipe``, ``self.current_process`` and ``self.active_children`` are
    expected to be supplied by the concrete test class (not visible here).
    """

    ALLOWED_TYPES = ('processes', 'threads')

    def test_current(self):
        """Check the basic attributes of the current process object."""
        if self.TYPE == 'threads':
            return

        current = self.current_process()
        authkey = current.authkey

        self.assertTrue(current.is_alive())
        self.assertFalse(current.daemon)
        self.assertIsInstance(authkey, bytes)
        self.assertGreater(len(authkey), 0)
        self.assertEqual(current.ident, os.getpid())
        self.assertEqual(current.exitcode, None)

    @classmethod
    def _test(cls, q, *args, **kwds):
        """Child target: report the received arguments and identity via *q*."""
        current = cls.current_process()
        q.put(args)
        q.put(kwds)
        q.put(current.name)
        if cls.TYPE != 'threads':
            q.put(bytes(current.authkey))
            q.put(current.pid)

    def test_process(self):
        """Check a child's state before start, while running and after join."""
        q = self.Queue(1)
        args = (q, 1, 2)
        kwargs = {'hello': 23, 'bye': 2.54}
        name = 'SomeProcess'
        p = self.Process(
            target=self._test, args=args, kwargs=kwargs, name=name
        )
        p.daemon = True
        current = self.current_process()

        if self.TYPE != 'threads':
            self.assertEqual(p.authkey, current.authkey)
        self.assertEqual(p.is_alive(), False)
        self.assertEqual(p.daemon, True)
        self.assertNotIn(p, self.active_children())
        self.assertIs(type(self.active_children()), list)
        self.assertEqual(p.exitcode, None)

        p.start()

        self.assertEqual(p.exitcode, None)
        self.assertEqual(p.is_alive(), True)
        self.assertIn(p, self.active_children())

        # The child reports back in the same order in which _test() puts.
        self.assertEqual(q.get(), args[1:])
        self.assertEqual(q.get(), kwargs)
        self.assertEqual(q.get(), p.name)
        if self.TYPE != 'threads':
            self.assertEqual(q.get(), current.authkey)
            self.assertEqual(q.get(), p.pid)

        p.join()

        self.assertEqual(p.exitcode, 0)
        self.assertEqual(p.is_alive(), False)
        self.assertNotIn(p, self.active_children())

    @classmethod
    def _test_terminate(cls):
        """Child target: sleep long enough that it must be terminated."""
        time.sleep(1000)

    def test_terminate(self):
        """Check that terminate() stops a sleeping child promptly."""
        if self.TYPE == 'threads':
            return

        p = self.Process(target=self._test_terminate)
        p.daemon = True
        p.start()

        self.assertEqual(p.is_alive(), True)
        self.assertIn(p, self.active_children())
        self.assertEqual(p.exitcode, None)

        p.terminate()

        join = TimingWrapper(p.join)
        self.assertEqual(join(), None)
        self.assertTimingAlmostEqual(join.elapsed, 0.0)

        self.assertEqual(p.is_alive(), False)
        self.assertNotIn(p, self.active_children())

        p.join()

        # XXX sometimes get p.exitcode == 0 on Windows ...
        #self.assertEqual(p.exitcode, -signal.SIGTERM)

    def test_cpu_count(self):
        """cpu_count() must yield an int >= 1 (or be unimplemented)."""
        try:
            cpus = multiprocessing.cpu_count()
        except NotImplementedError:
            cpus = 1
        self.assertIs(type(cpus), int)
        self.assertGreaterEqual(cpus, 1)

    def test_active_children(self):
        """A child is listed by active_children() only while it is running."""
        self.assertEqual(type(self.active_children()), list)

        p = self.Process(target=time.sleep, args=(DELTA,))
        self.assertNotIn(p, self.active_children())

        p.start()
        self.assertIn(p, self.active_children())

        p.join()
        self.assertNotIn(p, self.active_children())

    @classmethod
    def _test_recursion(cls, wconn, id):
        """Send *id* on *wconn*, then recurse in two children until depth 2."""
        wconn.send(id)
        if len(id) < 2:
            for i in range(2):
                p = cls.Process(
                    target=cls._test_recursion, args=(wconn, id + [i])
                )
                p.start()
                # Join before starting the sibling so the order is fixed.
                p.join()

    def test_recursion(self):
        """Children that start their own children report in depth-first order."""
        rconn, wconn = self.Pipe(duplex=False)
        self._test_recursion(wconn, [])

        time.sleep(DELTA)
        result = []
        while rconn.poll():
            result.append(rconn.recv())

        expected = [
            [],
            [0],
            [0, 0],
            [0, 1],
            [1],
            [1, 0],
            [1, 1],
        ]
        self.assertEqual(result, expected)
296
297#
298#
299#
300
301class _UpperCaser(multiprocessing.Process):
302
303 def __init__(self):
304 multiprocessing.Process.__init__(self)
305 self.child_conn, self.parent_conn = multiprocessing.Pipe()
306
307 def run(self):
308 self.parent_conn.close()
309 for s in iter(self.child_conn.recv, None):
310 self.child_conn.send(s.upper())
311 self.child_conn.close()
312
313 def submit(self, s):
314 assert type(s) is str
315 self.parent_conn.send(s)
316 return self.parent_conn.recv()
317
318 def stop(self):
319 self.parent_conn.send(None)
320 self.parent_conn.close()
321 self.child_conn.close()
322
class _TestSubclassingProcess(BaseTestCase):
    """Check that a multiprocessing.Process subclass can be started and used."""

    ALLOWED_TYPES = ('processes',)

    def test_subclassing(self):
        """Round-trip two strings through a running _UpperCaser process."""
        worker = _UpperCaser()
        worker.start()
        for text, shouted in (('hello', 'HELLO'), ('world', 'WORLD')):
            self.assertEqual(worker.submit(text), shouted)
        worker.stop()
        worker.join()
334
335#
336#
337#
338
def queue_empty(q):
    """Return whether *q* is empty, via ``empty()`` or else ``qsize()``."""
    if not hasattr(q, 'empty'):
        # No empty() method available: derive the answer from the size.
        return q.qsize() == 0
    return q.empty()
344
def queue_full(q, maxsize):
    """Return whether *q* is full, via ``full()`` or else ``qsize()``.

    *maxsize* is only consulted when *q* has no ``full()`` method.
    """
    if not hasattr(q, 'full'):
        return q.qsize() == maxsize
    return q.full()
350
351
class _TestQueue(BaseTestCase):
    """Tests for queue objects shared between a parent and child workers.

    ``self.Queue``, ``self.JoinableQueue``, ``self.Event`` and
    ``self.Process`` are expected to be supplied by the concrete test class
    (not visible here).
    """

    @classmethod
    def _test_put(cls, queue, child_can_start, parent_can_continue):
        """Child target: once signalled, drain six items from *queue*."""
        child_can_start.wait()
        for _ in range(6):
            queue.get()
        parent_can_continue.set()

    def test_put(self):
        """Fill a bounded queue and check every way put() reports Full."""
        MAXSIZE = 6
        queue = self.Queue(maxsize=MAXSIZE)
        child_can_start = self.Event()
        parent_can_continue = self.Event()

        proc = self.Process(
            target=self._test_put,
            args=(queue, child_can_start, parent_can_continue)
        )
        proc.daemon = True
        proc.start()

        self.assertEqual(queue_empty(queue), True)
        self.assertEqual(queue_full(queue, MAXSIZE), False)

        queue.put(1)
        queue.put(2, True)
        queue.put(3, True, None)
        queue.put(4, False)
        queue.put(5, False, None)
        queue.put_nowait(6)

        # the values may be in buffer but not yet in pipe so sleep a bit
        time.sleep(DELTA)

        self.assertEqual(queue_empty(queue), False)
        self.assertEqual(queue_full(queue, MAXSIZE), True)

        put = TimingWrapper(queue.put)
        put_nowait = TimingWrapper(queue.put_nowait)

        self.assertRaises(pyqueue.Full, put, 7, False)
        self.assertTimingAlmostEqual(put.elapsed, 0)

        self.assertRaises(pyqueue.Full, put, 7, False, None)
        self.assertTimingAlmostEqual(put.elapsed, 0)

        self.assertRaises(pyqueue.Full, put_nowait, 7)
        self.assertTimingAlmostEqual(put_nowait.elapsed, 0)

        self.assertRaises(pyqueue.Full, put, 7, True, TIMEOUT1)
        self.assertTimingAlmostEqual(put.elapsed, TIMEOUT1)

        self.assertRaises(pyqueue.Full, put, 7, False, TIMEOUT2)
        self.assertTimingAlmostEqual(put.elapsed, 0)

        self.assertRaises(pyqueue.Full, put, 7, True, timeout=TIMEOUT3)
        self.assertTimingAlmostEqual(put.elapsed, TIMEOUT3)

        # Let the child drain the queue, then check it is empty again.
        child_can_start.set()
        parent_can_continue.wait()

        self.assertEqual(queue_empty(queue), True)
        self.assertEqual(queue_full(queue, MAXSIZE), False)

        proc.join()

    @classmethod
    def _test_get(cls, queue, child_can_start, parent_can_continue):
        """Child target: once signalled, put the items 2..5 on *queue*."""
        child_can_start.wait()
        # Item 1 is deliberately not sent: the matching get() in test_get
        # was disabled because it hung unexpectedly.
        #queue.put(1)
        queue.put(2)
        queue.put(3)
        queue.put(4)
        queue.put(5)
        parent_can_continue.set()

    def test_get(self):
        """Read items put by a child and check every way get() reports Empty."""
        queue = self.Queue()
        child_can_start = self.Event()
        parent_can_continue = self.Event()

        proc = self.Process(
            target=self._test_get,
            args=(queue, child_can_start, parent_can_continue)
        )
        proc.daemon = True
        proc.start()

        self.assertEqual(queue_empty(queue), True)

        child_can_start.set()
        parent_can_continue.wait()

        time.sleep(DELTA)
        self.assertEqual(queue_empty(queue), False)

        # Hangs unexpectedly, remove for now
        #self.assertEqual(queue.get(), 1)
        self.assertEqual(queue.get(True, None), 2)
        self.assertEqual(queue.get(True), 3)
        self.assertEqual(queue.get(timeout=1), 4)
        self.assertEqual(queue.get_nowait(), 5)

        self.assertEqual(queue_empty(queue), True)

        get = TimingWrapper(queue.get)
        get_nowait = TimingWrapper(queue.get_nowait)

        self.assertRaises(pyqueue.Empty, get, False)
        self.assertTimingAlmostEqual(get.elapsed, 0)

        self.assertRaises(pyqueue.Empty, get, False, None)
        self.assertTimingAlmostEqual(get.elapsed, 0)

        self.assertRaises(pyqueue.Empty, get_nowait)
        self.assertTimingAlmostEqual(get_nowait.elapsed, 0)

        self.assertRaises(pyqueue.Empty, get, True, TIMEOUT1)
        self.assertTimingAlmostEqual(get.elapsed, TIMEOUT1)

        self.assertRaises(pyqueue.Empty, get, False, TIMEOUT2)
        self.assertTimingAlmostEqual(get.elapsed, 0)

        self.assertRaises(pyqueue.Empty, get, timeout=TIMEOUT3)
        self.assertTimingAlmostEqual(get.elapsed, TIMEOUT3)

        proc.join()

    @classmethod
    def _test_fork(cls, queue):
        """Child target: put the integers 10..19 on *queue*."""
        for i in range(10, 20):
            queue.put(i)
        # note that at this point the items may only be buffered, so the
        # process cannot shutdown until the feeder thread has finished
        # pushing items onto the pipe.

    def test_fork(self):
        """Items put by parent and by a later-started child all arrive in order."""
        # Old versions of Queue would fail to create a new feeder
        # thread for a forked process if the original process had its
        # own feeder thread.  This test checks that this no longer
        # happens.

        queue = self.Queue()

        # put items on queue so that main process starts a feeder thread
        for i in range(10):
            queue.put(i)

        # wait to make sure thread starts before we fork a new process
        time.sleep(DELTA)

        # fork process
        p = self.Process(target=self._test_fork, args=(queue,))
        p.start()

        # check that all expected items are in the queue
        for i in range(20):
            self.assertEqual(queue.get(), i)
        self.assertRaises(pyqueue.Empty, queue.get, False)

        p.join()

    def test_qsize(self):
        """qsize() tracks puts and gets; silently passes if unimplemented."""
        q = self.Queue()
        try:
            self.assertEqual(q.qsize(), 0)
        except NotImplementedError:
            return
        q.put(1)
        self.assertEqual(q.qsize(), 1)
        q.put(5)
        self.assertEqual(q.qsize(), 2)
        q.get()
        self.assertEqual(q.qsize(), 1)
        q.get()
        self.assertEqual(q.qsize(), 0)

    @classmethod
    def _test_task_done(cls, q):
        """Worker target: acknowledge every item until ``None`` is received."""
        for _ in iter(q.get, None):
            time.sleep(DELTA)
            q.task_done()

    def test_task_done(self):
        """queue.join() returns once four workers have acknowledged ten items."""
        queue = self.JoinableQueue()

        workers = [self.Process(target=self._test_task_done, args=(queue,))
                   for _ in range(4)]

        for p in workers:
            p.start()

        for i in range(10):
            queue.put(i)

        queue.join()

        # One ``None`` sentinel per worker makes each worker loop terminate.
        for _ in workers:
            queue.put(None)

        for p in workers:
            p.join()
559
560#
561#
562#
563
class _TestLock(BaseTestCase):
    """Tests for Lock and RLock objects created via ``self.Lock``/``self.RLock``."""

    def test_lock(self):
        """A lock cannot be taken twice, nor released when it is not held."""
        lk = self.Lock()
        self.assertEqual(lk.acquire(), True)
        self.assertEqual(lk.acquire(False), False)
        self.assertEqual(lk.release(), None)
        self.assertRaises((ValueError, threading.ThreadError), lk.release)

    def test_rlock(self):
        """An RLock can be re-acquired; one release too many is an error."""
        rlk = self.RLock()
        depth = 3
        for _ in range(depth):
            self.assertEqual(rlk.acquire(), True)
        for _ in range(depth):
            self.assertEqual(rlk.release(), None)
        self.assertRaises((AssertionError, RuntimeError), rlk.release)

    def test_lock_context(self):
        """A lock works as a context manager."""
        with self.Lock():
            pass
586
Benjamin Petersone711caf2008-06-11 16:44:04 +0000587
class _TestSemaphore(BaseTestCase):
    """Tests for Semaphore and BoundedSemaphore objects."""

    def _test_semaphore(self, sem):
        """Walk *sem* (initial value 2) down to 0 and back up to 2."""
        self.assertReturnsIfImplemented(2, get_value, sem)

        # (arguments for acquire, expected result, expected value afterwards)
        acquire_steps = (
            ((), True, 1),
            ((), True, 0),
            ((False,), False, 0),
        )
        for call_args, outcome, remaining in acquire_steps:
            self.assertEqual(sem.acquire(*call_args), outcome)
            self.assertReturnsIfImplemented(remaining, get_value, sem)

        for remaining in (1, 2):
            self.assertEqual(sem.release(), None)
            self.assertReturnsIfImplemented(remaining, get_value, sem)

    def test_semaphore(self):
        """An unbounded semaphore may be released past its initial value."""
        sem = self.Semaphore(2)
        self._test_semaphore(sem)
        for expected in (3, 4):
            self.assertEqual(sem.release(), None)
            self.assertReturnsIfImplemented(expected, get_value, sem)

    def test_bounded_semaphore(self):
        """Run the shared acquire/release walk on a BoundedSemaphore."""
        sem = self.BoundedSemaphore(2)
        self._test_semaphore(sem)
        # Currently fails on OS/X
        #if HAVE_GETVALUE:
        #    self.assertRaises(ValueError, sem.release)
        #    self.assertReturnsIfImplemented(2, get_value, sem)

    def test_timeout(self):
        """acquire() on an exhausted semaphore fails, honouring any timeout."""
        if self.TYPE != 'processes':
            return

        sem = self.Semaphore(0)
        acquire = TimingWrapper(sem.acquire)

        # (positional args, keyword args, expected elapsed time)
        attempts = (
            ((False,), {}, 0.0),
            ((False, None), {}, 0.0),
            ((False, TIMEOUT1), {}, 0),
            ((True, TIMEOUT2), {}, TIMEOUT2),
            ((), {'timeout': TIMEOUT3}, TIMEOUT3),
        )
        for call_args, call_kwds, expected_elapsed in attempts:
            self.assertEqual(acquire(*call_args, **call_kwds), False)
            self.assertTimingAlmostEqual(acquire.elapsed, expected_elapsed)
640
641
class _TestCondition(BaseTestCase):
    """Tests for Condition objects shared by worker processes and threads.

    ``self.Condition``, ``self.Semaphore`` and ``self.Process`` are expected
    to be supplied by the concrete test class (not visible here).
    """

    @classmethod
    def f(cls, cond, sleeping, woken, timeout=None):
        """Worker target: wait on *cond*, signalling before and after.

        *sleeping* is released just before waiting and *woken* just after the
        wait returns (whether by notification or by *timeout*).
        """
        cond.acquire()
        sleeping.release()
        cond.wait(timeout)
        woken.release()
        cond.release()

    def check_invariant(self, cond):
        """Check the internal counters of *cond* show no remaining sleepers."""
        # this is only supposed to succeed when there are no sleepers
        if self.TYPE == 'processes':
            try:
                sleepers = (cond._sleeping_count.get_value() -
                            cond._woken_count.get_value())
                self.assertEqual(sleepers, 0)
                self.assertEqual(cond._wait_semaphore.get_value(), 0)
            except NotImplementedError:
                pass

    def test_notify(self):
        """notify() wakes exactly one waiter per call."""
        cond = self.Condition()
        sleeping = self.Semaphore(0)
        woken = self.Semaphore(0)

        proc = self.Process(target=self.f, args=(cond, sleeping, woken))
        proc.daemon = True
        proc.start()

        thread = threading.Thread(target=self.f, args=(cond, sleeping, woken))
        thread.daemon = True
        thread.start()

        # wait for both children to start sleeping
        sleeping.acquire()
        sleeping.acquire()

        # check no process/thread has woken up
        time.sleep(DELTA)
        self.assertReturnsIfImplemented(0, get_value, woken)

        # wake up one process/thread
        cond.acquire()
        cond.notify()
        cond.release()

        # check one process/thread has woken up
        time.sleep(DELTA)
        self.assertReturnsIfImplemented(1, get_value, woken)

        # wake up another
        cond.acquire()
        cond.notify()
        cond.release()

        # check other has woken up
        time.sleep(DELTA)
        self.assertReturnsIfImplemented(2, get_value, woken)

        # check state is not mucked up
        self.check_invariant(cond)

        # Join BOTH workers so neither the child process nor the thread is
        # left behind once the test has finished.
        proc.join()
        thread.join()

    def test_notify_all(self):
        """Waiters with a timeout wake on their own; notify_all() wakes the rest."""
        cond = self.Condition()
        sleeping = self.Semaphore(0)
        woken = self.Semaphore(0)
        workers = []

        # start some threads/processes which will timeout
        for _ in range(3):
            p = self.Process(target=self.f,
                             args=(cond, sleeping, woken, TIMEOUT1))
            p.daemon = True
            p.start()
            workers.append(p)

            t = threading.Thread(target=self.f,
                                 args=(cond, sleeping, woken, TIMEOUT1))
            t.daemon = True
            t.start()
            workers.append(t)

        # wait for them all to sleep
        for _ in range(6):
            sleeping.acquire()

        # check they have all timed out
        for _ in range(6):
            woken.acquire()
        self.assertReturnsIfImplemented(0, get_value, woken)

        # check state is not mucked up
        self.check_invariant(cond)

        # start some more threads/processes
        for _ in range(3):
            p = self.Process(target=self.f, args=(cond, sleeping, woken))
            p.daemon = True
            p.start()
            workers.append(p)

            t = threading.Thread(target=self.f, args=(cond, sleeping, woken))
            t.daemon = True
            t.start()
            workers.append(t)

        # wait for them to all sleep
        for _ in range(6):
            sleeping.acquire()

        # check no process/thread has woken up
        time.sleep(DELTA)
        self.assertReturnsIfImplemented(0, get_value, woken)

        # wake them all up
        cond.acquire()
        cond.notify_all()
        cond.release()

        # check they have all woken
        time.sleep(DELTA)
        self.assertReturnsIfImplemented(6, get_value, woken)

        # check state is not mucked up
        self.check_invariant(cond)

        # Every worker has either timed out or been notified by now, so
        # joining them cannot block indefinitely.
        for worker in workers:
            worker.join()

    def test_timeout(self):
        """wait() with a timeout and no notification returns False."""
        cond = self.Condition()
        wait = TimingWrapper(cond.wait)
        cond.acquire()
        res = wait(TIMEOUT1)
        cond.release()
        self.assertEqual(res, False)
        self.assertTimingAlmostEqual(wait.elapsed, TIMEOUT1)
773
774
class _TestEvent(BaseTestCase):
    """Tests for Event objects created via ``self.Event``."""

    @classmethod
    def _test_event(cls, event):
        """Child target: set *event* after sleeping for TIMEOUT2 seconds."""
        time.sleep(TIMEOUT2)
        event.set()

    def test_event(self):
        """Check is_set()/wait() before set(), after set() and across a child."""
        event = self.Event()
        wait = TimingWrapper(event.wait)

        self.assertEqual(event.is_set(), False)

        # wait() is expected to return the value of the flag (not None), so
        # it yields False when it times out on an unset event.
        self.assertEqual(wait(0.0), False)
        self.assertTimingAlmostEqual(wait.elapsed, 0.0)
        self.assertEqual(wait(TIMEOUT1), False)
        self.assertTimingAlmostEqual(wait.elapsed, TIMEOUT1)

        event.set()

        # Once the flag is set, wait() returns True immediately.
        self.assertEqual(event.is_set(), True)
        self.assertEqual(wait(), True)
        self.assertTimingAlmostEqual(wait.elapsed, 0.0)
        self.assertEqual(wait(TIMEOUT1), True)
        self.assertTimingAlmostEqual(wait.elapsed, 0.0)

        event.clear()

        # NOTE(review): an is_set() == False check after clear() was
        # disabled in the original code; it has not been re-enabled here.

        p = self.Process(target=self._test_event, args=(event,))
        p.daemon = True
        p.start()
        self.assertEqual(wait(), True)
        # Join the child so it is not left running after the test finishes.
        p.join()
Benjamin Petersone711caf2008-06-11 16:44:04 +0000813
814#
815#
816#
817
@unittest.skipUnless(HAS_SHAREDCTYPES,
                     "requires multiprocessing.sharedctypes")
class _TestValue(BaseTestCase):
    """Tests for shared Value/RawValue objects modified by a child process."""

    ALLOWED_TYPES = ('processes',)

    # (type code, initial value, value written by the child)
    codes_values = [
        ('i', 4343, 24234),
        ('d', 3.625, -4.25),
        ('h', -232, 234),
        ('c', latin('x'), latin('y'))
        ]

    @classmethod
    def _test(cls, values):
        """Child target: store the third entry of codes_values in each value."""
        for shared, spec in zip(values, cls.codes_values):
            shared.value = spec[2]

    def test_value(self, raw=False):
        """Values start with their initial data and reflect the child's writes."""
        make = self.RawValue if raw else self.Value
        values = [make(code, initial)
                  for code, initial, _ in self.codes_values]

        for shared, spec in zip(values, self.codes_values):
            self.assertEqual(shared.value, spec[1])

        proc = self.Process(target=self._test, args=(values,))
        proc.start()
        proc.join()

        for shared, spec in zip(values, self.codes_values):
            self.assertEqual(shared.value, spec[2])

    def test_rawvalue(self):
        """Same as test_value, but using RawValue objects."""
        self.test_value(raw=True)

    def test_getobj_getlock(self):
        """Check get_lock()/get_obj() availability for each ``lock`` option."""
        # Default lock: both accessors must be callable.
        default_locked = self.Value('i', 5)
        default_locked.get_lock()
        default_locked.get_obj()

        # lock=None: both accessors must be callable as well.
        none_locked = self.Value('i', 5, lock=None)
        none_locked.get_lock()
        none_locked.get_obj()

        # An explicitly supplied lock is the one handed back by get_lock().
        own_lock = self.Lock()
        custom_locked = self.Value('i', 5, lock=own_lock)
        returned_lock = custom_locked.get_lock()
        custom_locked.get_obj()
        self.assertEqual(own_lock, returned_lock)

        # lock=False yields an object without the accessors.
        unlocked = self.Value('i', 5, lock=False)
        self.assertFalse(hasattr(unlocked, 'get_lock'))
        self.assertFalse(hasattr(unlocked, 'get_obj'))

        self.assertRaises(AttributeError, self.Value, 'i', 5, lock='navalue')

        # RawValue objects have no accessors either.
        raw_value = self.RawValue('i', 5)
        self.assertFalse(hasattr(raw_value, 'get_lock'))
        self.assertFalse(hasattr(raw_value, 'get_obj'))
882
Benjamin Petersone711caf2008-06-11 16:44:04 +0000883
class _TestArray(BaseTestCase):
    """Tests for shared Array/RawArray objects modified by a child process."""

    ALLOWED_TYPES = ('processes',)

    @classmethod
    def f(cls, seq):
        """Replace *seq* in place by its running (prefix) sums."""
        for pos in range(1, len(seq)):
            seq[pos] += seq[pos - 1]

    @unittest.skipIf(c_int is None, "requires _ctypes")
    def test_array(self, raw=False):
        """A shared array mirrors a list, including after a child's update."""
        seq = [680, 626, 934, 821, 150, 233, 548, 982, 714, 831]
        make = self.RawArray if raw else self.Array
        arr = make('i', seq)

        self.assertEqual(len(arr), len(seq))
        self.assertEqual(arr[3], seq[3])
        self.assertEqual(list(arr[2:7]), list(seq[2:7]))

        # Apply the same slice assignment to the shared array and the list.
        arr[4:8] = seq[4:8] = array.array('i', [1, 2, 3, 4])

        self.assertEqual(list(arr[:]), seq)

        # Compute the expected result locally, then let a child process do
        # the same work on the shared array.
        self.f(seq)

        worker = self.Process(target=self.f, args=(arr,))
        worker.start()
        worker.join()

        self.assertEqual(list(arr[:]), seq)

    @unittest.skipIf(c_int is None, "requires _ctypes")
    def test_rawarray(self):
        """Same as test_array, but using a RawArray."""
        self.test_array(raw=True)

    @unittest.skipIf(c_int is None, "requires _ctypes")
    def test_getobj_getlock_obj(self):
        """Check get_lock()/get_obj() availability for each ``lock`` option."""
        # Default lock: both accessors must be callable.
        default_locked = self.Array('i', list(range(10)))
        default_locked.get_lock()
        default_locked.get_obj()

        # lock=None: both accessors must be callable as well.
        none_locked = self.Array('i', list(range(10)), lock=None)
        none_locked.get_lock()
        none_locked.get_obj()

        # An explicitly supplied lock is the one handed back by get_lock().
        own_lock = self.Lock()
        custom_locked = self.Array('i', list(range(10)), lock=own_lock)
        returned_lock = custom_locked.get_lock()
        custom_locked.get_obj()
        self.assertEqual(own_lock, returned_lock)

        # lock=False yields an object without the accessors.
        unlocked = self.Array('i', range(10), lock=False)
        self.assertFalse(hasattr(unlocked, 'get_lock'))
        self.assertFalse(hasattr(unlocked, 'get_obj'))
        self.assertRaises(AttributeError,
                          self.Array, 'i', range(10), lock='notalock')

        # RawArray objects have no accessors either.
        raw_array = self.RawArray('i', range(10))
        self.assertFalse(hasattr(raw_array, 'get_lock'))
        self.assertFalse(hasattr(raw_array, 'get_obj'))
Benjamin Petersone711caf2008-06-11 16:44:04 +0000946
947#
948#
949#
950
class _TestContainers(BaseTestCase):
    """Checks for the list, dict and Namespace objects of a manager."""

    ALLOWED_TYPES = ('manager',)

    def test_list(self):
        """Exercise slicing, extension, repetition and nesting of lists."""
        digits = list(range(10))
        first = self.list(list(range(10)))
        self.assertEqual(first[:], digits)

        second = self.list()
        self.assertEqual(second[:], [])

        second.extend(list(range(5)))
        self.assertEqual(second[:], list(range(5)))

        self.assertEqual(second[2], 2)
        self.assertEqual(second[2:10], [2, 3, 4])

        second *= 2
        doubled = [0, 1, 2, 3, 4, 0, 1, 2, 3, 4]
        self.assertEqual(second[:], doubled)

        self.assertEqual(second + [5, 6], doubled + [5, 6])

        # The first list must be unaffected by everything done to the second.
        self.assertEqual(first[:], digits)

        nested = self.list([first, second])
        self.assertEqual(nested[:], [digits, doubled])

        # A change to a contained list must show through the container.
        holder = self.list([first])
        first.append('hello')
        self.assertEqual(holder[:], [digits + ['hello']])

    def test_dict(self):
        """Fill a dict and compare its copy, keys, values and items."""
        shared = self.dict()
        indices = list(range(65, 70))
        for code in indices:
            shared[code] = chr(code)
        letters = [chr(code) for code in indices]
        self.assertEqual(shared.copy(), dict(zip(indices, letters)))
        self.assertEqual(sorted(shared.keys()), indices)
        self.assertEqual(sorted(shared.values()), letters)
        self.assertEqual(sorted(shared.items()), list(zip(indices, letters)))

    def test_namespace(self):
        """Set and delete attributes, then check str() and hasattr()."""
        ns = self.Namespace()
        ns.name = 'Bob'
        ns.job = 'Builder'
        ns._hidden = 'hidden'
        self.assertEqual((ns.name, ns.job), ('Bob', 'Builder'))
        del ns.job
        # Only 'name' is expected to show up in the string form.
        self.assertEqual(str(ns), "Namespace(name='Bob')")
        self.assertTrue(hasattr(ns, 'name'))
        self.assertFalse(hasattr(ns, 'job'))
1006
1007#
1008#
1009#
1010
def sqr(x, wait=0.0):
    """Sleep for *wait* seconds, then return *x* multiplied by itself."""
    time.sleep(wait)
    squared = x * x
    return squared
class _TestPool(BaseTestCase):
    """Tests for the pool API reached through ``self.pool``."""

    def test_apply(self):
        """apply() must accept positional and keyword arguments."""
        papply = self.pool.apply
        self.assertEqual(papply(sqr, (5,)), sqr(5))
        self.assertEqual(papply(sqr, (), {'x': 3}), sqr(x=3))

    def test_map(self):
        """map() must match a serial computation, with and without chunks."""
        pmap = self.pool.map
        self.assertEqual(pmap(sqr, list(range(10))),
                         [sqr(i) for i in range(10)])
        self.assertEqual(pmap(sqr, list(range(100)), chunksize=20),
                         [sqr(i) for i in range(100)])

    def test_map_chunksize(self):
        """map_async() on an empty list with a chunksize must not stall."""
        try:
            self.pool.map_async(sqr, [], chunksize=1).get(timeout=TIMEOUT1)
        except multiprocessing.TimeoutError:
            self.fail("pool.map_async with chunksize stalled on null list")

    def test_async(self):
        """get() must block for about as long as the task sleeps."""
        res = self.pool.apply_async(sqr, (7, TIMEOUT1,))
        get = TimingWrapper(res.get)
        self.assertEqual(get(), 49)
        self.assertTimingAlmostEqual(get.elapsed, TIMEOUT1)

    def test_async_timeout(self):
        """get(timeout=...) must raise when the task outlasts the timeout."""
        res = self.pool.apply_async(sqr, (6, TIMEOUT2 + 0.2))
        get = TimingWrapper(res.get)
        self.assertRaises(multiprocessing.TimeoutError, get, timeout=TIMEOUT2)
        self.assertTimingAlmostEqual(get.elapsed, TIMEOUT2)

    def test_imap(self):
        """imap() must yield results in order and then stop."""
        it = self.pool.imap(sqr, list(range(10)))
        self.assertEqual(list(it), [sqr(i) for i in range(10)])

        it = self.pool.imap(sqr, list(range(10)))
        for i in range(10):
            self.assertEqual(next(it), i*i)
        self.assertRaises(StopIteration, it.__next__)

        it = self.pool.imap(sqr, list(range(1000)), chunksize=100)
        for i in range(1000):
            self.assertEqual(next(it), i*i)
        self.assertRaises(StopIteration, it.__next__)

    def test_imap_unordered(self):
        """imap_unordered() must yield every result, in any order."""
        expected = [sqr(i) for i in range(1000)]

        it = self.pool.imap_unordered(sqr, list(range(1000)))
        self.assertEqual(sorted(it), expected)

        it = self.pool.imap_unordered(sqr, list(range(1000)), chunksize=53)
        self.assertEqual(sorted(it), expected)

    def test_make_pool(self):
        """Pool(3) must create three workers."""
        p = multiprocessing.Pool(3)
        try:
            self.assertEqual(3, len(p._pool))
        finally:
            # Release the workers even when the assertion above fails.
            p.close()
            p.join()

    def test_terminate(self):
        """terminate() must not wait for the queued tasks to finish."""
        if self.TYPE == 'manager':
            # On Unix a forked process increfs each shared object to
            # which its parent process held a reference.  If the
            # forked process gets terminated then there is likely to
            # be a reference leak.  So to prevent
            # _TestZZZNumberOfObjects from failing we skip this test
            # when using a manager.
            self.skipTest('terminating a manager pool may leak references')

        # Queue far more work than could complete in the time allowed below.
        self.pool.map_async(
            time.sleep, [0.1 for i in range(10000)], chunksize=1
            )
        self.pool.terminate()
        join = TimingWrapper(self.pool.join)
        join()
        self.assertLess(join.elapsed, 0.2)
Jesse Noller1f0b6582010-01-27 03:36:01 +00001089
class _TestPoolWorkerLifetime(BaseTestCase):
    """Check that ``maxtasksperchild`` causes pool workers to be replaced."""

    ALLOWED_TYPES = ('processes', )

    def test_pool_worker_lifetime(self):
        """Run 100 tasks on 3 workers limited to 10 tasks each."""
        p = multiprocessing.Pool(3, maxtasksperchild=10)
        try:
            self.assertEqual(3, len(p._pool))
            origworkerpids = [w.pid for w in p._pool]
            # Run many tasks so each worker gets replaced (hopefully)
            results = [p.apply_async(sqr, (i, )) for i in range(100)]
            # Fetch the results and verify we got the right answers,
            # also ensuring all the tasks have completed.
            for (j, res) in enumerate(results):
                self.assertEqual(res.get(), sqr(j))
            # Refill the pool
            p._repopulate_pool()
            # Wait until all workers are alive
            countdown = 5
            while countdown and not all(w.is_alive() for w in p._pool):
                countdown -= 1
                time.sleep(DELTA)
            finalworkerpids = [w.pid for w in p._pool]
            # All pids should be assigned.  See issue #7805.
            self.assertNotIn(None, origworkerpids)
            self.assertNotIn(None, finalworkerpids)
            # Finally, check that the worker pids have changed
            self.assertNotEqual(sorted(origworkerpids),
                                sorted(finalworkerpids))
        finally:
            # Always release the workers, even when an assertion failed.
            p.close()
            p.join()
1120
Benjamin Petersone711caf2008-06-11 16:44:04 +00001121#
1122# Test that manager has expected number of shared objects left
1123#
1124
class _TestZZZNumberOfObjects(BaseTestCase):
    # Because test cases are sorted alphabetically, this one will get
    # run after all the other tests for the manager.  It tests that
    # there have been no "reference leaks" for the manager's shared
    # objects.  Note the comment in _TestPool.test_terminate().
    ALLOWED_TYPES = ('manager',)

    def test_number_of_objects(self):
        """The manager must report exactly EXPECTED_NUMBER shared objects."""
        EXPECTED_NUMBER = 1                # the pool object is still alive
        multiprocessing.active_children()  # discard dead process objs
        gc.collect()                       # do garbage collection
        refs = self.manager._number_of_objects()
        # Capture the debug info right after the count so both describe
        # the same moment.
        debug_info = self.manager._debug_info()
        if refs != EXPECTED_NUMBER:
            # Print the captured snapshot once to help diagnose the leak.
            print(debug_info)

        self.assertEqual(refs, EXPECTED_NUMBER)
1143
1144#
1145# Test of creating a customized manager class
1146#
1147
1148from multiprocessing.managers import BaseManager, BaseProxy, RemoteError
1149
class FooBar:
    """Small class registered with MyManager to test method exposure."""

    def f(self):
        """Return the marker string for f."""
        return 'f()'

    def g(self):
        """Always fail with ValueError."""
        raise ValueError()

    def _h(self):
        """Return the marker string for the underscore-prefixed method."""
        return '_h()'
1157
def baz():
    """Yield the squares of 0 through 9, one at a time."""
    value = 0
    while value < 10:
        yield value * value
        value += 1
1161
class IteratorProxy(BaseProxy):
    """Proxy that can be iterated by forwarding ``__next__`` calls."""

    # The only method name this proxy exposes from its referent.
    _exposed_ = ('__next__',)

    def __iter__(self):
        """Return the proxy itself, as the iterator protocol requires."""
        return self

    def __next__(self):
        """Forward the call to the referent's ``__next__`` method."""
        item = self._callmethod('__next__')
        return item
1168
class MyManager(BaseManager):
    """Customized manager; its types are registered just below."""

# 'Foo' is registered without an explicit list of exposed methods.
MyManager.register('Foo', callable=FooBar)
# 'Bar' exposes exactly f() and _h().
MyManager.register('Bar', callable=FooBar, exposed=('f', '_h'))
# 'baz' hands out its generator through the iterator proxy.
MyManager.register('baz', callable=baz, proxytype=IteratorProxy)
1175
1176
class _TestMyManager(BaseTestCase):
    """Check method exposure and iteration through a customized manager."""

    ALLOWED_TYPES = ('manager',)

    def test_mymanager(self):
        """Start a MyManager and exercise its three registered types."""
        manager = MyManager()
        manager.start()
        try:
            foo = manager.Foo()
            bar = manager.Bar()
            squares = manager.baz()

            candidates = ('f', 'g', '_h')
            foo_methods = [name for name in candidates if hasattr(foo, name)]
            bar_methods = [name for name in candidates if hasattr(bar, name)]

            # Foo was registered without 'exposed'; Bar lists f and _h.
            self.assertEqual(foo_methods, ['f', 'g'])
            self.assertEqual(bar_methods, ['f', '_h'])

            self.assertEqual(foo.f(), 'f()')
            self.assertRaises(ValueError, foo.g)
            self.assertEqual(foo._callmethod('f'), 'f()')
            # _h is not exposed on Foo, so calling it must fail remotely.
            self.assertRaises(RemoteError, foo._callmethod, '_h')

            self.assertEqual(bar.f(), 'f()')
            self.assertEqual(bar._h(), '_h()')
            self.assertEqual(bar._callmethod('f'), 'f()')
            self.assertEqual(bar._callmethod('_h'), '_h()')

            self.assertEqual(list(squares), [i*i for i in range(10)])
        finally:
            # Stop the manager even when an assertion above fails.
            manager.shutdown()
1208
1209#
1210# Test of connecting to a remote server and using xmlrpclib for serialization
1211#
1212
# Module-level queue handed out by get_queue(), which QueueManager
# registers as its 'get_queue' callable.
_queue = pyqueue.Queue()
def get_queue():
    """Return the module-level queue."""
    return _queue

class QueueManager(BaseManager):
    '''manager class used by server process'''
QueueManager.register('get_queue', callable=get_queue)

class QueueManager2(BaseManager):
    '''manager class which specifies the same interface as QueueManager'''
# Registered without a callable: only the name is declared here.
QueueManager2.register('get_queue')


# Serializer name passed to the managers created by the tests below.
SERIALIZER = 'xmlrpclib'
1227
class _TestRemoteManager(BaseTestCase):
    """Connect to a running manager from a second manager and a child."""

    ALLOWED_TYPES = ('manager',)

    @classmethod
    def _putter(cls, address, authkey):
        """Connect to the manager at *address* and put one tuple on its queue."""
        manager = QueueManager2(
            address=address, authkey=authkey, serializer=SERIALIZER
            )
        manager.connect()
        queue = manager.get_queue()
        queue.put(('hello world', None, True, 2.25))

    def test_remote(self):
        """A value put by a child must be readable through another client."""
        authkey = os.urandom(32)

        manager = QueueManager(
            address=('localhost', 0), authkey=authkey, serializer=SERIALIZER
            )
        manager.start()
        try:
            p = self.Process(target=self._putter,
                             args=(manager.address, authkey))
            p.start()

            manager2 = QueueManager2(
                address=manager.address, authkey=authkey,
                serializer=SERIALIZER
                )
            manager2.connect()
            queue = manager2.get_queue()

            # Note that xmlrpclib will deserialize object as a list not a
            # tuple
            self.assertEqual(queue.get(), ['hello world', None, True, 2.25])

            # Because we are using xmlrpclib for serialization instead of
            # pickle this will cause a serialization error.
            self.assertRaises(Exception, queue.put, time.sleep)

            # Make queue finalizer run before the server is stopped
            del queue

            # The item has been received, so the child has done its work;
            # wait for it rather than leaving it behind.
            p.join()
        finally:
            # Stop the server even when an assertion above fails.
            manager.shutdown()
1268
class _TestManagerRestart(BaseTestCase):
    """Check that a manager can be restarted on the address just used."""

    @classmethod
    def _putter(cls, address, authkey):
        """Connect to the manager at *address* and put one string on its queue."""
        manager = QueueManager(
            address=address, authkey=authkey, serializer=SERIALIZER)
        manager.connect()
        queue = manager.get_queue()
        queue.put('hello world')

    def test_rapid_restart(self):
        """Shut a manager down and start a new one on the same address."""
        authkey = os.urandom(32)
        manager = QueueManager(
            address=('localhost', 0), authkey=authkey, serializer=SERIALIZER)
        srvr = manager.get_server()
        addr = srvr.address
        # Close the connection.Listener socket which gets opened as a part
        # of manager.get_server(). It's not needed for the test.
        srvr.listener.close()
        manager.start()
        try:
            p = self.Process(target=self._putter,
                             args=(manager.address, authkey))
            p.start()
            queue = manager.get_queue()
            self.assertEqual(queue.get(), 'hello world')
            del queue
            # The item has been received, so the child has done its work;
            # wait for it rather than leaving it behind.
            p.join()
        finally:
            # Stop the first server even when an assertion above fails.
            manager.shutdown()

        # Starting again on the address just released is the actual test.
        manager = QueueManager(
            address=addr, authkey=authkey, serializer=SERIALIZER)
        manager.start()
        manager.shutdown()
Jesse Nollerc5d28a02009-03-30 16:37:36 +00001300
Benjamin Petersone711caf2008-06-11 16:44:04 +00001301#
1302#
1303#
1304
# Empty message: _TestConnection._echo stops echoing when it receives this.
SENTINEL = latin('')
1306
class _TestConnection(BaseTestCase):
    """Tests for the connection objects returned by ``self.Pipe()``."""

    ALLOWED_TYPES = ('processes', 'threads')

    @classmethod
    def _echo(cls, conn):
        """Send back every byte message received until SENTINEL arrives."""
        for msg in iter(conn.recv_bytes, SENTINEL):
            conn.send_bytes(msg)
        conn.close()

    def test_connection(self):
        """Round-trip objects, bytes and buffers through an echoing child."""
        conn, child_conn = self.Pipe()

        p = self.Process(target=self._echo, args=(child_conn,))
        p.daemon = True
        p.start()

        seq = [1, 2.25, None]
        msg = latin('hello world')
        longmsg = msg * 10
        arr = array.array('i', list(range(4)))

        if self.TYPE == 'processes':
            self.assertEqual(type(conn.fileno()), int)

        self.assertEqual(conn.send(seq), None)
        self.assertEqual(conn.recv(), seq)

        self.assertEqual(conn.send_bytes(msg), None)
        self.assertEqual(conn.recv_bytes(), msg)

        if self.TYPE == 'processes':
            # Receive into the start of a zero-filled buffer.
            buffer = array.array('i', [0]*10)
            expected = list(arr) + [0] * (10 - len(arr))
            self.assertEqual(conn.send_bytes(arr), None)
            self.assertEqual(conn.recv_bytes_into(buffer),
                             len(arr) * buffer.itemsize)
            self.assertEqual(list(buffer), expected)

            # Receive at an offset of three items into the buffer.
            buffer = array.array('i', [0]*10)
            expected = [0] * 3 + list(arr) + [0] * (10 - 3 - len(arr))
            self.assertEqual(conn.send_bytes(arr), None)
            self.assertEqual(conn.recv_bytes_into(buffer, 3 * buffer.itemsize),
                             len(arr) * buffer.itemsize)
            self.assertEqual(list(buffer), expected)

            # A buffer shorter than the message must raise BufferTooShort,
            # carrying the complete message in its args.
            buffer = bytearray(latin(' ' * 40))
            self.assertEqual(conn.send_bytes(longmsg), None)
            try:
                res = conn.recv_bytes_into(buffer)
            except multiprocessing.BufferTooShort as e:
                self.assertEqual(e.args, (longmsg,))
            else:
                self.fail('expected BufferTooShort, got %s' % res)

        poll = TimingWrapper(conn.poll)

        self.assertEqual(poll(), False)
        self.assertTimingAlmostEqual(poll.elapsed, 0)

        self.assertEqual(poll(TIMEOUT1), False)
        self.assertTimingAlmostEqual(poll.elapsed, TIMEOUT1)

        conn.send(None)

        self.assertEqual(poll(TIMEOUT1), True)
        self.assertTimingAlmostEqual(poll.elapsed, 0)

        self.assertEqual(conn.recv(), None)

        really_big_msg = latin('X') * (1024 * 1024 * 16)   # 16Mb
        conn.send_bytes(really_big_msg)
        self.assertEqual(conn.recv_bytes(), really_big_msg)

        conn.send_bytes(SENTINEL)                          # tell child to quit
        child_conn.close()

        if self.TYPE == 'processes':
            self.assertEqual(conn.readable, True)
            self.assertEqual(conn.writable, True)
            self.assertRaises(EOFError, conn.recv)
            self.assertRaises(EOFError, conn.recv_bytes)

        p.join()

    def test_duplex_false(self):
        """A one-way pipe must have a read-only and a write-only end."""
        reader, writer = self.Pipe(duplex=False)
        self.assertEqual(writer.send(1), None)
        self.assertEqual(reader.recv(), 1)
        if self.TYPE == 'processes':
            self.assertEqual(reader.readable, True)
            self.assertEqual(reader.writable, False)
            self.assertEqual(writer.readable, False)
            self.assertEqual(writer.writable, True)
            self.assertRaises(IOError, reader.send, 2)
            self.assertRaises(IOError, writer.recv)
            self.assertRaises(IOError, writer.poll)

    def test_spawn_close(self):
        """The parent may close its copy of the child's end right away."""
        # We test that a pipe connection can be closed by parent
        # process immediately after child is spawned.  On Windows this
        # would have sometimes failed on old versions because
        # child_conn would be closed before the child got a chance to
        # duplicate it.
        conn, child_conn = self.Pipe()

        p = self.Process(target=self._echo, args=(child_conn,))
        p.start()
        child_conn.close()    # this might complete before child initializes

        msg = latin('hello')
        conn.send_bytes(msg)
        self.assertEqual(conn.recv_bytes(), msg)

        conn.send_bytes(SENTINEL)
        conn.close()
        p.join()

    def test_sendbytes(self):
        """Check the offset and size arguments of send_bytes()."""
        if self.TYPE != 'processes':
            # Report a skip rather than a silent pass for other types.
            self.skipTest('test not appropriate for %s' % self.TYPE)

        msg = latin('abcdefghijklmnopqrstuvwxyz')
        a, b = self.Pipe()

        a.send_bytes(msg)
        self.assertEqual(b.recv_bytes(), msg)

        a.send_bytes(msg, 5)
        self.assertEqual(b.recv_bytes(), msg[5:])

        a.send_bytes(msg, 7, 8)
        self.assertEqual(b.recv_bytes(), msg[7:7+8])

        # An offset equal to the length sends an empty message.
        a.send_bytes(msg, 26)
        self.assertEqual(b.recv_bytes(), latin(''))

        a.send_bytes(msg, 26, 0)
        self.assertEqual(b.recv_bytes(), latin(''))

        # Offsets or sizes outside the message must be rejected.
        self.assertRaises(ValueError, a.send_bytes, msg, 27)

        self.assertRaises(ValueError, a.send_bytes, msg, 22, 5)

        self.assertRaises(ValueError, a.send_bytes, msg, 26, 1)

        self.assertRaises(ValueError, a.send_bytes, msg, -1)

        self.assertRaises(ValueError, a.send_bytes, msg, 4, -1)
1456
class _TestListenerClient(BaseTestCase):
    """Check a Listener/Client round trip for each connection family."""

    ALLOWED_TYPES = ('processes', 'threads')

    @classmethod
    def _test(cls, address):
        """Connect to *address*, send one greeting and close."""
        client = cls.connection.Client(address)
        client.send('hello')
        client.close()

    def test_listener_client(self):
        """Accept one client per family and read its greeting."""
        for family in self.connection.families:
            listener = self.connection.Listener(family=family)
            child = self.Process(target=self._test,
                                 args=(listener.address,))
            child.daemon = True
            child.start()
            accepted = listener.accept()
            self.assertEqual(accepted.recv(), 'hello')
            child.join()
            listener.close()
Benjamin Petersone711caf2008-06-11 16:44:04 +00001477#
1478# Test of sending connection and socket objects between processes
1479#
Benjamin Petersonb29cbbc2008-06-16 20:57:14 +00001480"""
Benjamin Petersone711caf2008-06-11 16:44:04 +00001481class _TestPicklingConnections(BaseTestCase):
1482
1483 ALLOWED_TYPES = ('processes',)
1484
1485 def _listener(self, conn, families):
1486 for fam in families:
1487 l = self.connection.Listener(family=fam)
1488 conn.send(l.address)
1489 new_conn = l.accept()
1490 conn.send(new_conn)
1491
1492 if self.TYPE == 'processes':
1493 l = socket.socket()
1494 l.bind(('localhost', 0))
1495 conn.send(l.getsockname())
1496 l.listen(1)
1497 new_conn, addr = l.accept()
1498 conn.send(new_conn)
1499
1500 conn.recv()
1501
1502 def _remote(self, conn):
1503 for (address, msg) in iter(conn.recv, None):
1504 client = self.connection.Client(address)
1505 client.send(msg.upper())
1506 client.close()
1507
1508 if self.TYPE == 'processes':
1509 address, msg = conn.recv()
1510 client = socket.socket()
1511 client.connect(address)
1512 client.sendall(msg.upper())
1513 client.close()
1514
1515 conn.close()
1516
1517 def test_pickling(self):
1518 try:
1519 multiprocessing.allow_connection_pickling()
1520 except ImportError:
1521 return
1522
1523 families = self.connection.families
1524
1525 lconn, lconn0 = self.Pipe()
1526 lp = self.Process(target=self._listener, args=(lconn0, families))
1527 lp.start()
1528 lconn0.close()
1529
1530 rconn, rconn0 = self.Pipe()
1531 rp = self.Process(target=self._remote, args=(rconn0,))
1532 rp.start()
1533 rconn0.close()
1534
1535 for fam in families:
1536 msg = ('This connection uses family %s' % fam).encode('ascii')
1537 address = lconn.recv()
1538 rconn.send((address, msg))
1539 new_conn = lconn.recv()
1540 self.assertEqual(new_conn.recv(), msg.upper())
1541
1542 rconn.send(None)
1543
1544 if self.TYPE == 'processes':
1545 msg = latin('This connection uses a normal socket')
1546 address = lconn.recv()
1547 rconn.send((address, msg))
1548 if hasattr(socket, 'fromfd'):
1549 new_conn = lconn.recv()
1550 self.assertEqual(new_conn.recv(100), msg.upper())
1551 else:
1552 # XXX On Windows with Py2.6 need to backport fromfd()
1553 discard = lconn.recv_bytes()
1554
1555 lconn.send(None)
1556
1557 rconn.close()
1558 lconn.close()
1559
1560 lp.join()
1561 rp.join()
Benjamin Petersonb29cbbc2008-06-16 20:57:14 +00001562"""
Benjamin Petersone711caf2008-06-11 16:44:04 +00001563#
1564#
1565#
1566
class _TestHeap(BaseTestCase):
    """Consistency check for the heap behind ``BufferWrapper``."""

    ALLOWED_TYPES = ('processes',)

    def test_heap(self):
        """Allocate and free many blocks, then check the heap is contiguous."""
        iterations = 5000
        maxblocks = 50
        live = []

        # create and destroy lots of blocks of different sizes
        for _ in range(iterations):
            size = int(random.lognormvariate(0, 1) * 1000)
            live.append(multiprocessing.heap.BufferWrapper(size))
            if len(live) > maxblocks:
                del live[random.randrange(maxblocks)]

        # get the heap object
        heap = multiprocessing.heap.BufferWrapper._heap

        # Gather every free and every occupied block as
        # (arena index, start, stop, length, state).
        segments = []
        for free_blocks in list(heap._len_to_seq.values()):
            for arena, start, stop in free_blocks:
                segments.append((heap._arenas.index(arena), start, stop,
                                 stop - start, 'free'))
        for arena, start, stop in heap._allocated_blocks:
            segments.append((heap._arenas.index(arena), start, stop,
                             stop - start, 'occupied'))

        segments.sort()

        # Each block must either end where the next one begins, or be
        # followed by a block starting at offset 0 of another arena.
        for current, following in zip(segments, segments[1:]):
            arena, start, stop = current[:3]
            narena, nstart, nstop = following[:3]
            self.assertTrue((arena != narena and nstart == 0) or
                            (stop == nstart))
1607
1608#
1609#
1610#
1611
class _Foo(Structure):
    # Two-field structure (an int 'x' and a double 'y') used by the
    # shared ctypes tests below.
    _fields_ = [
        ('x', c_int),
        ('y', c_double)
        ]
1617
@unittest.skipUnless(HAS_SHAREDCTYPES,
                     "requires multiprocessing.sharedctypes")
class _TestSharedCTypes(BaseTestCase):
    """Checks for shared ctypes values, structures and arrays."""

    ALLOWED_TYPES = ('processes',)

    @classmethod
    def _double(cls, x, y, foo, arr, string):
        """Double every argument in place; meant to run in a child process."""
        for scalar in (x, y):
            scalar.value *= 2
        foo.x *= 2
        foo.y *= 2
        string.value *= 2
        for idx in range(len(arr)):
            arr[idx] *= 2

    def test_sharedctypes(self, lock=False):
        """Let a child double shared objects and check the results here."""
        x = Value('i', 7, lock=lock)
        y = Value(c_double, 1.0/3.0, lock=lock)
        foo = Value(_Foo, 3, 2, lock=lock)
        arr = self.Array('d', list(range(10)), lock=lock)
        string = self.Array('c', 20, lock=lock)
        string.value = latin('hello')

        shared = (x, y, foo, arr, string)
        child = self.Process(target=self._double, args=shared)
        child.start()
        child.join()

        # Everything must now hold twice its initial value.
        self.assertEqual(x.value, 14)
        self.assertAlmostEqual(y.value, 2.0/3.0)
        self.assertEqual(foo.x, 6)
        self.assertAlmostEqual(foo.y, 4.0)
        for idx in range(10):
            self.assertAlmostEqual(arr[idx], idx*2)
        self.assertEqual(string.value, latin('hellohello'))

    def test_synchronize(self):
        """Repeat test_sharedctypes() with lock=True."""
        self.test_sharedctypes(lock=True)

    def test_copy(self):
        """A copy must keep its values when the source is modified."""
        original = _Foo(2, 5.0)
        duplicate = copy(original)
        original.x = 0
        original.y = 0
        self.assertEqual(duplicate.x, 2)
        self.assertAlmostEqual(duplicate.y, 5.0)
1664
1665#
1666#
1667#
1668
class _TestFinalize(BaseTestCase):
    # Checks when util.Finalize callbacks run: on deletion of the object,
    # on an explicit call, and at process exit.

    ALLOWED_TYPES = ('processes',)

    @classmethod
    def _test_finalize(cls, conn):
        # Runs in the child process.  Each callback sends a label over
        # conn; test_finalize() checks the order in which they arrive.
        # The statement order and the lifetimes of the locals matter.
        class Foo(object):
            pass

        a = Foo()
        util.Finalize(a, conn.send, args=('a',))
        del a           # triggers callback for a

        b = Foo()
        close_b = util.Finalize(b, conn.send, args=('b',))
        close_b()       # triggers callback for b
        close_b()       # does nothing because callback has already been called
        del b           # does nothing because callback has already been called

        # Registered without an exitpriority; test_finalize() expects no
        # 'c' message.
        c = Foo()
        util.Finalize(c, conn.send, args=('c',))

        d10 = Foo()
        util.Finalize(d10, conn.send, args=('d10',), exitpriority=1)

        # Three finalizers sharing one priority; test_finalize() expects
        # them in reverse order of registration (d03, d02, d01).
        d01 = Foo()
        util.Finalize(d01, conn.send, args=('d01',), exitpriority=0)
        d02 = Foo()
        util.Finalize(d02, conn.send, args=('d02',), exitpriority=0)
        d03 = Foo()
        util.Finalize(d03, conn.send, args=('d03',), exitpriority=0)

        util.Finalize(None, conn.send, args=('e',), exitpriority=-10)

        # Lowest priority: 'STOP' is the end marker the parent reads up to.
        util.Finalize(None, conn.send, args=('STOP',), exitpriority=-100)

        # call multiprocessing's cleanup function then exit process without
        # garbage collecting locals
        util._exit_function()
        conn.close()
        os._exit(0)

    def test_finalize(self):
        # Run _test_finalize() in a child, then read its messages up to
        # the 'STOP' marker.
        conn, child_conn = self.Pipe()

        p = self.Process(target=self._test_finalize, args=(child_conn,))
        p.start()
        p.join()

        result = [obj for obj in iter(conn.recv, 'STOP')]
        self.assertEqual(result, ['a', 'b', 'd10', 'd03', 'd02', 'd01', 'e'])
1720
1721#
1722# Test that from ... import * works for each module
1723#
1724
class _TestImportStar(BaseTestCase):
    """Check that every name a module lists in ``__all__`` exists."""

    ALLOWED_TYPES = ('processes',)

    def test_import(self):
        """Import each module and look up every name in its ``__all__``."""
        always_checked = (
            'multiprocessing', 'multiprocessing.connection',
            'multiprocessing.heap', 'multiprocessing.managers',
            'multiprocessing.pool', 'multiprocessing.process',
            'multiprocessing.reduction',
            'multiprocessing.synchronize', 'multiprocessing.util',
        )
        modules = list(always_checked)

        if c_int is not None:
            # This module requires _ctypes
            modules.append('multiprocessing.sharedctypes')

        for module_name in modules:
            __import__(module_name)
            mod = sys.modules[module_name]

            # A module without __all__ has nothing to check.
            exported = getattr(mod, '__all__', ())
            for attr in exported:
                self.assertTrue(
                    hasattr(mod, attr),
                    '%r does not have attribute %r' % (mod, attr)
                    )
1751
1752#
1753# Quick test that logging works -- does not test logging output
1754#
1755
class _TestLogging(BaseTestCase):
    """Quick checks of the multiprocessing logger (output is not checked)."""

    ALLOWED_TYPES = ('processes',)

    def test_enable_logging(self):
        """The logger must exist and accept messages at SUBWARNING level."""
        logger = multiprocessing.get_logger()
        logger.setLevel(util.SUBWARNING)
        self.assertIsNotNone(logger)
        logger.debug('this will not be printed')
        logger.info('nor will this')
        logger.setLevel(LOG_LEVEL)

    @classmethod
    def _test_level(cls, conn):
        """Send the logger's effective level, as seen in the child, on *conn*."""
        logger = multiprocessing.get_logger()
        conn.send(logger.getEffectiveLevel())

    def _level_seen_by_child(self, reader, writer):
        """Run _test_level() in a child and return the level it reports."""
        child = self.Process(target=self._test_level, args=(writer,))
        child.start()
        level = reader.recv()
        # Wait for the child so that it is not left behind after the test.
        child.join()
        return level

    def test_level(self):
        """A child must see the level set on the logger or on the root."""
        LEVEL1 = 32
        LEVEL2 = 37

        logger = multiprocessing.get_logger()
        root_logger = logging.getLogger()
        root_level = root_logger.level

        reader, writer = multiprocessing.Pipe(duplex=False)

        try:
            # Level set directly on the multiprocessing logger.
            logger.setLevel(LEVEL1)
            self.assertEqual(LEVEL1,
                             self._level_seen_by_child(reader, writer))

            # With the logger at NOTSET, the root logger's level applies.
            logger.setLevel(logging.NOTSET)
            root_logger.setLevel(LEVEL2)
            self.assertEqual(LEVEL2,
                             self._level_seen_by_child(reader, writer))
        finally:
            # Restore both levels even when an assertion above fails.
            root_logger.setLevel(root_level)
            logger.setLevel(level=LOG_LEVEL)
1794
Jesse Nollerb9a49b72009-11-21 18:09:38 +00001795
Jesse Nollerf4ae35f2009-11-24 14:22:24 +00001796# class _TestLoggingProcessName(BaseTestCase):
1797#
1798# def handle(self, record):
1799# assert record.processName == multiprocessing.current_process().name
1800# self.__handled = True
1801#
1802# def test_logging(self):
1803# handler = logging.Handler()
1804# handler.handle = self.handle
1805# self.__handled = False
1806# # Bypass getLogger() and side-effects
1807# logger = logging.getLoggerClass()(
1808# 'multiprocessing.test.TestLoggingProcessName')
1809# logger.addHandler(handler)
1810# logger.propagate = False
1811#
1812# logger.warn('foo')
1813# assert self.__handled
Jesse Nollerb9a49b72009-11-21 18:09:38 +00001814
Benjamin Petersone711caf2008-06-11 16:44:04 +00001815#
Jesse Noller6214edd2009-01-19 16:23:53 +00001816# Test to verify handle verification, see issue 3321
1817#
1818
class TestInvalidHandle(unittest.TestCase):
    """Connections built on invalid handles must raise IOError."""

    @unittest.skipIf(WIN32, "skipped on Windows")
    def test_invalid_handles(self):
        """Poll a bogus handle and try to wrap a negative one."""
        bogus = _multiprocessing.Connection(44977608)
        with self.assertRaises(IOError):
            bogus.poll()
        with self.assertRaises(IOError):
            _multiprocessing.Connection(-1)
Florent Xiclunafd1b0932010-03-28 00:25:02 +00001826
Jesse Noller6214edd2009-01-19 16:23:53 +00001827#
Benjamin Petersone711caf2008-06-11 16:44:04 +00001828# Functions used to create test cases from the base ones in this module
1829#
1830
def get_attributes(Source, names):
    """Collect the attributes called *names* from *Source* into a dict.

    Plain Python functions are wrapped in ``staticmethod`` so that they
    are not bound as methods once the dict is merged into a class body.
    Every other object is stored unchanged.

    Raises AttributeError if *Source* lacks one of the names.
    """
    # The type of an ordinary ``def`` function, taken from this very
    # function so that no extra import is needed.
    function_type = type(get_attributes)
    attributes = {}
    for name in names:
        obj = getattr(Source, name)
        if isinstance(obj, function_type):
            obj = staticmethod(obj)
        attributes[name] = obj
    return attributes
1839
def create_test_cases(Mixin, type):
    """Build the concrete test classes for one *type*.

    Every module global whose name starts with ``_Test`` and whose
    ``ALLOWED_TYPES`` contains *type* is combined with
    ``unittest.TestCase`` and *Mixin*.  The result maps each new class
    name (``'With' + capitalized type + base name without its leading
    underscore``) to the new class.
    """
    prefix = 'With' + type.capitalize()
    cases = {}

    # Iterate over a snapshot of the module namespace.
    for name, base in list(globals().items()):
        if not name.startswith('_Test'):
            continue
        if type not in base.ALLOWED_TYPES:
            continue

        class Temp(base, unittest.TestCase, Mixin):
            pass

        # Give the class its final name and the module of the mixin.
        Temp.__name__ = prefix + name[1:]
        Temp.__module__ = Mixin.__module__
        cases[Temp.__name__] = Temp
    return cases
1856
1857#
1858# Create test cases
1859#
1860
class ProcessesMixin(object):
    # Mixin for the 'processes' flavour of the tests.
    TYPE = 'processes'
    Process = multiprocessing.Process
    # Copy the listed names from the multiprocessing module into the class
    # namespace; get_attributes() wraps plain functions in staticmethod.
    locals().update(get_attributes(multiprocessing, (
        'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
        'Condition', 'Event', 'Value', 'Array', 'RawValue',
        'RawArray', 'current_process', 'active_children', 'Pipe',
        'connection', 'JoinableQueue'
        )))

# Build the 'processes' test classes and publish them as module globals.
testcases_processes = create_test_cases(ProcessesMixin, type='processes')
globals().update(testcases_processes)
1873
1874
class ManagerMixin(object):
    """Mixin exposing the factories of a shared ``SyncManager`` to test bases."""
    TYPE = 'manager'
    Process = multiprocessing.Process
    # Allocate the manager without running __init__ so that its methods can be
    # captured at class-definition time; test_main() calls __init__() and
    # start() on this same object before the tests run.
    manager = object.__new__(multiprocessing.managers.SyncManager)
    locals().update(get_attributes(manager, [
        'Queue',
        'Lock',
        'RLock',
        'Semaphore',
        'BoundedSemaphore',
        'Condition',
        'Event',
        'Value',
        'Array',
        'list',
        'dict',
        'Namespace',
        'JoinableQueue',
    ]))
1884
# Generate the manager-backed test classes and publish them at module level
# under their generated names.
testcases_manager = create_test_cases(Mixin=ManagerMixin, type='manager')
globals().update(testcases_manager)
1887
1888
class ThreadsMixin(object):
    """Mixin exposing the ``multiprocessing.dummy`` primitives to test bases."""
    TYPE = 'threads'
    Process = multiprocessing.dummy.Process
    # Copy the listed names from multiprocessing.dummy straight into the class
    # namespace; get_attributes() wraps plain functions in staticmethod.
    locals().update(get_attributes(multiprocessing.dummy, [
        'Queue',
        'Lock',
        'RLock',
        'Semaphore',
        'BoundedSemaphore',
        'Condition',
        'Event',
        'Value',
        'Array',
        'current_process',
        'active_children',
        'Pipe',
        'connection',
        'dict',
        'list',
        'Namespace',
        'JoinableQueue',
    ]))
1898
# Generate the thread-backed test classes and publish them at module level
# under their generated names.
testcases_threads = create_test_cases(Mixin=ThreadsMixin, type='threads')
globals().update(testcases_threads)
1901
class OtherTest(unittest.TestCase):
    # TODO: add more tests for deliver/answer challenge.
    def test_deliver_challenge_auth_failure(self):
        # The fake peer answers the challenge with garbage.
        class _FakeConnection(object):
            def send_bytes(self, data):
                pass
            def recv_bytes(self, size):
                return b'something bogus'
        with self.assertRaises(multiprocessing.AuthenticationError):
            multiprocessing.connection.deliver_challenge(
                _FakeConnection(), b'abc')

    def test_answer_challenge_auth_failure(self):
        # The fake peer sends a bare challenge first, then garbage, then
        # empty messages.
        class _FakeConnection(object):
            def __init__(self):
                self.count = 0
            def send_bytes(self, data):
                pass
            def recv_bytes(self, size):
                self.count += 1
                if self.count == 1:
                    return multiprocessing.connection.CHALLENGE
                if self.count == 2:
                    return b'something bogus'
                return b''
        with self.assertRaises(multiprocessing.AuthenticationError):
            multiprocessing.connection.answer_challenge(
                _FakeConnection(), b'abc')
1930
#
# Test the initializer feature of Manager.start() and Pool.__init__() - see
# issue 5585
#
1934
def initializer(ns):
    """Increment the ``test`` attribute of *ns* by one."""
    counter = ns.test
    counter += 1
    ns.test = counter
1937
class TestInitializers(unittest.TestCase):
    """Check that Manager.start() and Pool() run a supplied initializer.

    Each test shares a manager-hosted namespace whose ``test`` attribute
    starts at 0 and is incremented by ``initializer``.
    """

    def setUp(self):
        self.mgr = multiprocessing.Manager()
        self.ns = self.mgr.Namespace()
        self.ns.test = 0

    def tearDown(self):
        self.mgr.shutdown()

    def test_manager_initializer(self):
        # A non-callable initializer is rejected; a callable one runs once.
        m = multiprocessing.managers.SyncManager()
        self.assertRaises(TypeError, m.start, 1)
        m.start(initializer, (self.ns,))
        try:
            self.assertEqual(self.ns.test, 1)
        finally:
            # Shut the second manager down even when the assertion fails so
            # its server process is not leaked.
            m.shutdown()

    def test_pool_initializer(self):
        # A non-callable initializer is rejected; with a single worker the
        # callable initializer runs once.
        self.assertRaises(TypeError, multiprocessing.Pool, initializer=1)
        p = multiprocessing.Pool(1, initializer, (self.ns,))
        p.close()
        p.join()
        self.assertEqual(self.ns.test, 1)
1960
#
# Issues 5155, 5313, 5331: Test process in processes
# Verifies os.close(sys.stdin.fileno()) vs. sys.stdin.close() behavior
#
1965
1966def _ThisSubProcess(q):
1967 try:
1968 item = q.get(block=False)
1969 except pyqueue.Empty:
1970 pass
1971
def _TestProcess(q):
    """Start a child process running ``_ThisSubProcess`` and wait for it.

    The *q* argument is accepted but not used; the child is handed a newly
    created ``multiprocessing.Queue`` instead.
    """
    child_queue = multiprocessing.Queue()
    child = multiprocessing.Process(
        target=_ThisSubProcess,
        args=(child_queue,),
    )
    child.start()
    child.join()
1977
1978def _afunc(x):
1979 return x*x
1980
def pool_in_process():
    """Run a small ``map`` on a fresh four-worker pool, then shut it down.

    The result of the map is discarded; the function exists to exercise
    pool creation from the process that calls it.
    """
    pool = multiprocessing.Pool(processes=4)
    try:
        pool.map(_afunc, [1, 2, 3, 4, 5, 6, 7])
    finally:
        # Close and join so the worker processes are released before return.
        pool.close()
        pool.join()
1984
1985class _file_like(object):
1986 def __init__(self, delegate):
1987 self._delegate = delegate
1988 self._pid = None
1989
1990 @property
1991 def cache(self):
1992 pid = os.getpid()
1993 # There are no race conditions since fork keeps only the running thread
1994 if pid != self._pid:
1995 self._pid = pid
1996 self._cache = []
1997 return self._cache
1998
1999 def write(self, data):
2000 self.cache.append(data)
2001
2002 def flush(self):
2003 self._delegate.write(''.join(self.cache))
2004 self._cache = []
2005
class TestStdinBadfiledescriptor(unittest.TestCase):
    """Start processes and pools from inside a child process, and check
    ``_file_like`` flushing."""

    def test_queue_in_process(self):
        # The child itself starts a further process that polls a queue.
        queue = multiprocessing.Queue()
        proc = multiprocessing.Process(target=_TestProcess, args=(queue,))
        proc.start()
        proc.join()

    def test_pool_in_process(self):
        # The child itself creates a pool and maps over it.
        p = multiprocessing.Process(target=pool_in_process)
        p.start()
        p.join()

    def test_flushing(self):
        sio = io.StringIO()
        flike = _file_like(sio)
        flike.write('foo')
        # NOTE(review): this Process is constructed but never started; kept
        # as in the original -- confirm whether it was meant to run.
        proc = multiprocessing.Process(target=lambda: flike.flush())
        flike.flush()
        # assertEqual instead of a bare assert: the check survives -O and
        # reports both values on failure.
        self.assertEqual(sio.getvalue(), 'foo')
2026
# Test classes that test_main() runs in addition to the generated ones.
testcases_other = [
    OtherTest,
    TestInvalidHandle,
    TestInitializers,
    TestStdinBadfiledescriptor,
]
Neal Norwitz5d6415e2008-08-25 01:53:32 +00002029
Benjamin Petersone711caf2008-06-11 16:44:04 +00002030#
2031#
2032#
2033
def test_main(run=None):
    """Create the shared pools and manager, run every test case, clean up.

    run -- callable accepting a ``unittest.TestSuite``; defaults to
           ``test.support.run_unittest``.

    Raises unittest.SkipTest on Linux when creating an RLock raises OSError.
    """
    if sys.platform.startswith("linux"):
        try:
            multiprocessing.RLock()
        except OSError:
            raise unittest.SkipTest("OSError raises on RLock creation, see issue 3111!")

    if run is None:
        from test.support import run_unittest as run

    util.get_temp_dir()     # creates temp directory for use by all processes

    multiprocessing.get_logger().setLevel(LOG_LEVEL)

    ProcessesMixin.pool = multiprocessing.Pool(4)
    ThreadsMixin.pool = multiprocessing.dummy.Pool(4)
    # ManagerMixin.manager was allocated with object.__new__(); initialise it
    # here before starting it.
    ManagerMixin.manager.__init__()
    ManagerMixin.manager.start()
    ManagerMixin.pool = ManagerMixin.manager.Pool(4)

    try:
        by_name = lambda tc: tc.__name__
        testcases = (
            sorted(testcases_processes.values(), key=by_name) +
            sorted(testcases_threads.values(), key=by_name) +
            sorted(testcases_manager.values(), key=by_name) +
            testcases_other
            )

        loadTestsFromTestCase = unittest.defaultTestLoader.loadTestsFromTestCase
        suite = unittest.TestSuite(loadTestsFromTestCase(tc) for tc in testcases)
        run(suite)
    finally:
        # Release the pools and the manager even if the run raises, so no
        # worker or server processes are left behind.
        ThreadsMixin.pool.terminate()
        ProcessesMixin.pool.terminate()
        ManagerMixin.pool.terminate()
        ManagerMixin.manager.shutdown()

        del ProcessesMixin.pool, ThreadsMixin.pool, ManagerMixin.pool
Benjamin Petersone711caf2008-06-11 16:44:04 +00002071
def main():
    """Run the whole suite through a verbosity-2 ``TextTestRunner``."""
    runner = unittest.TextTestRunner(verbosity=2)
    test_main(runner.run)
2074
# Run the suite when this file is executed as a script.
if __name__ == '__main__':
    main()