blob: 6da4b177727f73689dd01cdd3be4fb9cd82c6e4a [file] [log] [blame]
Jesse Noller76cf55f2008-07-02 16:56:51 +00001#!/usr/bin/env python
2
Benjamin Petersondfd79492008-06-13 19:13:39 +00003#
4# Unit tests for the multiprocessing package
5#
6
7import unittest
Benjamin Petersondfd79492008-06-13 19:13:39 +00008import Queue
9import time
10import sys
11import os
12import gc
13import signal
14import array
Benjamin Petersondfd79492008-06-13 19:13:39 +000015import socket
16import random
17import logging
Mark Dickinsonc4920e82009-11-20 19:30:22 +000018from test import test_support
Jesse Noller1b90efb2009-06-30 17:11:52 +000019from StringIO import StringIO
R. David Murray3db8a342009-03-30 23:05:48 +000020_multiprocessing = test_support.import_module('_multiprocessing')
Ezio Melottic2077b02011-03-16 12:34:31 +020021# import threading after _multiprocessing to raise a more relevant error
Victor Stinner613b4cf2010-04-27 21:56:26 +000022# message: "No module named _multiprocessing". _multiprocessing is not compiled
23# without thread support.
24import threading
R. David Murray3db8a342009-03-30 23:05:48 +000025
Jesse Noller37040cd2008-09-30 00:15:45 +000026# Work around broken sem_open implementations
R. David Murray3db8a342009-03-30 23:05:48 +000027test_support.import_module('multiprocessing.synchronize')
Jesse Noller37040cd2008-09-30 00:15:45 +000028
Benjamin Petersondfd79492008-06-13 19:13:39 +000029import multiprocessing.dummy
30import multiprocessing.connection
31import multiprocessing.managers
32import multiprocessing.heap
Benjamin Petersondfd79492008-06-13 19:13:39 +000033import multiprocessing.pool
Benjamin Petersondfd79492008-06-13 19:13:39 +000034
35from multiprocessing import util
36
Brian Curtina06e9b82010-10-07 02:27:41 +000037try:
38 from multiprocessing.sharedctypes import Value, copy
39 HAS_SHAREDCTYPES = True
40except ImportError:
41 HAS_SHAREDCTYPES = False
42
Benjamin Petersondfd79492008-06-13 19:13:39 +000043#
44#
45#
46
Benjamin Petersone79edf52008-07-13 18:34:58 +000047latin = str
Benjamin Petersondfd79492008-06-13 19:13:39 +000048
Benjamin Petersondfd79492008-06-13 19:13:39 +000049#
50# Constants
51#
52
53LOG_LEVEL = util.SUBWARNING
Jesse Noller654ade32010-01-27 03:05:57 +000054#LOG_LEVEL = logging.DEBUG
Benjamin Petersondfd79492008-06-13 19:13:39 +000055
56DELTA = 0.1
57CHECK_TIMINGS = False # making true makes tests take a lot longer
58 # and can sometimes cause some non-serious
59 # failures because some calls block a bit
60 # longer than expected
61if CHECK_TIMINGS:
62 TIMEOUT1, TIMEOUT2, TIMEOUT3 = 0.82, 0.35, 1.4
63else:
64 TIMEOUT1, TIMEOUT2, TIMEOUT3 = 0.1, 0.1, 0.1
65
66HAVE_GETVALUE = not getattr(_multiprocessing,
67 'HAVE_BROKEN_SEM_GETVALUE', False)
68
Jesse Noller9a5b2ad2009-01-19 15:12:22 +000069WIN32 = (sys.platform == "win32")
70
Benjamin Petersondfd79492008-06-13 19:13:39 +000071#
Florent Xicluna36b9fbb2010-03-24 19:33:25 +000072# Some tests require ctypes
73#
74
75try:
Nick Coghlan13623662010-04-10 14:24:36 +000076 from ctypes import Structure, c_int, c_double
Florent Xicluna36b9fbb2010-03-24 19:33:25 +000077except ImportError:
78 Structure = object
79 c_int = c_double = None
80
81#
Benjamin Petersondfd79492008-06-13 19:13:39 +000082# Creates a wrapper for a function which records the time it takes to finish
83#
84
85class TimingWrapper(object):
86
87 def __init__(self, func):
88 self.func = func
89 self.elapsed = None
90
91 def __call__(self, *args, **kwds):
92 t = time.time()
93 try:
94 return self.func(*args, **kwds)
95 finally:
96 self.elapsed = time.time() - t
97
98#
99# Base class for test cases
100#
101
102class BaseTestCase(object):
103
104 ALLOWED_TYPES = ('processes', 'manager', 'threads')
105
106 def assertTimingAlmostEqual(self, a, b):
107 if CHECK_TIMINGS:
108 self.assertAlmostEqual(a, b, 1)
109
110 def assertReturnsIfImplemented(self, value, func, *args):
111 try:
112 res = func(*args)
113 except NotImplementedError:
114 pass
115 else:
116 return self.assertEqual(value, res)
117
Antoine Pitrou4eb2b282010-11-02 23:51:30 +0000118 # For the sanity of Windows users, rather than crashing or freezing in
119 # multiple ways.
120 def __reduce__(self, *args):
121 raise NotImplementedError("shouldn't try to pickle a test case")
122
123 __reduce_ex__ = __reduce__
124
Benjamin Petersondfd79492008-06-13 19:13:39 +0000125#
126# Return the value of a semaphore
127#
128
129def get_value(self):
130 try:
131 return self.get_value()
132 except AttributeError:
133 try:
134 return self._Semaphore__value
135 except AttributeError:
136 try:
137 return self._value
138 except AttributeError:
139 raise NotImplementedError
140
141#
142# Testcases
143#
144
145class _TestProcess(BaseTestCase):
146
147 ALLOWED_TYPES = ('processes', 'threads')
148
149 def test_current(self):
150 if self.TYPE == 'threads':
151 return
152
153 current = self.current_process()
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000154 authkey = current.authkey
Benjamin Petersondfd79492008-06-13 19:13:39 +0000155
156 self.assertTrue(current.is_alive())
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000157 self.assertTrue(not current.daemon)
Ezio Melottib0f5adc2010-01-24 16:58:36 +0000158 self.assertIsInstance(authkey, bytes)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000159 self.assertTrue(len(authkey) > 0)
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000160 self.assertEqual(current.ident, os.getpid())
161 self.assertEqual(current.exitcode, None)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000162
Antoine Pitrou4eb2b282010-11-02 23:51:30 +0000163 @classmethod
164 def _test(cls, q, *args, **kwds):
165 current = cls.current_process()
Benjamin Petersondfd79492008-06-13 19:13:39 +0000166 q.put(args)
167 q.put(kwds)
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000168 q.put(current.name)
Antoine Pitrou4eb2b282010-11-02 23:51:30 +0000169 if cls.TYPE != 'threads':
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000170 q.put(bytes(current.authkey))
Benjamin Petersondfd79492008-06-13 19:13:39 +0000171 q.put(current.pid)
172
173 def test_process(self):
174 q = self.Queue(1)
175 e = self.Event()
176 args = (q, 1, 2)
177 kwargs = {'hello':23, 'bye':2.54}
178 name = 'SomeProcess'
179 p = self.Process(
180 target=self._test, args=args, kwargs=kwargs, name=name
181 )
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000182 p.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +0000183 current = self.current_process()
184
185 if self.TYPE != 'threads':
Ezio Melotti2623a372010-11-21 13:34:58 +0000186 self.assertEqual(p.authkey, current.authkey)
187 self.assertEqual(p.is_alive(), False)
188 self.assertEqual(p.daemon, True)
Ezio Melottiaa980582010-01-23 23:04:36 +0000189 self.assertNotIn(p, self.active_children())
Benjamin Petersondfd79492008-06-13 19:13:39 +0000190 self.assertTrue(type(self.active_children()) is list)
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000191 self.assertEqual(p.exitcode, None)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000192
193 p.start()
194
Ezio Melotti2623a372010-11-21 13:34:58 +0000195 self.assertEqual(p.exitcode, None)
196 self.assertEqual(p.is_alive(), True)
Ezio Melottiaa980582010-01-23 23:04:36 +0000197 self.assertIn(p, self.active_children())
Benjamin Petersondfd79492008-06-13 19:13:39 +0000198
Ezio Melotti2623a372010-11-21 13:34:58 +0000199 self.assertEqual(q.get(), args[1:])
200 self.assertEqual(q.get(), kwargs)
201 self.assertEqual(q.get(), p.name)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000202 if self.TYPE != 'threads':
Ezio Melotti2623a372010-11-21 13:34:58 +0000203 self.assertEqual(q.get(), current.authkey)
204 self.assertEqual(q.get(), p.pid)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000205
206 p.join()
207
Ezio Melotti2623a372010-11-21 13:34:58 +0000208 self.assertEqual(p.exitcode, 0)
209 self.assertEqual(p.is_alive(), False)
Ezio Melottiaa980582010-01-23 23:04:36 +0000210 self.assertNotIn(p, self.active_children())
Benjamin Petersondfd79492008-06-13 19:13:39 +0000211
Antoine Pitrou4eb2b282010-11-02 23:51:30 +0000212 @classmethod
213 def _test_terminate(cls):
Benjamin Petersondfd79492008-06-13 19:13:39 +0000214 time.sleep(1000)
215
216 def test_terminate(self):
217 if self.TYPE == 'threads':
218 return
219
220 p = self.Process(target=self._test_terminate)
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000221 p.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +0000222 p.start()
223
224 self.assertEqual(p.is_alive(), True)
Ezio Melottiaa980582010-01-23 23:04:36 +0000225 self.assertIn(p, self.active_children())
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000226 self.assertEqual(p.exitcode, None)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000227
228 p.terminate()
229
230 join = TimingWrapper(p.join)
231 self.assertEqual(join(), None)
232 self.assertTimingAlmostEqual(join.elapsed, 0.0)
233
234 self.assertEqual(p.is_alive(), False)
Ezio Melottiaa980582010-01-23 23:04:36 +0000235 self.assertNotIn(p, self.active_children())
Benjamin Petersondfd79492008-06-13 19:13:39 +0000236
237 p.join()
238
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000239 # XXX sometimes get p.exitcode == 0 on Windows ...
240 #self.assertEqual(p.exitcode, -signal.SIGTERM)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000241
242 def test_cpu_count(self):
243 try:
244 cpus = multiprocessing.cpu_count()
245 except NotImplementedError:
246 cpus = 1
247 self.assertTrue(type(cpus) is int)
248 self.assertTrue(cpus >= 1)
249
250 def test_active_children(self):
251 self.assertEqual(type(self.active_children()), list)
252
253 p = self.Process(target=time.sleep, args=(DELTA,))
Ezio Melottiaa980582010-01-23 23:04:36 +0000254 self.assertNotIn(p, self.active_children())
Benjamin Petersondfd79492008-06-13 19:13:39 +0000255
256 p.start()
Ezio Melottiaa980582010-01-23 23:04:36 +0000257 self.assertIn(p, self.active_children())
Benjamin Petersondfd79492008-06-13 19:13:39 +0000258
259 p.join()
Ezio Melottiaa980582010-01-23 23:04:36 +0000260 self.assertNotIn(p, self.active_children())
Benjamin Petersondfd79492008-06-13 19:13:39 +0000261
Antoine Pitrou4eb2b282010-11-02 23:51:30 +0000262 @classmethod
263 def _test_recursion(cls, wconn, id):
Benjamin Petersondfd79492008-06-13 19:13:39 +0000264 from multiprocessing import forking
265 wconn.send(id)
266 if len(id) < 2:
267 for i in range(2):
Antoine Pitrou4eb2b282010-11-02 23:51:30 +0000268 p = cls.Process(
269 target=cls._test_recursion, args=(wconn, id+[i])
Benjamin Petersondfd79492008-06-13 19:13:39 +0000270 )
271 p.start()
272 p.join()
273
274 def test_recursion(self):
275 rconn, wconn = self.Pipe(duplex=False)
276 self._test_recursion(wconn, [])
277
278 time.sleep(DELTA)
279 result = []
280 while rconn.poll():
281 result.append(rconn.recv())
282
283 expected = [
284 [],
285 [0],
286 [0, 0],
287 [0, 1],
288 [1],
289 [1, 0],
290 [1, 1]
291 ]
292 self.assertEqual(result, expected)
293
294#
295#
296#
297
298class _UpperCaser(multiprocessing.Process):
299
300 def __init__(self):
301 multiprocessing.Process.__init__(self)
302 self.child_conn, self.parent_conn = multiprocessing.Pipe()
303
304 def run(self):
305 self.parent_conn.close()
306 for s in iter(self.child_conn.recv, None):
307 self.child_conn.send(s.upper())
308 self.child_conn.close()
309
310 def submit(self, s):
311 assert type(s) is str
312 self.parent_conn.send(s)
313 return self.parent_conn.recv()
314
315 def stop(self):
316 self.parent_conn.send(None)
317 self.parent_conn.close()
318 self.child_conn.close()
319
320class _TestSubclassingProcess(BaseTestCase):
321
322 ALLOWED_TYPES = ('processes',)
323
324 def test_subclassing(self):
325 uppercaser = _UpperCaser()
326 uppercaser.start()
327 self.assertEqual(uppercaser.submit('hello'), 'HELLO')
328 self.assertEqual(uppercaser.submit('world'), 'WORLD')
329 uppercaser.stop()
330 uppercaser.join()
331
332#
333#
334#
335
336def queue_empty(q):
337 if hasattr(q, 'empty'):
338 return q.empty()
339 else:
340 return q.qsize() == 0
341
342def queue_full(q, maxsize):
343 if hasattr(q, 'full'):
344 return q.full()
345 else:
346 return q.qsize() == maxsize
347
348
349class _TestQueue(BaseTestCase):
350
351
Antoine Pitrou4eb2b282010-11-02 23:51:30 +0000352 @classmethod
353 def _test_put(cls, queue, child_can_start, parent_can_continue):
Benjamin Petersondfd79492008-06-13 19:13:39 +0000354 child_can_start.wait()
355 for i in range(6):
356 queue.get()
357 parent_can_continue.set()
358
359 def test_put(self):
360 MAXSIZE = 6
361 queue = self.Queue(maxsize=MAXSIZE)
362 child_can_start = self.Event()
363 parent_can_continue = self.Event()
364
365 proc = self.Process(
366 target=self._test_put,
367 args=(queue, child_can_start, parent_can_continue)
368 )
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000369 proc.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +0000370 proc.start()
371
372 self.assertEqual(queue_empty(queue), True)
373 self.assertEqual(queue_full(queue, MAXSIZE), False)
374
375 queue.put(1)
376 queue.put(2, True)
377 queue.put(3, True, None)
378 queue.put(4, False)
379 queue.put(5, False, None)
380 queue.put_nowait(6)
381
382 # the values may be in buffer but not yet in pipe so sleep a bit
383 time.sleep(DELTA)
384
385 self.assertEqual(queue_empty(queue), False)
386 self.assertEqual(queue_full(queue, MAXSIZE), True)
387
388 put = TimingWrapper(queue.put)
389 put_nowait = TimingWrapper(queue.put_nowait)
390
391 self.assertRaises(Queue.Full, put, 7, False)
392 self.assertTimingAlmostEqual(put.elapsed, 0)
393
394 self.assertRaises(Queue.Full, put, 7, False, None)
395 self.assertTimingAlmostEqual(put.elapsed, 0)
396
397 self.assertRaises(Queue.Full, put_nowait, 7)
398 self.assertTimingAlmostEqual(put_nowait.elapsed, 0)
399
400 self.assertRaises(Queue.Full, put, 7, True, TIMEOUT1)
401 self.assertTimingAlmostEqual(put.elapsed, TIMEOUT1)
402
403 self.assertRaises(Queue.Full, put, 7, False, TIMEOUT2)
404 self.assertTimingAlmostEqual(put.elapsed, 0)
405
406 self.assertRaises(Queue.Full, put, 7, True, timeout=TIMEOUT3)
407 self.assertTimingAlmostEqual(put.elapsed, TIMEOUT3)
408
409 child_can_start.set()
410 parent_can_continue.wait()
411
412 self.assertEqual(queue_empty(queue), True)
413 self.assertEqual(queue_full(queue, MAXSIZE), False)
414
415 proc.join()
416
Antoine Pitrou4eb2b282010-11-02 23:51:30 +0000417 @classmethod
418 def _test_get(cls, queue, child_can_start, parent_can_continue):
Benjamin Petersondfd79492008-06-13 19:13:39 +0000419 child_can_start.wait()
Benjamin Petersonda3a1b12008-06-16 20:52:48 +0000420 #queue.put(1)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000421 queue.put(2)
422 queue.put(3)
423 queue.put(4)
424 queue.put(5)
425 parent_can_continue.set()
426
427 def test_get(self):
428 queue = self.Queue()
429 child_can_start = self.Event()
430 parent_can_continue = self.Event()
431
432 proc = self.Process(
433 target=self._test_get,
434 args=(queue, child_can_start, parent_can_continue)
435 )
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000436 proc.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +0000437 proc.start()
438
439 self.assertEqual(queue_empty(queue), True)
440
441 child_can_start.set()
442 parent_can_continue.wait()
443
444 time.sleep(DELTA)
445 self.assertEqual(queue_empty(queue), False)
446
Benjamin Petersonda3a1b12008-06-16 20:52:48 +0000447 # Hangs unexpectedly, remove for now
448 #self.assertEqual(queue.get(), 1)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000449 self.assertEqual(queue.get(True, None), 2)
450 self.assertEqual(queue.get(True), 3)
451 self.assertEqual(queue.get(timeout=1), 4)
452 self.assertEqual(queue.get_nowait(), 5)
453
454 self.assertEqual(queue_empty(queue), True)
455
456 get = TimingWrapper(queue.get)
457 get_nowait = TimingWrapper(queue.get_nowait)
458
459 self.assertRaises(Queue.Empty, get, False)
460 self.assertTimingAlmostEqual(get.elapsed, 0)
461
462 self.assertRaises(Queue.Empty, get, False, None)
463 self.assertTimingAlmostEqual(get.elapsed, 0)
464
465 self.assertRaises(Queue.Empty, get_nowait)
466 self.assertTimingAlmostEqual(get_nowait.elapsed, 0)
467
468 self.assertRaises(Queue.Empty, get, True, TIMEOUT1)
469 self.assertTimingAlmostEqual(get.elapsed, TIMEOUT1)
470
471 self.assertRaises(Queue.Empty, get, False, TIMEOUT2)
472 self.assertTimingAlmostEqual(get.elapsed, 0)
473
474 self.assertRaises(Queue.Empty, get, timeout=TIMEOUT3)
475 self.assertTimingAlmostEqual(get.elapsed, TIMEOUT3)
476
477 proc.join()
478
Antoine Pitrou4eb2b282010-11-02 23:51:30 +0000479 @classmethod
480 def _test_fork(cls, queue):
Benjamin Petersondfd79492008-06-13 19:13:39 +0000481 for i in range(10, 20):
482 queue.put(i)
483 # note that at this point the items may only be buffered, so the
484 # process cannot shutdown until the feeder thread has finished
485 # pushing items onto the pipe.
486
487 def test_fork(self):
488 # Old versions of Queue would fail to create a new feeder
489 # thread for a forked process if the original process had its
490 # own feeder thread. This test checks that this no longer
491 # happens.
492
493 queue = self.Queue()
494
495 # put items on queue so that main process starts a feeder thread
496 for i in range(10):
497 queue.put(i)
498
499 # wait to make sure thread starts before we fork a new process
500 time.sleep(DELTA)
501
502 # fork process
503 p = self.Process(target=self._test_fork, args=(queue,))
504 p.start()
505
506 # check that all expected items are in the queue
507 for i in range(20):
508 self.assertEqual(queue.get(), i)
509 self.assertRaises(Queue.Empty, queue.get, False)
510
511 p.join()
512
513 def test_qsize(self):
514 q = self.Queue()
515 try:
516 self.assertEqual(q.qsize(), 0)
517 except NotImplementedError:
518 return
519 q.put(1)
520 self.assertEqual(q.qsize(), 1)
521 q.put(5)
522 self.assertEqual(q.qsize(), 2)
523 q.get()
524 self.assertEqual(q.qsize(), 1)
525 q.get()
526 self.assertEqual(q.qsize(), 0)
527
Antoine Pitrou4eb2b282010-11-02 23:51:30 +0000528 @classmethod
529 def _test_task_done(cls, q):
Benjamin Petersondfd79492008-06-13 19:13:39 +0000530 for obj in iter(q.get, None):
531 time.sleep(DELTA)
532 q.task_done()
533
534 def test_task_done(self):
535 queue = self.JoinableQueue()
536
537 if sys.version_info < (2, 5) and not hasattr(queue, 'task_done'):
Florent Xicluna36b9fbb2010-03-24 19:33:25 +0000538 self.skipTest("requires 'queue.task_done()' method")
Benjamin Petersondfd79492008-06-13 19:13:39 +0000539
540 workers = [self.Process(target=self._test_task_done, args=(queue,))
541 for i in xrange(4)]
542
543 for p in workers:
544 p.start()
545
546 for i in xrange(10):
547 queue.put(i)
548
549 queue.join()
550
551 for p in workers:
552 queue.put(None)
553
554 for p in workers:
555 p.join()
556
557#
558#
559#
560
561class _TestLock(BaseTestCase):
562
563 def test_lock(self):
564 lock = self.Lock()
565 self.assertEqual(lock.acquire(), True)
566 self.assertEqual(lock.acquire(False), False)
567 self.assertEqual(lock.release(), None)
568 self.assertRaises((ValueError, threading.ThreadError), lock.release)
569
570 def test_rlock(self):
571 lock = self.RLock()
572 self.assertEqual(lock.acquire(), True)
573 self.assertEqual(lock.acquire(), True)
574 self.assertEqual(lock.acquire(), True)
575 self.assertEqual(lock.release(), None)
576 self.assertEqual(lock.release(), None)
577 self.assertEqual(lock.release(), None)
578 self.assertRaises((AssertionError, RuntimeError), lock.release)
579
Jesse Noller82eb5902009-03-30 23:29:31 +0000580 def test_lock_context(self):
581 with self.Lock():
582 pass
583
Benjamin Petersondfd79492008-06-13 19:13:39 +0000584
585class _TestSemaphore(BaseTestCase):
586
587 def _test_semaphore(self, sem):
588 self.assertReturnsIfImplemented(2, get_value, sem)
589 self.assertEqual(sem.acquire(), True)
590 self.assertReturnsIfImplemented(1, get_value, sem)
591 self.assertEqual(sem.acquire(), True)
592 self.assertReturnsIfImplemented(0, get_value, sem)
593 self.assertEqual(sem.acquire(False), False)
594 self.assertReturnsIfImplemented(0, get_value, sem)
595 self.assertEqual(sem.release(), None)
596 self.assertReturnsIfImplemented(1, get_value, sem)
597 self.assertEqual(sem.release(), None)
598 self.assertReturnsIfImplemented(2, get_value, sem)
599
600 def test_semaphore(self):
601 sem = self.Semaphore(2)
602 self._test_semaphore(sem)
603 self.assertEqual(sem.release(), None)
604 self.assertReturnsIfImplemented(3, get_value, sem)
605 self.assertEqual(sem.release(), None)
606 self.assertReturnsIfImplemented(4, get_value, sem)
607
608 def test_bounded_semaphore(self):
609 sem = self.BoundedSemaphore(2)
610 self._test_semaphore(sem)
611 # Currently fails on OS/X
612 #if HAVE_GETVALUE:
613 # self.assertRaises(ValueError, sem.release)
614 # self.assertReturnsIfImplemented(2, get_value, sem)
615
616 def test_timeout(self):
617 if self.TYPE != 'processes':
618 return
619
620 sem = self.Semaphore(0)
621 acquire = TimingWrapper(sem.acquire)
622
623 self.assertEqual(acquire(False), False)
624 self.assertTimingAlmostEqual(acquire.elapsed, 0.0)
625
626 self.assertEqual(acquire(False, None), False)
627 self.assertTimingAlmostEqual(acquire.elapsed, 0.0)
628
629 self.assertEqual(acquire(False, TIMEOUT1), False)
630 self.assertTimingAlmostEqual(acquire.elapsed, 0)
631
632 self.assertEqual(acquire(True, TIMEOUT2), False)
633 self.assertTimingAlmostEqual(acquire.elapsed, TIMEOUT2)
634
635 self.assertEqual(acquire(timeout=TIMEOUT3), False)
636 self.assertTimingAlmostEqual(acquire.elapsed, TIMEOUT3)
637
638
639class _TestCondition(BaseTestCase):
640
Antoine Pitrou4eb2b282010-11-02 23:51:30 +0000641 @classmethod
642 def f(cls, cond, sleeping, woken, timeout=None):
Benjamin Petersondfd79492008-06-13 19:13:39 +0000643 cond.acquire()
644 sleeping.release()
645 cond.wait(timeout)
646 woken.release()
647 cond.release()
648
649 def check_invariant(self, cond):
650 # this is only supposed to succeed when there are no sleepers
651 if self.TYPE == 'processes':
652 try:
653 sleepers = (cond._sleeping_count.get_value() -
654 cond._woken_count.get_value())
655 self.assertEqual(sleepers, 0)
656 self.assertEqual(cond._wait_semaphore.get_value(), 0)
657 except NotImplementedError:
658 pass
659
660 def test_notify(self):
661 cond = self.Condition()
662 sleeping = self.Semaphore(0)
663 woken = self.Semaphore(0)
664
665 p = self.Process(target=self.f, args=(cond, sleeping, woken))
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000666 p.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +0000667 p.start()
668
669 p = threading.Thread(target=self.f, args=(cond, sleeping, woken))
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000670 p.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +0000671 p.start()
672
673 # wait for both children to start sleeping
674 sleeping.acquire()
675 sleeping.acquire()
676
677 # check no process/thread has woken up
678 time.sleep(DELTA)
679 self.assertReturnsIfImplemented(0, get_value, woken)
680
681 # wake up one process/thread
682 cond.acquire()
683 cond.notify()
684 cond.release()
685
686 # check one process/thread has woken up
687 time.sleep(DELTA)
688 self.assertReturnsIfImplemented(1, get_value, woken)
689
690 # wake up another
691 cond.acquire()
692 cond.notify()
693 cond.release()
694
695 # check other has woken up
696 time.sleep(DELTA)
697 self.assertReturnsIfImplemented(2, get_value, woken)
698
699 # check state is not mucked up
700 self.check_invariant(cond)
701 p.join()
702
703 def test_notify_all(self):
704 cond = self.Condition()
705 sleeping = self.Semaphore(0)
706 woken = self.Semaphore(0)
707
708 # start some threads/processes which will timeout
709 for i in range(3):
710 p = self.Process(target=self.f,
711 args=(cond, sleeping, woken, TIMEOUT1))
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000712 p.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +0000713 p.start()
714
715 t = threading.Thread(target=self.f,
716 args=(cond, sleeping, woken, TIMEOUT1))
Benjamin Petersona9b22222008-08-18 18:01:43 +0000717 t.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +0000718 t.start()
719
720 # wait for them all to sleep
721 for i in xrange(6):
722 sleeping.acquire()
723
724 # check they have all timed out
725 for i in xrange(6):
726 woken.acquire()
727 self.assertReturnsIfImplemented(0, get_value, woken)
728
729 # check state is not mucked up
730 self.check_invariant(cond)
731
732 # start some more threads/processes
733 for i in range(3):
734 p = self.Process(target=self.f, args=(cond, sleeping, woken))
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000735 p.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +0000736 p.start()
737
738 t = threading.Thread(target=self.f, args=(cond, sleeping, woken))
Benjamin Petersona9b22222008-08-18 18:01:43 +0000739 t.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +0000740 t.start()
741
742 # wait for them to all sleep
743 for i in xrange(6):
744 sleeping.acquire()
745
746 # check no process/thread has woken up
747 time.sleep(DELTA)
748 self.assertReturnsIfImplemented(0, get_value, woken)
749
750 # wake them all up
751 cond.acquire()
752 cond.notify_all()
753 cond.release()
754
755 # check they have all woken
756 time.sleep(DELTA)
757 self.assertReturnsIfImplemented(6, get_value, woken)
758
759 # check state is not mucked up
760 self.check_invariant(cond)
761
762 def test_timeout(self):
763 cond = self.Condition()
764 wait = TimingWrapper(cond.wait)
765 cond.acquire()
766 res = wait(TIMEOUT1)
767 cond.release()
768 self.assertEqual(res, None)
769 self.assertTimingAlmostEqual(wait.elapsed, TIMEOUT1)
770
771
772class _TestEvent(BaseTestCase):
773
Antoine Pitrou4eb2b282010-11-02 23:51:30 +0000774 @classmethod
775 def _test_event(cls, event):
Benjamin Petersondfd79492008-06-13 19:13:39 +0000776 time.sleep(TIMEOUT2)
777 event.set()
778
779 def test_event(self):
780 event = self.Event()
781 wait = TimingWrapper(event.wait)
782
Ezio Melottic2077b02011-03-16 12:34:31 +0200783 # Removed temporarily, due to API shear, this does not
Benjamin Petersondfd79492008-06-13 19:13:39 +0000784 # work with threading._Event objects. is_set == isSet
Jesse Noller02cb0eb2009-04-01 03:45:50 +0000785 self.assertEqual(event.is_set(), False)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000786
Jesse Noller02cb0eb2009-04-01 03:45:50 +0000787 # Removed, threading.Event.wait() will return the value of the __flag
788 # instead of None. API Shear with the semaphore backed mp.Event
789 self.assertEqual(wait(0.0), False)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000790 self.assertTimingAlmostEqual(wait.elapsed, 0.0)
Jesse Noller02cb0eb2009-04-01 03:45:50 +0000791 self.assertEqual(wait(TIMEOUT1), False)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000792 self.assertTimingAlmostEqual(wait.elapsed, TIMEOUT1)
793
794 event.set()
795
796 # See note above on the API differences
Jesse Noller02cb0eb2009-04-01 03:45:50 +0000797 self.assertEqual(event.is_set(), True)
798 self.assertEqual(wait(), True)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000799 self.assertTimingAlmostEqual(wait.elapsed, 0.0)
Jesse Noller02cb0eb2009-04-01 03:45:50 +0000800 self.assertEqual(wait(TIMEOUT1), True)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000801 self.assertTimingAlmostEqual(wait.elapsed, 0.0)
802 # self.assertEqual(event.is_set(), True)
803
804 event.clear()
805
806 #self.assertEqual(event.is_set(), False)
807
808 self.Process(target=self._test_event, args=(event,)).start()
Jesse Noller02cb0eb2009-04-01 03:45:50 +0000809 self.assertEqual(wait(), True)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000810
811#
812#
813#
814
815class _TestValue(BaseTestCase):
816
Florent Xicluna36b9fbb2010-03-24 19:33:25 +0000817 ALLOWED_TYPES = ('processes',)
818
Benjamin Petersondfd79492008-06-13 19:13:39 +0000819 codes_values = [
820 ('i', 4343, 24234),
821 ('d', 3.625, -4.25),
822 ('h', -232, 234),
823 ('c', latin('x'), latin('y'))
824 ]
825
Antoine Pitrou55d935a2010-11-22 16:35:57 +0000826 def setUp(self):
827 if not HAS_SHAREDCTYPES:
828 self.skipTest("requires multiprocessing.sharedctypes")
829
Antoine Pitrou4eb2b282010-11-02 23:51:30 +0000830 @classmethod
831 def _test(cls, values):
832 for sv, cv in zip(values, cls.codes_values):
Benjamin Petersondfd79492008-06-13 19:13:39 +0000833 sv.value = cv[2]
834
835
836 def test_value(self, raw=False):
Benjamin Petersondfd79492008-06-13 19:13:39 +0000837 if raw:
838 values = [self.RawValue(code, value)
839 for code, value, _ in self.codes_values]
840 else:
841 values = [self.Value(code, value)
842 for code, value, _ in self.codes_values]
843
844 for sv, cv in zip(values, self.codes_values):
845 self.assertEqual(sv.value, cv[1])
846
847 proc = self.Process(target=self._test, args=(values,))
848 proc.start()
849 proc.join()
850
851 for sv, cv in zip(values, self.codes_values):
852 self.assertEqual(sv.value, cv[2])
853
854 def test_rawvalue(self):
855 self.test_value(raw=True)
856
857 def test_getobj_getlock(self):
Benjamin Petersondfd79492008-06-13 19:13:39 +0000858 val1 = self.Value('i', 5)
859 lock1 = val1.get_lock()
860 obj1 = val1.get_obj()
861
862 val2 = self.Value('i', 5, lock=None)
863 lock2 = val2.get_lock()
864 obj2 = val2.get_obj()
865
866 lock = self.Lock()
867 val3 = self.Value('i', 5, lock=lock)
868 lock3 = val3.get_lock()
869 obj3 = val3.get_obj()
870 self.assertEqual(lock, lock3)
871
Jesse Noller6ab22152009-01-18 02:45:38 +0000872 arr4 = self.Value('i', 5, lock=False)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000873 self.assertFalse(hasattr(arr4, 'get_lock'))
874 self.assertFalse(hasattr(arr4, 'get_obj'))
875
Jesse Noller6ab22152009-01-18 02:45:38 +0000876 self.assertRaises(AttributeError, self.Value, 'i', 5, lock='navalue')
877
878 arr5 = self.RawValue('i', 5)
879 self.assertFalse(hasattr(arr5, 'get_lock'))
880 self.assertFalse(hasattr(arr5, 'get_obj'))
881
Benjamin Petersondfd79492008-06-13 19:13:39 +0000882
883class _TestArray(BaseTestCase):
884
Florent Xicluna36b9fbb2010-03-24 19:33:25 +0000885 ALLOWED_TYPES = ('processes',)
886
Antoine Pitrou4eb2b282010-11-02 23:51:30 +0000887 @classmethod
888 def f(cls, seq):
Benjamin Petersondfd79492008-06-13 19:13:39 +0000889 for i in range(1, len(seq)):
890 seq[i] += seq[i-1]
891
Florent Xicluna36b9fbb2010-03-24 19:33:25 +0000892 @unittest.skipIf(c_int is None, "requires _ctypes")
Benjamin Petersondfd79492008-06-13 19:13:39 +0000893 def test_array(self, raw=False):
Benjamin Petersondfd79492008-06-13 19:13:39 +0000894 seq = [680, 626, 934, 821, 150, 233, 548, 982, 714, 831]
895 if raw:
896 arr = self.RawArray('i', seq)
897 else:
898 arr = self.Array('i', seq)
899
900 self.assertEqual(len(arr), len(seq))
901 self.assertEqual(arr[3], seq[3])
902 self.assertEqual(list(arr[2:7]), list(seq[2:7]))
903
904 arr[4:8] = seq[4:8] = array.array('i', [1, 2, 3, 4])
905
906 self.assertEqual(list(arr[:]), seq)
907
908 self.f(seq)
909
910 p = self.Process(target=self.f, args=(arr,))
911 p.start()
912 p.join()
913
914 self.assertEqual(list(arr[:]), seq)
915
Florent Xicluna36b9fbb2010-03-24 19:33:25 +0000916 @unittest.skipIf(c_int is None, "requires _ctypes")
Mark Dickinsond3cb2f62011-03-26 10:02:37 +0000917 def test_array_from_size(self):
918 size = 10
919 # Test for zeroing (see issue #11675).
920 # The repetition below strengthens the test by increasing the chances
921 # of previously allocated non-zero memory being used for the new array
922 # on the 2nd and 3rd loops.
923 for _ in range(3):
924 arr = self.Array('i', size)
925 self.assertEqual(len(arr), size)
926 self.assertEqual(list(arr), [0] * size)
927 arr[:] = range(10)
928 self.assertEqual(list(arr), range(10))
929 del arr
930
931 @unittest.skipIf(c_int is None, "requires _ctypes")
Benjamin Petersondfd79492008-06-13 19:13:39 +0000932 def test_rawarray(self):
933 self.test_array(raw=True)
934
Florent Xicluna36b9fbb2010-03-24 19:33:25 +0000935 @unittest.skipIf(c_int is None, "requires _ctypes")
Mark Dickinsonf9e9a6f2011-03-25 22:01:06 +0000936 def test_array_accepts_long(self):
937 arr = self.Array('i', 10L)
938 self.assertEqual(len(arr), 10)
939 raw_arr = self.RawArray('i', 10L)
940 self.assertEqual(len(raw_arr), 10)
941
942 @unittest.skipIf(c_int is None, "requires _ctypes")
Benjamin Petersondfd79492008-06-13 19:13:39 +0000943 def test_getobj_getlock_obj(self):
Benjamin Petersondfd79492008-06-13 19:13:39 +0000944 arr1 = self.Array('i', range(10))
945 lock1 = arr1.get_lock()
946 obj1 = arr1.get_obj()
947
948 arr2 = self.Array('i', range(10), lock=None)
949 lock2 = arr2.get_lock()
950 obj2 = arr2.get_obj()
951
952 lock = self.Lock()
953 arr3 = self.Array('i', range(10), lock=lock)
954 lock3 = arr3.get_lock()
955 obj3 = arr3.get_obj()
956 self.assertEqual(lock, lock3)
957
Jesse Noller6ab22152009-01-18 02:45:38 +0000958 arr4 = self.Array('i', range(10), lock=False)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000959 self.assertFalse(hasattr(arr4, 'get_lock'))
960 self.assertFalse(hasattr(arr4, 'get_obj'))
Jesse Noller6ab22152009-01-18 02:45:38 +0000961 self.assertRaises(AttributeError,
962 self.Array, 'i', range(10), lock='notalock')
963
964 arr5 = self.RawArray('i', range(10))
965 self.assertFalse(hasattr(arr5, 'get_lock'))
966 self.assertFalse(hasattr(arr5, 'get_obj'))
Benjamin Petersondfd79492008-06-13 19:13:39 +0000967
968#
969#
970#
971
972class _TestContainers(BaseTestCase):
973
974 ALLOWED_TYPES = ('manager',)
975
976 def test_list(self):
977 a = self.list(range(10))
978 self.assertEqual(a[:], range(10))
979
980 b = self.list()
981 self.assertEqual(b[:], [])
982
983 b.extend(range(5))
984 self.assertEqual(b[:], range(5))
985
986 self.assertEqual(b[2], 2)
987 self.assertEqual(b[2:10], [2,3,4])
988
989 b *= 2
990 self.assertEqual(b[:], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4])
991
992 self.assertEqual(b + [5, 6], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4, 5, 6])
993
994 self.assertEqual(a[:], range(10))
995
996 d = [a, b]
997 e = self.list(d)
998 self.assertEqual(
999 e[:],
1000 [[0, 1, 2, 3, 4, 5, 6, 7, 8, 9], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4]]
1001 )
1002
1003 f = self.list([a])
1004 a.append('hello')
1005 self.assertEqual(f[:], [[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 'hello']])
1006
1007 def test_dict(self):
1008 d = self.dict()
1009 indices = range(65, 70)
1010 for i in indices:
1011 d[i] = chr(i)
1012 self.assertEqual(d.copy(), dict((i, chr(i)) for i in indices))
1013 self.assertEqual(sorted(d.keys()), indices)
1014 self.assertEqual(sorted(d.values()), [chr(i) for i in indices])
1015 self.assertEqual(sorted(d.items()), [(i, chr(i)) for i in indices])
1016
1017 def test_namespace(self):
1018 n = self.Namespace()
1019 n.name = 'Bob'
1020 n.job = 'Builder'
1021 n._hidden = 'hidden'
1022 self.assertEqual((n.name, n.job), ('Bob', 'Builder'))
1023 del n.job
1024 self.assertEqual(str(n), "Namespace(name='Bob')")
1025 self.assertTrue(hasattr(n, 'name'))
1026 self.assertTrue(not hasattr(n, 'job'))
1027
1028#
1029#
1030#
1031
1032def sqr(x, wait=0.0):
1033 time.sleep(wait)
1034 return x*x
Benjamin Petersondfd79492008-06-13 19:13:39 +00001035class _TestPool(BaseTestCase):
1036
1037 def test_apply(self):
1038 papply = self.pool.apply
1039 self.assertEqual(papply(sqr, (5,)), sqr(5))
1040 self.assertEqual(papply(sqr, (), {'x':3}), sqr(x=3))
1041
1042 def test_map(self):
1043 pmap = self.pool.map
1044 self.assertEqual(pmap(sqr, range(10)), map(sqr, range(10)))
1045 self.assertEqual(pmap(sqr, range(100), chunksize=20),
1046 map(sqr, range(100)))
1047
Jesse Noller7530e472009-07-16 14:23:04 +00001048 def test_map_chunksize(self):
1049 try:
1050 self.pool.map_async(sqr, [], chunksize=1).get(timeout=TIMEOUT1)
1051 except multiprocessing.TimeoutError:
1052 self.fail("pool.map_async with chunksize stalled on null list")
1053
Benjamin Petersondfd79492008-06-13 19:13:39 +00001054 def test_async(self):
1055 res = self.pool.apply_async(sqr, (7, TIMEOUT1,))
1056 get = TimingWrapper(res.get)
1057 self.assertEqual(get(), 49)
1058 self.assertTimingAlmostEqual(get.elapsed, TIMEOUT1)
1059
1060 def test_async_timeout(self):
1061 res = self.pool.apply_async(sqr, (6, TIMEOUT2 + 0.2))
1062 get = TimingWrapper(res.get)
1063 self.assertRaises(multiprocessing.TimeoutError, get, timeout=TIMEOUT2)
1064 self.assertTimingAlmostEqual(get.elapsed, TIMEOUT2)
1065
1066 def test_imap(self):
1067 it = self.pool.imap(sqr, range(10))
1068 self.assertEqual(list(it), map(sqr, range(10)))
1069
1070 it = self.pool.imap(sqr, range(10))
1071 for i in range(10):
1072 self.assertEqual(it.next(), i*i)
1073 self.assertRaises(StopIteration, it.next)
1074
1075 it = self.pool.imap(sqr, range(1000), chunksize=100)
1076 for i in range(1000):
1077 self.assertEqual(it.next(), i*i)
1078 self.assertRaises(StopIteration, it.next)
1079
1080 def test_imap_unordered(self):
1081 it = self.pool.imap_unordered(sqr, range(1000))
1082 self.assertEqual(sorted(it), map(sqr, range(1000)))
1083
1084 it = self.pool.imap_unordered(sqr, range(1000), chunksize=53)
1085 self.assertEqual(sorted(it), map(sqr, range(1000)))
1086
1087 def test_make_pool(self):
Victor Stinnerf64a0cf2011-06-20 17:54:33 +02001088 self.assertRaises(ValueError, multiprocessing.Pool, -1)
1089 self.assertRaises(ValueError, multiprocessing.Pool, 0)
1090
Benjamin Petersondfd79492008-06-13 19:13:39 +00001091 p = multiprocessing.Pool(3)
1092 self.assertEqual(3, len(p._pool))
1093 p.close()
1094 p.join()
1095
1096 def test_terminate(self):
1097 if self.TYPE == 'manager':
1098 # On Unix a forked process increfs each shared object to
1099 # which its parent process held a reference. If the
1100 # forked process gets terminated then there is likely to
1101 # be a reference leak. So to prevent
1102 # _TestZZZNumberOfObjects from failing we skip this test
1103 # when using a manager.
1104 return
1105
1106 result = self.pool.map_async(
1107 time.sleep, [0.1 for i in range(10000)], chunksize=1
1108 )
1109 self.pool.terminate()
1110 join = TimingWrapper(self.pool.join)
1111 join()
1112 self.assertTrue(join.elapsed < 0.2)
Jesse Noller654ade32010-01-27 03:05:57 +00001113
1114class _TestPoolWorkerLifetime(BaseTestCase):
1115
1116 ALLOWED_TYPES = ('processes', )
1117 def test_pool_worker_lifetime(self):
1118 p = multiprocessing.Pool(3, maxtasksperchild=10)
1119 self.assertEqual(3, len(p._pool))
1120 origworkerpids = [w.pid for w in p._pool]
1121 # Run many tasks so each worker gets replaced (hopefully)
1122 results = []
1123 for i in range(100):
1124 results.append(p.apply_async(sqr, (i, )))
1125 # Fetch the results and verify we got the right answers,
1126 # also ensuring all the tasks have completed.
1127 for (j, res) in enumerate(results):
1128 self.assertEqual(res.get(), sqr(j))
1129 # Refill the pool
1130 p._repopulate_pool()
Florent Xicluna3bc5cb72010-03-04 15:58:54 +00001131 # Wait until all workers are alive
Antoine Pitrouc2b0d762011-04-06 22:54:14 +02001132 # (countdown * DELTA = 5 seconds max startup process time)
1133 countdown = 50
Florent Xicluna3bc5cb72010-03-04 15:58:54 +00001134 while countdown and not all(w.is_alive() for w in p._pool):
1135 countdown -= 1
1136 time.sleep(DELTA)
Jesse Noller654ade32010-01-27 03:05:57 +00001137 finalworkerpids = [w.pid for w in p._pool]
Florent Xicluna3bc5cb72010-03-04 15:58:54 +00001138 # All pids should be assigned. See issue #7805.
1139 self.assertNotIn(None, origworkerpids)
1140 self.assertNotIn(None, finalworkerpids)
1141 # Finally, check that the worker pids have changed
Jesse Noller654ade32010-01-27 03:05:57 +00001142 self.assertNotEqual(sorted(origworkerpids), sorted(finalworkerpids))
1143 p.close()
1144 p.join()
1145
Benjamin Petersondfd79492008-06-13 19:13:39 +00001146#
1147# Test that manager has expected number of shared objects left
1148#
1149
1150class _TestZZZNumberOfObjects(BaseTestCase):
1151 # Because test cases are sorted alphabetically, this one will get
1152 # run after all the other tests for the manager. It tests that
1153 # there have been no "reference leaks" for the manager's shared
1154 # objects. Note the comment in _TestPool.test_terminate().
1155 ALLOWED_TYPES = ('manager',)
1156
1157 def test_number_of_objects(self):
1158 EXPECTED_NUMBER = 1 # the pool object is still alive
1159 multiprocessing.active_children() # discard dead process objs
1160 gc.collect() # do garbage collection
1161 refs = self.manager._number_of_objects()
Jesse Noller7314b382009-01-21 02:08:17 +00001162 debug_info = self.manager._debug_info()
Benjamin Petersondfd79492008-06-13 19:13:39 +00001163 if refs != EXPECTED_NUMBER:
Jesse Noller7fb96402008-07-17 21:01:05 +00001164 print self.manager._debug_info()
Jesse Noller7314b382009-01-21 02:08:17 +00001165 print debug_info
Benjamin Petersondfd79492008-06-13 19:13:39 +00001166
1167 self.assertEqual(refs, EXPECTED_NUMBER)
1168
1169#
1170# Test of creating a customized manager class
1171#
1172
1173from multiprocessing.managers import BaseManager, BaseProxy, RemoteError
1174
1175class FooBar(object):
1176 def f(self):
1177 return 'f()'
1178 def g(self):
1179 raise ValueError
1180 def _h(self):
1181 return '_h()'
1182
1183def baz():
1184 for i in xrange(10):
1185 yield i*i
1186
1187class IteratorProxy(BaseProxy):
1188 _exposed_ = ('next', '__next__')
1189 def __iter__(self):
1190 return self
1191 def next(self):
1192 return self._callmethod('next')
1193 def __next__(self):
1194 return self._callmethod('__next__')
1195
1196class MyManager(BaseManager):
1197 pass
1198
1199MyManager.register('Foo', callable=FooBar)
1200MyManager.register('Bar', callable=FooBar, exposed=('f', '_h'))
1201MyManager.register('baz', callable=baz, proxytype=IteratorProxy)
1202
1203
1204class _TestMyManager(BaseTestCase):
1205
1206 ALLOWED_TYPES = ('manager',)
1207
1208 def test_mymanager(self):
1209 manager = MyManager()
1210 manager.start()
1211
1212 foo = manager.Foo()
1213 bar = manager.Bar()
1214 baz = manager.baz()
1215
1216 foo_methods = [name for name in ('f', 'g', '_h') if hasattr(foo, name)]
1217 bar_methods = [name for name in ('f', 'g', '_h') if hasattr(bar, name)]
1218
1219 self.assertEqual(foo_methods, ['f', 'g'])
1220 self.assertEqual(bar_methods, ['f', '_h'])
1221
1222 self.assertEqual(foo.f(), 'f()')
1223 self.assertRaises(ValueError, foo.g)
1224 self.assertEqual(foo._callmethod('f'), 'f()')
1225 self.assertRaises(RemoteError, foo._callmethod, '_h')
1226
1227 self.assertEqual(bar.f(), 'f()')
1228 self.assertEqual(bar._h(), '_h()')
1229 self.assertEqual(bar._callmethod('f'), 'f()')
1230 self.assertEqual(bar._callmethod('_h'), '_h()')
1231
1232 self.assertEqual(list(baz), [i*i for i in range(10)])
1233
1234 manager.shutdown()
1235
1236#
1237# Test of connecting to a remote server and using xmlrpclib for serialization
1238#
1239
1240_queue = Queue.Queue()
1241def get_queue():
1242 return _queue
1243
1244class QueueManager(BaseManager):
1245 '''manager class used by server process'''
1246QueueManager.register('get_queue', callable=get_queue)
1247
1248class QueueManager2(BaseManager):
1249 '''manager class which specifies the same interface as QueueManager'''
1250QueueManager2.register('get_queue')
1251
1252
1253SERIALIZER = 'xmlrpclib'
1254
1255class _TestRemoteManager(BaseTestCase):
1256
1257 ALLOWED_TYPES = ('manager',)
1258
Antoine Pitrou4eb2b282010-11-02 23:51:30 +00001259 @classmethod
1260 def _putter(cls, address, authkey):
Benjamin Petersondfd79492008-06-13 19:13:39 +00001261 manager = QueueManager2(
1262 address=address, authkey=authkey, serializer=SERIALIZER
1263 )
1264 manager.connect()
1265 queue = manager.get_queue()
1266 queue.put(('hello world', None, True, 2.25))
1267
1268 def test_remote(self):
1269 authkey = os.urandom(32)
1270
1271 manager = QueueManager(
1272 address=('localhost', 0), authkey=authkey, serializer=SERIALIZER
1273 )
1274 manager.start()
1275
1276 p = self.Process(target=self._putter, args=(manager.address, authkey))
1277 p.start()
1278
1279 manager2 = QueueManager2(
1280 address=manager.address, authkey=authkey, serializer=SERIALIZER
1281 )
1282 manager2.connect()
1283 queue = manager2.get_queue()
1284
1285 # Note that xmlrpclib will deserialize object as a list not a tuple
1286 self.assertEqual(queue.get(), ['hello world', None, True, 2.25])
1287
1288 # Because we are using xmlrpclib for serialization instead of
1289 # pickle this will cause a serialization error.
1290 self.assertRaises(Exception, queue.put, time.sleep)
1291
1292 # Make queue finalizer run before the server is stopped
1293 del queue
1294 manager.shutdown()
1295
Jesse Noller459a6482009-03-30 15:50:42 +00001296class _TestManagerRestart(BaseTestCase):
1297
Antoine Pitrou4eb2b282010-11-02 23:51:30 +00001298 @classmethod
1299 def _putter(cls, address, authkey):
Jesse Noller459a6482009-03-30 15:50:42 +00001300 manager = QueueManager(
1301 address=address, authkey=authkey, serializer=SERIALIZER)
1302 manager.connect()
1303 queue = manager.get_queue()
1304 queue.put('hello world')
1305
1306 def test_rapid_restart(self):
1307 authkey = os.urandom(32)
1308 manager = QueueManager(
Antoine Pitrou54f9f832010-04-30 23:08:48 +00001309 address=('localhost', 0), authkey=authkey, serializer=SERIALIZER)
Brian Curtin87d86e02010-11-01 05:15:55 +00001310 srvr = manager.get_server()
1311 addr = srvr.address
1312 # Close the connection.Listener socket which gets opened as a part
1313 # of manager.get_server(). It's not needed for the test.
1314 srvr.listener.close()
Jesse Noller459a6482009-03-30 15:50:42 +00001315 manager.start()
1316
1317 p = self.Process(target=self._putter, args=(manager.address, authkey))
1318 p.start()
1319 queue = manager.get_queue()
1320 self.assertEqual(queue.get(), 'hello world')
Jesse Noller019ce772009-03-30 21:53:29 +00001321 del queue
Jesse Noller459a6482009-03-30 15:50:42 +00001322 manager.shutdown()
1323 manager = QueueManager(
Antoine Pitrou54f9f832010-04-30 23:08:48 +00001324 address=addr, authkey=authkey, serializer=SERIALIZER)
Jesse Noller459a6482009-03-30 15:50:42 +00001325 manager.start()
Jesse Noller019ce772009-03-30 21:53:29 +00001326 manager.shutdown()
Jesse Noller459a6482009-03-30 15:50:42 +00001327
Benjamin Petersondfd79492008-06-13 19:13:39 +00001328#
1329#
1330#
1331
1332SENTINEL = latin('')
1333
1334class _TestConnection(BaseTestCase):
1335
1336 ALLOWED_TYPES = ('processes', 'threads')
1337
Antoine Pitrou4eb2b282010-11-02 23:51:30 +00001338 @classmethod
1339 def _echo(cls, conn):
Benjamin Petersondfd79492008-06-13 19:13:39 +00001340 for msg in iter(conn.recv_bytes, SENTINEL):
1341 conn.send_bytes(msg)
1342 conn.close()
1343
1344 def test_connection(self):
1345 conn, child_conn = self.Pipe()
1346
1347 p = self.Process(target=self._echo, args=(child_conn,))
Jesse Noller5bc9f4c2008-08-19 19:06:19 +00001348 p.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +00001349 p.start()
1350
1351 seq = [1, 2.25, None]
1352 msg = latin('hello world')
1353 longmsg = msg * 10
1354 arr = array.array('i', range(4))
1355
1356 if self.TYPE == 'processes':
1357 self.assertEqual(type(conn.fileno()), int)
1358
1359 self.assertEqual(conn.send(seq), None)
1360 self.assertEqual(conn.recv(), seq)
1361
1362 self.assertEqual(conn.send_bytes(msg), None)
1363 self.assertEqual(conn.recv_bytes(), msg)
1364
1365 if self.TYPE == 'processes':
1366 buffer = array.array('i', [0]*10)
1367 expected = list(arr) + [0] * (10 - len(arr))
1368 self.assertEqual(conn.send_bytes(arr), None)
1369 self.assertEqual(conn.recv_bytes_into(buffer),
1370 len(arr) * buffer.itemsize)
1371 self.assertEqual(list(buffer), expected)
1372
1373 buffer = array.array('i', [0]*10)
1374 expected = [0] * 3 + list(arr) + [0] * (10 - 3 - len(arr))
1375 self.assertEqual(conn.send_bytes(arr), None)
1376 self.assertEqual(conn.recv_bytes_into(buffer, 3 * buffer.itemsize),
1377 len(arr) * buffer.itemsize)
1378 self.assertEqual(list(buffer), expected)
1379
1380 buffer = bytearray(latin(' ' * 40))
1381 self.assertEqual(conn.send_bytes(longmsg), None)
1382 try:
1383 res = conn.recv_bytes_into(buffer)
1384 except multiprocessing.BufferTooShort, e:
1385 self.assertEqual(e.args, (longmsg,))
1386 else:
1387 self.fail('expected BufferTooShort, got %s' % res)
1388
1389 poll = TimingWrapper(conn.poll)
1390
1391 self.assertEqual(poll(), False)
1392 self.assertTimingAlmostEqual(poll.elapsed, 0)
1393
1394 self.assertEqual(poll(TIMEOUT1), False)
1395 self.assertTimingAlmostEqual(poll.elapsed, TIMEOUT1)
1396
1397 conn.send(None)
1398
1399 self.assertEqual(poll(TIMEOUT1), True)
1400 self.assertTimingAlmostEqual(poll.elapsed, 0)
1401
1402 self.assertEqual(conn.recv(), None)
1403
1404 really_big_msg = latin('X') * (1024 * 1024 * 16) # 16Mb
1405 conn.send_bytes(really_big_msg)
1406 self.assertEqual(conn.recv_bytes(), really_big_msg)
1407
1408 conn.send_bytes(SENTINEL) # tell child to quit
1409 child_conn.close()
1410
1411 if self.TYPE == 'processes':
1412 self.assertEqual(conn.readable, True)
1413 self.assertEqual(conn.writable, True)
1414 self.assertRaises(EOFError, conn.recv)
1415 self.assertRaises(EOFError, conn.recv_bytes)
1416
1417 p.join()
1418
1419 def test_duplex_false(self):
1420 reader, writer = self.Pipe(duplex=False)
1421 self.assertEqual(writer.send(1), None)
1422 self.assertEqual(reader.recv(), 1)
1423 if self.TYPE == 'processes':
1424 self.assertEqual(reader.readable, True)
1425 self.assertEqual(reader.writable, False)
1426 self.assertEqual(writer.readable, False)
1427 self.assertEqual(writer.writable, True)
1428 self.assertRaises(IOError, reader.send, 2)
1429 self.assertRaises(IOError, writer.recv)
1430 self.assertRaises(IOError, writer.poll)
1431
1432 def test_spawn_close(self):
1433 # We test that a pipe connection can be closed by parent
1434 # process immediately after child is spawned. On Windows this
1435 # would have sometimes failed on old versions because
1436 # child_conn would be closed before the child got a chance to
1437 # duplicate it.
1438 conn, child_conn = self.Pipe()
1439
1440 p = self.Process(target=self._echo, args=(child_conn,))
1441 p.start()
1442 child_conn.close() # this might complete before child initializes
1443
1444 msg = latin('hello')
1445 conn.send_bytes(msg)
1446 self.assertEqual(conn.recv_bytes(), msg)
1447
1448 conn.send_bytes(SENTINEL)
1449 conn.close()
1450 p.join()
1451
1452 def test_sendbytes(self):
1453 if self.TYPE != 'processes':
1454 return
1455
1456 msg = latin('abcdefghijklmnopqrstuvwxyz')
1457 a, b = self.Pipe()
1458
1459 a.send_bytes(msg)
1460 self.assertEqual(b.recv_bytes(), msg)
1461
1462 a.send_bytes(msg, 5)
1463 self.assertEqual(b.recv_bytes(), msg[5:])
1464
1465 a.send_bytes(msg, 7, 8)
1466 self.assertEqual(b.recv_bytes(), msg[7:7+8])
1467
1468 a.send_bytes(msg, 26)
1469 self.assertEqual(b.recv_bytes(), latin(''))
1470
1471 a.send_bytes(msg, 26, 0)
1472 self.assertEqual(b.recv_bytes(), latin(''))
1473
1474 self.assertRaises(ValueError, a.send_bytes, msg, 27)
1475
1476 self.assertRaises(ValueError, a.send_bytes, msg, 22, 5)
1477
1478 self.assertRaises(ValueError, a.send_bytes, msg, 26, 1)
1479
1480 self.assertRaises(ValueError, a.send_bytes, msg, -1)
1481
1482 self.assertRaises(ValueError, a.send_bytes, msg, 4, -1)
1483
Benjamin Petersondfd79492008-06-13 19:13:39 +00001484class _TestListenerClient(BaseTestCase):
1485
1486 ALLOWED_TYPES = ('processes', 'threads')
1487
Antoine Pitrou4eb2b282010-11-02 23:51:30 +00001488 @classmethod
1489 def _test(cls, address):
1490 conn = cls.connection.Client(address)
Benjamin Petersondfd79492008-06-13 19:13:39 +00001491 conn.send('hello')
1492 conn.close()
1493
1494 def test_listener_client(self):
1495 for family in self.connection.families:
1496 l = self.connection.Listener(family=family)
1497 p = self.Process(target=self._test, args=(l.address,))
Jesse Noller5bc9f4c2008-08-19 19:06:19 +00001498 p.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +00001499 p.start()
1500 conn = l.accept()
1501 self.assertEqual(conn.recv(), 'hello')
1502 p.join()
1503 l.close()
Benjamin Petersondfd79492008-06-13 19:13:39 +00001504#
1505# Test of sending connection and socket objects between processes
1506#
Benjamin Petersonda3a1b12008-06-16 20:52:48 +00001507"""
Benjamin Petersondfd79492008-06-13 19:13:39 +00001508class _TestPicklingConnections(BaseTestCase):
1509
1510 ALLOWED_TYPES = ('processes',)
1511
1512 def _listener(self, conn, families):
1513 for fam in families:
1514 l = self.connection.Listener(family=fam)
1515 conn.send(l.address)
1516 new_conn = l.accept()
1517 conn.send(new_conn)
1518
1519 if self.TYPE == 'processes':
1520 l = socket.socket()
1521 l.bind(('localhost', 0))
1522 conn.send(l.getsockname())
1523 l.listen(1)
1524 new_conn, addr = l.accept()
1525 conn.send(new_conn)
1526
1527 conn.recv()
1528
1529 def _remote(self, conn):
1530 for (address, msg) in iter(conn.recv, None):
1531 client = self.connection.Client(address)
1532 client.send(msg.upper())
1533 client.close()
1534
1535 if self.TYPE == 'processes':
1536 address, msg = conn.recv()
1537 client = socket.socket()
1538 client.connect(address)
1539 client.sendall(msg.upper())
1540 client.close()
1541
1542 conn.close()
1543
1544 def test_pickling(self):
1545 try:
1546 multiprocessing.allow_connection_pickling()
1547 except ImportError:
1548 return
1549
1550 families = self.connection.families
1551
1552 lconn, lconn0 = self.Pipe()
1553 lp = self.Process(target=self._listener, args=(lconn0, families))
1554 lp.start()
1555 lconn0.close()
1556
1557 rconn, rconn0 = self.Pipe()
1558 rp = self.Process(target=self._remote, args=(rconn0,))
1559 rp.start()
1560 rconn0.close()
1561
1562 for fam in families:
1563 msg = ('This connection uses family %s' % fam).encode('ascii')
1564 address = lconn.recv()
1565 rconn.send((address, msg))
1566 new_conn = lconn.recv()
1567 self.assertEqual(new_conn.recv(), msg.upper())
1568
1569 rconn.send(None)
1570
1571 if self.TYPE == 'processes':
1572 msg = latin('This connection uses a normal socket')
1573 address = lconn.recv()
1574 rconn.send((address, msg))
1575 if hasattr(socket, 'fromfd'):
1576 new_conn = lconn.recv()
1577 self.assertEqual(new_conn.recv(100), msg.upper())
1578 else:
1579 # XXX On Windows with Py2.6 need to backport fromfd()
1580 discard = lconn.recv_bytes()
1581
1582 lconn.send(None)
1583
1584 rconn.close()
1585 lconn.close()
1586
1587 lp.join()
1588 rp.join()
Benjamin Petersonda3a1b12008-06-16 20:52:48 +00001589"""
Benjamin Petersondfd79492008-06-13 19:13:39 +00001590#
1591#
1592#
1593
1594class _TestHeap(BaseTestCase):
1595
1596 ALLOWED_TYPES = ('processes',)
1597
1598 def test_heap(self):
1599 iterations = 5000
1600 maxblocks = 50
1601 blocks = []
1602
1603 # create and destroy lots of blocks of different sizes
1604 for i in xrange(iterations):
1605 size = int(random.lognormvariate(0, 1) * 1000)
1606 b = multiprocessing.heap.BufferWrapper(size)
1607 blocks.append(b)
1608 if len(blocks) > maxblocks:
1609 i = random.randrange(maxblocks)
1610 del blocks[i]
1611
1612 # get the heap object
1613 heap = multiprocessing.heap.BufferWrapper._heap
1614
1615 # verify the state of the heap
1616 all = []
1617 occupied = 0
Charles-François Natali414d0fa2011-07-02 13:56:19 +02001618 heap._lock.acquire()
1619 self.addCleanup(heap._lock.release)
Benjamin Petersondfd79492008-06-13 19:13:39 +00001620 for L in heap._len_to_seq.values():
1621 for arena, start, stop in L:
1622 all.append((heap._arenas.index(arena), start, stop,
1623 stop-start, 'free'))
1624 for arena, start, stop in heap._allocated_blocks:
1625 all.append((heap._arenas.index(arena), start, stop,
1626 stop-start, 'occupied'))
1627 occupied += (stop-start)
1628
1629 all.sort()
1630
1631 for i in range(len(all)-1):
1632 (arena, start, stop) = all[i][:3]
1633 (narena, nstart, nstop) = all[i+1][:3]
1634 self.assertTrue((arena != narena and nstart == 0) or
1635 (stop == nstart))
1636
Charles-François Natali414d0fa2011-07-02 13:56:19 +02001637 def test_free_from_gc(self):
1638 # Check that freeing of blocks by the garbage collector doesn't deadlock
1639 # (issue #12352).
1640 # Make sure the GC is enabled, and set lower collection thresholds to
1641 # make collections more frequent (and increase the probability of
1642 # deadlock).
Charles-François Natali7c20ad32011-07-02 14:08:27 +02001643 if not gc.isenabled():
Charles-François Natali414d0fa2011-07-02 13:56:19 +02001644 gc.enable()
1645 self.addCleanup(gc.disable)
Charles-François Natali7c20ad32011-07-02 14:08:27 +02001646 thresholds = gc.get_threshold()
1647 self.addCleanup(gc.set_threshold, *thresholds)
Charles-François Natali414d0fa2011-07-02 13:56:19 +02001648 gc.set_threshold(10)
1649
1650 # perform numerous block allocations, with cyclic references to make
1651 # sure objects are collected asynchronously by the gc
1652 for i in range(5000):
1653 a = multiprocessing.heap.BufferWrapper(1)
1654 b = multiprocessing.heap.BufferWrapper(1)
1655 # circular references
1656 a.buddy = b
1657 b.buddy = a
1658
Benjamin Petersondfd79492008-06-13 19:13:39 +00001659#
1660#
1661#
1662
Benjamin Petersondfd79492008-06-13 19:13:39 +00001663class _Foo(Structure):
1664 _fields_ = [
1665 ('x', c_int),
1666 ('y', c_double)
1667 ]
1668
1669class _TestSharedCTypes(BaseTestCase):
1670
1671 ALLOWED_TYPES = ('processes',)
1672
Antoine Pitrou55d935a2010-11-22 16:35:57 +00001673 def setUp(self):
1674 if not HAS_SHAREDCTYPES:
1675 self.skipTest("requires multiprocessing.sharedctypes")
1676
Antoine Pitrou4eb2b282010-11-02 23:51:30 +00001677 @classmethod
1678 def _double(cls, x, y, foo, arr, string):
Benjamin Petersondfd79492008-06-13 19:13:39 +00001679 x.value *= 2
1680 y.value *= 2
1681 foo.x *= 2
1682 foo.y *= 2
1683 string.value *= 2
1684 for i in range(len(arr)):
1685 arr[i] *= 2
1686
1687 def test_sharedctypes(self, lock=False):
Benjamin Petersondfd79492008-06-13 19:13:39 +00001688 x = Value('i', 7, lock=lock)
Georg Brandlbd564c32010-02-06 23:33:33 +00001689 y = Value(c_double, 1.0/3.0, lock=lock)
Benjamin Petersondfd79492008-06-13 19:13:39 +00001690 foo = Value(_Foo, 3, 2, lock=lock)
Georg Brandlbd564c32010-02-06 23:33:33 +00001691 arr = self.Array('d', range(10), lock=lock)
1692 string = self.Array('c', 20, lock=lock)
Brian Curtina06e9b82010-10-07 02:27:41 +00001693 string.value = latin('hello')
Benjamin Petersondfd79492008-06-13 19:13:39 +00001694
1695 p = self.Process(target=self._double, args=(x, y, foo, arr, string))
1696 p.start()
1697 p.join()
1698
1699 self.assertEqual(x.value, 14)
1700 self.assertAlmostEqual(y.value, 2.0/3.0)
1701 self.assertEqual(foo.x, 6)
1702 self.assertAlmostEqual(foo.y, 4.0)
1703 for i in range(10):
1704 self.assertAlmostEqual(arr[i], i*2)
1705 self.assertEqual(string.value, latin('hellohello'))
1706
1707 def test_synchronize(self):
1708 self.test_sharedctypes(lock=True)
1709
1710 def test_copy(self):
Benjamin Petersondfd79492008-06-13 19:13:39 +00001711 foo = _Foo(2, 5.0)
Brian Curtina06e9b82010-10-07 02:27:41 +00001712 bar = copy(foo)
Benjamin Petersondfd79492008-06-13 19:13:39 +00001713 foo.x = 0
1714 foo.y = 0
1715 self.assertEqual(bar.x, 2)
1716 self.assertAlmostEqual(bar.y, 5.0)
1717
1718#
1719#
1720#
1721
1722class _TestFinalize(BaseTestCase):
1723
1724 ALLOWED_TYPES = ('processes',)
1725
Antoine Pitrou4eb2b282010-11-02 23:51:30 +00001726 @classmethod
1727 def _test_finalize(cls, conn):
Benjamin Petersondfd79492008-06-13 19:13:39 +00001728 class Foo(object):
1729 pass
1730
1731 a = Foo()
1732 util.Finalize(a, conn.send, args=('a',))
1733 del a # triggers callback for a
1734
1735 b = Foo()
1736 close_b = util.Finalize(b, conn.send, args=('b',))
1737 close_b() # triggers callback for b
1738 close_b() # does nothing because callback has already been called
1739 del b # does nothing because callback has already been called
1740
1741 c = Foo()
1742 util.Finalize(c, conn.send, args=('c',))
1743
1744 d10 = Foo()
1745 util.Finalize(d10, conn.send, args=('d10',), exitpriority=1)
1746
1747 d01 = Foo()
1748 util.Finalize(d01, conn.send, args=('d01',), exitpriority=0)
1749 d02 = Foo()
1750 util.Finalize(d02, conn.send, args=('d02',), exitpriority=0)
1751 d03 = Foo()
1752 util.Finalize(d03, conn.send, args=('d03',), exitpriority=0)
1753
1754 util.Finalize(None, conn.send, args=('e',), exitpriority=-10)
1755
1756 util.Finalize(None, conn.send, args=('STOP',), exitpriority=-100)
1757
Ezio Melottic2077b02011-03-16 12:34:31 +02001758 # call multiprocessing's cleanup function then exit process without
Benjamin Petersondfd79492008-06-13 19:13:39 +00001759 # garbage collecting locals
1760 util._exit_function()
1761 conn.close()
1762 os._exit(0)
1763
1764 def test_finalize(self):
1765 conn, child_conn = self.Pipe()
1766
1767 p = self.Process(target=self._test_finalize, args=(child_conn,))
1768 p.start()
1769 p.join()
1770
1771 result = [obj for obj in iter(conn.recv, 'STOP')]
1772 self.assertEqual(result, ['a', 'b', 'd10', 'd03', 'd02', 'd01', 'e'])
1773
1774#
1775# Test that from ... import * works for each module
1776#
1777
1778class _TestImportStar(BaseTestCase):
1779
1780 ALLOWED_TYPES = ('processes',)
1781
1782 def test_import(self):
Florent Xicluna36b9fbb2010-03-24 19:33:25 +00001783 modules = [
Benjamin Petersondfd79492008-06-13 19:13:39 +00001784 'multiprocessing', 'multiprocessing.connection',
1785 'multiprocessing.heap', 'multiprocessing.managers',
1786 'multiprocessing.pool', 'multiprocessing.process',
Florent Xicluna36b9fbb2010-03-24 19:33:25 +00001787 'multiprocessing.reduction',
Benjamin Petersondfd79492008-06-13 19:13:39 +00001788 'multiprocessing.synchronize', 'multiprocessing.util'
Florent Xicluna36b9fbb2010-03-24 19:33:25 +00001789 ]
1790
1791 if c_int is not None:
1792 # This module requires _ctypes
1793 modules.append('multiprocessing.sharedctypes')
Benjamin Petersondfd79492008-06-13 19:13:39 +00001794
1795 for name in modules:
1796 __import__(name)
1797 mod = sys.modules[name]
1798
1799 for attr in getattr(mod, '__all__', ()):
1800 self.assertTrue(
1801 hasattr(mod, attr),
1802 '%r does not have attribute %r' % (mod, attr)
1803 )
1804
1805#
1806# Quick test that logging works -- does not test logging output
1807#
1808
1809class _TestLogging(BaseTestCase):
1810
1811 ALLOWED_TYPES = ('processes',)
1812
1813 def test_enable_logging(self):
1814 logger = multiprocessing.get_logger()
1815 logger.setLevel(util.SUBWARNING)
1816 self.assertTrue(logger is not None)
1817 logger.debug('this will not be printed')
1818 logger.info('nor will this')
1819 logger.setLevel(LOG_LEVEL)
1820
Antoine Pitrou4eb2b282010-11-02 23:51:30 +00001821 @classmethod
1822 def _test_level(cls, conn):
Benjamin Petersondfd79492008-06-13 19:13:39 +00001823 logger = multiprocessing.get_logger()
1824 conn.send(logger.getEffectiveLevel())
1825
1826 def test_level(self):
1827 LEVEL1 = 32
1828 LEVEL2 = 37
1829
1830 logger = multiprocessing.get_logger()
1831 root_logger = logging.getLogger()
1832 root_level = root_logger.level
1833
1834 reader, writer = multiprocessing.Pipe(duplex=False)
1835
1836 logger.setLevel(LEVEL1)
1837 self.Process(target=self._test_level, args=(writer,)).start()
1838 self.assertEqual(LEVEL1, reader.recv())
1839
1840 logger.setLevel(logging.NOTSET)
1841 root_logger.setLevel(LEVEL2)
1842 self.Process(target=self._test_level, args=(writer,)).start()
1843 self.assertEqual(LEVEL2, reader.recv())
1844
1845 root_logger.setLevel(root_level)
1846 logger.setLevel(level=LOG_LEVEL)
1847
Jesse Noller814d02d2009-11-21 14:38:23 +00001848
Jesse Noller9a03f2f2009-11-24 14:17:29 +00001849# class _TestLoggingProcessName(BaseTestCase):
1850#
1851# def handle(self, record):
1852# assert record.processName == multiprocessing.current_process().name
1853# self.__handled = True
1854#
1855# def test_logging(self):
1856# handler = logging.Handler()
1857# handler.handle = self.handle
1858# self.__handled = False
1859# # Bypass getLogger() and side-effects
1860# logger = logging.getLoggerClass()(
1861# 'multiprocessing.test.TestLoggingProcessName')
1862# logger.addHandler(handler)
1863# logger.propagate = False
1864#
1865# logger.warn('foo')
1866# assert self.__handled
Jesse Noller814d02d2009-11-21 14:38:23 +00001867
Benjamin Petersondfd79492008-06-13 19:13:39 +00001868#
Jesse Noller9a5b2ad2009-01-19 15:12:22 +00001869# Test to verify handle verification, see issue 3321
1870#
1871
1872class TestInvalidHandle(unittest.TestCase):
1873
Florent Xicluna36b9fbb2010-03-24 19:33:25 +00001874 @unittest.skipIf(WIN32, "skipped on Windows")
Jesse Noller9a5b2ad2009-01-19 15:12:22 +00001875 def test_invalid_handles(self):
Jesse Noller9a5b2ad2009-01-19 15:12:22 +00001876 conn = _multiprocessing.Connection(44977608)
1877 self.assertRaises(IOError, conn.poll)
1878 self.assertRaises(IOError, _multiprocessing.Connection, -1)
Florent Xicluna36b9fbb2010-03-24 19:33:25 +00001879
Jesse Noller9a5b2ad2009-01-19 15:12:22 +00001880#
Benjamin Petersondfd79492008-06-13 19:13:39 +00001881# Functions used to create test cases from the base ones in this module
1882#
1883
1884def get_attributes(Source, names):
1885 d = {}
1886 for name in names:
1887 obj = getattr(Source, name)
1888 if type(obj) == type(get_attributes):
1889 obj = staticmethod(obj)
1890 d[name] = obj
1891 return d
1892
1893def create_test_cases(Mixin, type):
1894 result = {}
1895 glob = globals()
Florent Xicluna36b9fbb2010-03-24 19:33:25 +00001896 Type = type.capitalize()
Benjamin Petersondfd79492008-06-13 19:13:39 +00001897
1898 for name in glob.keys():
1899 if name.startswith('_Test'):
1900 base = glob[name]
1901 if type in base.ALLOWED_TYPES:
1902 newname = 'With' + Type + name[1:]
1903 class Temp(base, unittest.TestCase, Mixin):
1904 pass
1905 result[newname] = Temp
1906 Temp.__name__ = newname
1907 Temp.__module__ = Mixin.__module__
1908 return result
1909
1910#
1911# Create test cases
1912#
1913
1914class ProcessesMixin(object):
1915 TYPE = 'processes'
1916 Process = multiprocessing.Process
1917 locals().update(get_attributes(multiprocessing, (
1918 'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
1919 'Condition', 'Event', 'Value', 'Array', 'RawValue',
1920 'RawArray', 'current_process', 'active_children', 'Pipe',
1921 'connection', 'JoinableQueue'
1922 )))
1923
1924testcases_processes = create_test_cases(ProcessesMixin, type='processes')
1925globals().update(testcases_processes)
1926
1927
1928class ManagerMixin(object):
1929 TYPE = 'manager'
1930 Process = multiprocessing.Process
1931 manager = object.__new__(multiprocessing.managers.SyncManager)
1932 locals().update(get_attributes(manager, (
1933 'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
1934 'Condition', 'Event', 'Value', 'Array', 'list', 'dict',
1935 'Namespace', 'JoinableQueue'
1936 )))
1937
1938testcases_manager = create_test_cases(ManagerMixin, type='manager')
1939globals().update(testcases_manager)
1940
1941
1942class ThreadsMixin(object):
1943 TYPE = 'threads'
1944 Process = multiprocessing.dummy.Process
1945 locals().update(get_attributes(multiprocessing.dummy, (
1946 'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
1947 'Condition', 'Event', 'Value', 'Array', 'current_process',
1948 'active_children', 'Pipe', 'connection', 'dict', 'list',
1949 'Namespace', 'JoinableQueue'
1950 )))
1951
1952testcases_threads = create_test_cases(ThreadsMixin, type='threads')
1953globals().update(testcases_threads)
1954
Neal Norwitz0c519b32008-08-25 01:50:24 +00001955class OtherTest(unittest.TestCase):
1956 # TODO: add more tests for deliver/answer challenge.
1957 def test_deliver_challenge_auth_failure(self):
1958 class _FakeConnection(object):
1959 def recv_bytes(self, size):
Neal Norwitz2a7767a2008-08-25 03:03:25 +00001960 return b'something bogus'
Neal Norwitz0c519b32008-08-25 01:50:24 +00001961 def send_bytes(self, data):
1962 pass
1963 self.assertRaises(multiprocessing.AuthenticationError,
1964 multiprocessing.connection.deliver_challenge,
1965 _FakeConnection(), b'abc')
1966
1967 def test_answer_challenge_auth_failure(self):
1968 class _FakeConnection(object):
1969 def __init__(self):
1970 self.count = 0
1971 def recv_bytes(self, size):
1972 self.count += 1
1973 if self.count == 1:
1974 return multiprocessing.connection.CHALLENGE
1975 elif self.count == 2:
Neal Norwitz2a7767a2008-08-25 03:03:25 +00001976 return b'something bogus'
1977 return b''
Neal Norwitz0c519b32008-08-25 01:50:24 +00001978 def send_bytes(self, data):
1979 pass
1980 self.assertRaises(multiprocessing.AuthenticationError,
1981 multiprocessing.connection.answer_challenge,
1982 _FakeConnection(), b'abc')
1983
Jesse Noller7152f6d2009-04-02 05:17:26 +00001984#
1985# Test Manager.start()/Pool.__init__() initializer feature - see issue 5585
1986#
1987
1988def initializer(ns):
1989 ns.test += 1
1990
1991class TestInitializers(unittest.TestCase):
1992 def setUp(self):
1993 self.mgr = multiprocessing.Manager()
1994 self.ns = self.mgr.Namespace()
1995 self.ns.test = 0
1996
1997 def tearDown(self):
1998 self.mgr.shutdown()
1999
2000 def test_manager_initializer(self):
2001 m = multiprocessing.managers.SyncManager()
2002 self.assertRaises(TypeError, m.start, 1)
2003 m.start(initializer, (self.ns,))
2004 self.assertEqual(self.ns.test, 1)
2005 m.shutdown()
2006
2007 def test_pool_initializer(self):
2008 self.assertRaises(TypeError, multiprocessing.Pool, initializer=1)
2009 p = multiprocessing.Pool(1, initializer, (self.ns,))
2010 p.close()
2011 p.join()
2012 self.assertEqual(self.ns.test, 1)
2013
Jesse Noller1b90efb2009-06-30 17:11:52 +00002014#
2015# Issue 5155, 5313, 5331: Test process in processes
2016# Verifies os.close(sys.stdin.fileno) vs. sys.stdin.close() behavior
2017#
2018
2019def _ThisSubProcess(q):
2020 try:
2021 item = q.get(block=False)
2022 except Queue.Empty:
2023 pass
2024
2025def _TestProcess(q):
2026 queue = multiprocessing.Queue()
2027 subProc = multiprocessing.Process(target=_ThisSubProcess, args=(queue,))
2028 subProc.start()
2029 subProc.join()
2030
2031def _afunc(x):
2032 return x*x
2033
2034def pool_in_process():
2035 pool = multiprocessing.Pool(processes=4)
2036 x = pool.map(_afunc, [1, 2, 3, 4, 5, 6, 7])
2037
2038class _file_like(object):
2039 def __init__(self, delegate):
2040 self._delegate = delegate
2041 self._pid = None
2042
2043 @property
2044 def cache(self):
2045 pid = os.getpid()
2046 # There are no race conditions since fork keeps only the running thread
2047 if pid != self._pid:
2048 self._pid = pid
2049 self._cache = []
2050 return self._cache
2051
2052 def write(self, data):
2053 self.cache.append(data)
2054
2055 def flush(self):
2056 self._delegate.write(''.join(self.cache))
2057 self._cache = []
2058
2059class TestStdinBadfiledescriptor(unittest.TestCase):
2060
2061 def test_queue_in_process(self):
2062 queue = multiprocessing.Queue()
2063 proc = multiprocessing.Process(target=_TestProcess, args=(queue,))
2064 proc.start()
2065 proc.join()
2066
2067 def test_pool_in_process(self):
2068 p = multiprocessing.Process(target=pool_in_process)
2069 p.start()
2070 p.join()
2071
2072 def test_flushing(self):
2073 sio = StringIO()
2074 flike = _file_like(sio)
2075 flike.write('foo')
2076 proc = multiprocessing.Process(target=lambda: flike.flush())
2077 flike.flush()
2078 assert sio.getvalue() == 'foo'
2079
2080testcases_other = [OtherTest, TestInvalidHandle, TestInitializers,
2081 TestStdinBadfiledescriptor]
Neal Norwitz0c519b32008-08-25 01:50:24 +00002082
Benjamin Petersondfd79492008-06-13 19:13:39 +00002083#
2084#
2085#
2086
2087def test_main(run=None):
Jesse Noller18623822008-06-18 13:29:52 +00002088 if sys.platform.startswith("linux"):
2089 try:
2090 lock = multiprocessing.RLock()
2091 except OSError:
Benjamin Petersonbec087f2009-03-26 21:10:30 +00002092 raise unittest.SkipTest("OSError raises on RLock creation, see issue 3111!")
Benjamin Petersoned77f2e2008-06-17 22:40:44 +00002093
Benjamin Petersondfd79492008-06-13 19:13:39 +00002094 if run is None:
2095 from test.test_support import run_unittest as run
2096
2097 util.get_temp_dir() # creates temp directory for use by all processes
2098
2099 multiprocessing.get_logger().setLevel(LOG_LEVEL)
2100
Jesse Noller146b7ab2008-07-02 16:44:09 +00002101 ProcessesMixin.pool = multiprocessing.Pool(4)
2102 ThreadsMixin.pool = multiprocessing.dummy.Pool(4)
2103 ManagerMixin.manager.__init__()
2104 ManagerMixin.manager.start()
2105 ManagerMixin.pool = ManagerMixin.manager.Pool(4)
Benjamin Petersondfd79492008-06-13 19:13:39 +00002106
2107 testcases = (
Jesse Noller146b7ab2008-07-02 16:44:09 +00002108 sorted(testcases_processes.values(), key=lambda tc:tc.__name__) +
2109 sorted(testcases_threads.values(), key=lambda tc:tc.__name__) +
Neal Norwitz0c519b32008-08-25 01:50:24 +00002110 sorted(testcases_manager.values(), key=lambda tc:tc.__name__) +
2111 testcases_other
Benjamin Petersondfd79492008-06-13 19:13:39 +00002112 )
2113
2114 loadTestsFromTestCase = unittest.defaultTestLoader.loadTestsFromTestCase
2115 suite = unittest.TestSuite(loadTestsFromTestCase(tc) for tc in testcases)
Nick Coghlan13623662010-04-10 14:24:36 +00002116 # (ncoghlan): Whether or not sys.exc_clear is executed by the threading
2117 # module during these tests is at least platform dependent and possibly
Nick Coghlan14459d52010-04-10 15:01:54 +00002118 # non-deterministic on any given platform. So we don't mind if the listed
Nick Coghlan13623662010-04-10 14:24:36 +00002119 # warnings aren't actually raised.
Florent Xicluna07627882010-03-21 01:14:24 +00002120 with test_support.check_py3k_warnings(
Nick Coghlan13623662010-04-10 14:24:36 +00002121 (".+__(get|set)slice__ has been removed", DeprecationWarning),
2122 (r"sys.exc_clear\(\) not supported", DeprecationWarning),
2123 quiet=True):
Florent Xicluna07627882010-03-21 01:14:24 +00002124 run(suite)
Benjamin Petersondfd79492008-06-13 19:13:39 +00002125
Jesse Noller146b7ab2008-07-02 16:44:09 +00002126 ThreadsMixin.pool.terminate()
2127 ProcessesMixin.pool.terminate()
2128 ManagerMixin.pool.terminate()
2129 ManagerMixin.manager.shutdown()
Benjamin Petersondfd79492008-06-13 19:13:39 +00002130
Jesse Noller146b7ab2008-07-02 16:44:09 +00002131 del ProcessesMixin.pool, ThreadsMixin.pool, ManagerMixin.pool
Benjamin Petersondfd79492008-06-13 19:13:39 +00002132
2133def main():
2134 test_main(unittest.TextTestRunner(verbosity=2).run)
2135
2136if __name__ == '__main__':
2137 main()