blob: fe19de037152cad683fddc3f9a93479658c31925 [file] [log] [blame]
Jesse Noller76cf55f2008-07-02 16:56:51 +00001#!/usr/bin/env python
2
Benjamin Petersondfd79492008-06-13 19:13:39 +00003#
4# Unit tests for the multiprocessing package
5#
6
7import unittest
Benjamin Petersondfd79492008-06-13 19:13:39 +00008import Queue
9import time
10import sys
11import os
12import gc
13import signal
14import array
Benjamin Petersondfd79492008-06-13 19:13:39 +000015import socket
16import random
17import logging
Mark Dickinsonc4920e82009-11-20 19:30:22 +000018from test import test_support
Jesse Noller1b90efb2009-06-30 17:11:52 +000019from StringIO import StringIO
R. David Murray3db8a342009-03-30 23:05:48 +000020_multiprocessing = test_support.import_module('_multiprocessing')
Ezio Melottic2077b02011-03-16 12:34:31 +020021# import threading after _multiprocessing to raise a more relevant error
Victor Stinner613b4cf2010-04-27 21:56:26 +000022# message: "No module named _multiprocessing". _multiprocessing is not compiled
23# without thread support.
24import threading
R. David Murray3db8a342009-03-30 23:05:48 +000025
Jesse Noller37040cd2008-09-30 00:15:45 +000026# Work around broken sem_open implementations
R. David Murray3db8a342009-03-30 23:05:48 +000027test_support.import_module('multiprocessing.synchronize')
Jesse Noller37040cd2008-09-30 00:15:45 +000028
Benjamin Petersondfd79492008-06-13 19:13:39 +000029import multiprocessing.dummy
30import multiprocessing.connection
31import multiprocessing.managers
32import multiprocessing.heap
Benjamin Petersondfd79492008-06-13 19:13:39 +000033import multiprocessing.pool
Benjamin Petersondfd79492008-06-13 19:13:39 +000034
35from multiprocessing import util
36
Brian Curtina06e9b82010-10-07 02:27:41 +000037try:
38 from multiprocessing.sharedctypes import Value, copy
39 HAS_SHAREDCTYPES = True
40except ImportError:
41 HAS_SHAREDCTYPES = False
42
Benjamin Petersondfd79492008-06-13 19:13:39 +000043#
44#
45#
46
Benjamin Petersone79edf52008-07-13 18:34:58 +000047latin = str
Benjamin Petersondfd79492008-06-13 19:13:39 +000048
Benjamin Petersondfd79492008-06-13 19:13:39 +000049#
50# Constants
51#
52
53LOG_LEVEL = util.SUBWARNING
Jesse Noller654ade32010-01-27 03:05:57 +000054#LOG_LEVEL = logging.DEBUG
Benjamin Petersondfd79492008-06-13 19:13:39 +000055
56DELTA = 0.1
57CHECK_TIMINGS = False # making true makes tests take a lot longer
58 # and can sometimes cause some non-serious
59 # failures because some calls block a bit
60 # longer than expected
61if CHECK_TIMINGS:
62 TIMEOUT1, TIMEOUT2, TIMEOUT3 = 0.82, 0.35, 1.4
63else:
64 TIMEOUT1, TIMEOUT2, TIMEOUT3 = 0.1, 0.1, 0.1
65
66HAVE_GETVALUE = not getattr(_multiprocessing,
67 'HAVE_BROKEN_SEM_GETVALUE', False)
68
Jesse Noller9a5b2ad2009-01-19 15:12:22 +000069WIN32 = (sys.platform == "win32")
70
Benjamin Petersondfd79492008-06-13 19:13:39 +000071#
Florent Xicluna36b9fbb2010-03-24 19:33:25 +000072# Some tests require ctypes
73#
74
75try:
Nick Coghlan13623662010-04-10 14:24:36 +000076 from ctypes import Structure, c_int, c_double
Florent Xicluna36b9fbb2010-03-24 19:33:25 +000077except ImportError:
78 Structure = object
79 c_int = c_double = None
80
81#
Benjamin Petersondfd79492008-06-13 19:13:39 +000082# Creates a wrapper for a function which records the time it takes to finish
83#
84
85class TimingWrapper(object):
86
87 def __init__(self, func):
88 self.func = func
89 self.elapsed = None
90
91 def __call__(self, *args, **kwds):
92 t = time.time()
93 try:
94 return self.func(*args, **kwds)
95 finally:
96 self.elapsed = time.time() - t
97
98#
99# Base class for test cases
100#
101
102class BaseTestCase(object):
103
104 ALLOWED_TYPES = ('processes', 'manager', 'threads')
105
106 def assertTimingAlmostEqual(self, a, b):
107 if CHECK_TIMINGS:
108 self.assertAlmostEqual(a, b, 1)
109
110 def assertReturnsIfImplemented(self, value, func, *args):
111 try:
112 res = func(*args)
113 except NotImplementedError:
114 pass
115 else:
116 return self.assertEqual(value, res)
117
Antoine Pitrou4eb2b282010-11-02 23:51:30 +0000118 # For the sanity of Windows users, rather than crashing or freezing in
119 # multiple ways.
120 def __reduce__(self, *args):
121 raise NotImplementedError("shouldn't try to pickle a test case")
122
123 __reduce_ex__ = __reduce__
124
Benjamin Petersondfd79492008-06-13 19:13:39 +0000125#
126# Return the value of a semaphore
127#
128
129def get_value(self):
130 try:
131 return self.get_value()
132 except AttributeError:
133 try:
134 return self._Semaphore__value
135 except AttributeError:
136 try:
137 return self._value
138 except AttributeError:
139 raise NotImplementedError
140
141#
142# Testcases
143#
144
145class _TestProcess(BaseTestCase):
146
147 ALLOWED_TYPES = ('processes', 'threads')
148
149 def test_current(self):
150 if self.TYPE == 'threads':
151 return
152
153 current = self.current_process()
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000154 authkey = current.authkey
Benjamin Petersondfd79492008-06-13 19:13:39 +0000155
156 self.assertTrue(current.is_alive())
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000157 self.assertTrue(not current.daemon)
Ezio Melottib0f5adc2010-01-24 16:58:36 +0000158 self.assertIsInstance(authkey, bytes)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000159 self.assertTrue(len(authkey) > 0)
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000160 self.assertEqual(current.ident, os.getpid())
161 self.assertEqual(current.exitcode, None)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000162
Antoine Pitrou4eb2b282010-11-02 23:51:30 +0000163 @classmethod
164 def _test(cls, q, *args, **kwds):
165 current = cls.current_process()
Benjamin Petersondfd79492008-06-13 19:13:39 +0000166 q.put(args)
167 q.put(kwds)
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000168 q.put(current.name)
Antoine Pitrou4eb2b282010-11-02 23:51:30 +0000169 if cls.TYPE != 'threads':
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000170 q.put(bytes(current.authkey))
Benjamin Petersondfd79492008-06-13 19:13:39 +0000171 q.put(current.pid)
172
173 def test_process(self):
174 q = self.Queue(1)
175 e = self.Event()
176 args = (q, 1, 2)
177 kwargs = {'hello':23, 'bye':2.54}
178 name = 'SomeProcess'
179 p = self.Process(
180 target=self._test, args=args, kwargs=kwargs, name=name
181 )
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000182 p.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +0000183 current = self.current_process()
184
185 if self.TYPE != 'threads':
Ezio Melotti2623a372010-11-21 13:34:58 +0000186 self.assertEqual(p.authkey, current.authkey)
187 self.assertEqual(p.is_alive(), False)
188 self.assertEqual(p.daemon, True)
Ezio Melottiaa980582010-01-23 23:04:36 +0000189 self.assertNotIn(p, self.active_children())
Benjamin Petersondfd79492008-06-13 19:13:39 +0000190 self.assertTrue(type(self.active_children()) is list)
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000191 self.assertEqual(p.exitcode, None)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000192
193 p.start()
194
Ezio Melotti2623a372010-11-21 13:34:58 +0000195 self.assertEqual(p.exitcode, None)
196 self.assertEqual(p.is_alive(), True)
Ezio Melottiaa980582010-01-23 23:04:36 +0000197 self.assertIn(p, self.active_children())
Benjamin Petersondfd79492008-06-13 19:13:39 +0000198
Ezio Melotti2623a372010-11-21 13:34:58 +0000199 self.assertEqual(q.get(), args[1:])
200 self.assertEqual(q.get(), kwargs)
201 self.assertEqual(q.get(), p.name)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000202 if self.TYPE != 'threads':
Ezio Melotti2623a372010-11-21 13:34:58 +0000203 self.assertEqual(q.get(), current.authkey)
204 self.assertEqual(q.get(), p.pid)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000205
206 p.join()
207
Ezio Melotti2623a372010-11-21 13:34:58 +0000208 self.assertEqual(p.exitcode, 0)
209 self.assertEqual(p.is_alive(), False)
Ezio Melottiaa980582010-01-23 23:04:36 +0000210 self.assertNotIn(p, self.active_children())
Benjamin Petersondfd79492008-06-13 19:13:39 +0000211
Antoine Pitrou4eb2b282010-11-02 23:51:30 +0000212 @classmethod
213 def _test_terminate(cls):
Benjamin Petersondfd79492008-06-13 19:13:39 +0000214 time.sleep(1000)
215
216 def test_terminate(self):
217 if self.TYPE == 'threads':
218 return
219
220 p = self.Process(target=self._test_terminate)
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000221 p.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +0000222 p.start()
223
224 self.assertEqual(p.is_alive(), True)
Ezio Melottiaa980582010-01-23 23:04:36 +0000225 self.assertIn(p, self.active_children())
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000226 self.assertEqual(p.exitcode, None)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000227
228 p.terminate()
229
230 join = TimingWrapper(p.join)
231 self.assertEqual(join(), None)
232 self.assertTimingAlmostEqual(join.elapsed, 0.0)
233
234 self.assertEqual(p.is_alive(), False)
Ezio Melottiaa980582010-01-23 23:04:36 +0000235 self.assertNotIn(p, self.active_children())
Benjamin Petersondfd79492008-06-13 19:13:39 +0000236
237 p.join()
238
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000239 # XXX sometimes get p.exitcode == 0 on Windows ...
240 #self.assertEqual(p.exitcode, -signal.SIGTERM)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000241
242 def test_cpu_count(self):
243 try:
244 cpus = multiprocessing.cpu_count()
245 except NotImplementedError:
246 cpus = 1
247 self.assertTrue(type(cpus) is int)
248 self.assertTrue(cpus >= 1)
249
250 def test_active_children(self):
251 self.assertEqual(type(self.active_children()), list)
252
253 p = self.Process(target=time.sleep, args=(DELTA,))
Ezio Melottiaa980582010-01-23 23:04:36 +0000254 self.assertNotIn(p, self.active_children())
Benjamin Petersondfd79492008-06-13 19:13:39 +0000255
256 p.start()
Ezio Melottiaa980582010-01-23 23:04:36 +0000257 self.assertIn(p, self.active_children())
Benjamin Petersondfd79492008-06-13 19:13:39 +0000258
259 p.join()
Ezio Melottiaa980582010-01-23 23:04:36 +0000260 self.assertNotIn(p, self.active_children())
Benjamin Petersondfd79492008-06-13 19:13:39 +0000261
Antoine Pitrou4eb2b282010-11-02 23:51:30 +0000262 @classmethod
263 def _test_recursion(cls, wconn, id):
Benjamin Petersondfd79492008-06-13 19:13:39 +0000264 from multiprocessing import forking
265 wconn.send(id)
266 if len(id) < 2:
267 for i in range(2):
Antoine Pitrou4eb2b282010-11-02 23:51:30 +0000268 p = cls.Process(
269 target=cls._test_recursion, args=(wconn, id+[i])
Benjamin Petersondfd79492008-06-13 19:13:39 +0000270 )
271 p.start()
272 p.join()
273
274 def test_recursion(self):
275 rconn, wconn = self.Pipe(duplex=False)
276 self._test_recursion(wconn, [])
277
278 time.sleep(DELTA)
279 result = []
280 while rconn.poll():
281 result.append(rconn.recv())
282
283 expected = [
284 [],
285 [0],
286 [0, 0],
287 [0, 1],
288 [1],
289 [1, 0],
290 [1, 1]
291 ]
292 self.assertEqual(result, expected)
293
294#
295#
296#
297
298class _UpperCaser(multiprocessing.Process):
299
300 def __init__(self):
301 multiprocessing.Process.__init__(self)
302 self.child_conn, self.parent_conn = multiprocessing.Pipe()
303
304 def run(self):
305 self.parent_conn.close()
306 for s in iter(self.child_conn.recv, None):
307 self.child_conn.send(s.upper())
308 self.child_conn.close()
309
310 def submit(self, s):
311 assert type(s) is str
312 self.parent_conn.send(s)
313 return self.parent_conn.recv()
314
315 def stop(self):
316 self.parent_conn.send(None)
317 self.parent_conn.close()
318 self.child_conn.close()
319
320class _TestSubclassingProcess(BaseTestCase):
321
322 ALLOWED_TYPES = ('processes',)
323
324 def test_subclassing(self):
325 uppercaser = _UpperCaser()
326 uppercaser.start()
327 self.assertEqual(uppercaser.submit('hello'), 'HELLO')
328 self.assertEqual(uppercaser.submit('world'), 'WORLD')
329 uppercaser.stop()
330 uppercaser.join()
331
332#
333#
334#
335
336def queue_empty(q):
337 if hasattr(q, 'empty'):
338 return q.empty()
339 else:
340 return q.qsize() == 0
341
342def queue_full(q, maxsize):
343 if hasattr(q, 'full'):
344 return q.full()
345 else:
346 return q.qsize() == maxsize
347
348
349class _TestQueue(BaseTestCase):
350
351
Antoine Pitrou4eb2b282010-11-02 23:51:30 +0000352 @classmethod
353 def _test_put(cls, queue, child_can_start, parent_can_continue):
Benjamin Petersondfd79492008-06-13 19:13:39 +0000354 child_can_start.wait()
355 for i in range(6):
356 queue.get()
357 parent_can_continue.set()
358
359 def test_put(self):
360 MAXSIZE = 6
361 queue = self.Queue(maxsize=MAXSIZE)
362 child_can_start = self.Event()
363 parent_can_continue = self.Event()
364
365 proc = self.Process(
366 target=self._test_put,
367 args=(queue, child_can_start, parent_can_continue)
368 )
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000369 proc.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +0000370 proc.start()
371
372 self.assertEqual(queue_empty(queue), True)
373 self.assertEqual(queue_full(queue, MAXSIZE), False)
374
375 queue.put(1)
376 queue.put(2, True)
377 queue.put(3, True, None)
378 queue.put(4, False)
379 queue.put(5, False, None)
380 queue.put_nowait(6)
381
382 # the values may be in buffer but not yet in pipe so sleep a bit
383 time.sleep(DELTA)
384
385 self.assertEqual(queue_empty(queue), False)
386 self.assertEqual(queue_full(queue, MAXSIZE), True)
387
388 put = TimingWrapper(queue.put)
389 put_nowait = TimingWrapper(queue.put_nowait)
390
391 self.assertRaises(Queue.Full, put, 7, False)
392 self.assertTimingAlmostEqual(put.elapsed, 0)
393
394 self.assertRaises(Queue.Full, put, 7, False, None)
395 self.assertTimingAlmostEqual(put.elapsed, 0)
396
397 self.assertRaises(Queue.Full, put_nowait, 7)
398 self.assertTimingAlmostEqual(put_nowait.elapsed, 0)
399
400 self.assertRaises(Queue.Full, put, 7, True, TIMEOUT1)
401 self.assertTimingAlmostEqual(put.elapsed, TIMEOUT1)
402
403 self.assertRaises(Queue.Full, put, 7, False, TIMEOUT2)
404 self.assertTimingAlmostEqual(put.elapsed, 0)
405
406 self.assertRaises(Queue.Full, put, 7, True, timeout=TIMEOUT3)
407 self.assertTimingAlmostEqual(put.elapsed, TIMEOUT3)
408
409 child_can_start.set()
410 parent_can_continue.wait()
411
412 self.assertEqual(queue_empty(queue), True)
413 self.assertEqual(queue_full(queue, MAXSIZE), False)
414
415 proc.join()
416
Antoine Pitrou4eb2b282010-11-02 23:51:30 +0000417 @classmethod
418 def _test_get(cls, queue, child_can_start, parent_can_continue):
Benjamin Petersondfd79492008-06-13 19:13:39 +0000419 child_can_start.wait()
Benjamin Petersonda3a1b12008-06-16 20:52:48 +0000420 #queue.put(1)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000421 queue.put(2)
422 queue.put(3)
423 queue.put(4)
424 queue.put(5)
425 parent_can_continue.set()
426
427 def test_get(self):
428 queue = self.Queue()
429 child_can_start = self.Event()
430 parent_can_continue = self.Event()
431
432 proc = self.Process(
433 target=self._test_get,
434 args=(queue, child_can_start, parent_can_continue)
435 )
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000436 proc.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +0000437 proc.start()
438
439 self.assertEqual(queue_empty(queue), True)
440
441 child_can_start.set()
442 parent_can_continue.wait()
443
444 time.sleep(DELTA)
445 self.assertEqual(queue_empty(queue), False)
446
Benjamin Petersonda3a1b12008-06-16 20:52:48 +0000447 # Hangs unexpectedly, remove for now
448 #self.assertEqual(queue.get(), 1)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000449 self.assertEqual(queue.get(True, None), 2)
450 self.assertEqual(queue.get(True), 3)
451 self.assertEqual(queue.get(timeout=1), 4)
452 self.assertEqual(queue.get_nowait(), 5)
453
454 self.assertEqual(queue_empty(queue), True)
455
456 get = TimingWrapper(queue.get)
457 get_nowait = TimingWrapper(queue.get_nowait)
458
459 self.assertRaises(Queue.Empty, get, False)
460 self.assertTimingAlmostEqual(get.elapsed, 0)
461
462 self.assertRaises(Queue.Empty, get, False, None)
463 self.assertTimingAlmostEqual(get.elapsed, 0)
464
465 self.assertRaises(Queue.Empty, get_nowait)
466 self.assertTimingAlmostEqual(get_nowait.elapsed, 0)
467
468 self.assertRaises(Queue.Empty, get, True, TIMEOUT1)
469 self.assertTimingAlmostEqual(get.elapsed, TIMEOUT1)
470
471 self.assertRaises(Queue.Empty, get, False, TIMEOUT2)
472 self.assertTimingAlmostEqual(get.elapsed, 0)
473
474 self.assertRaises(Queue.Empty, get, timeout=TIMEOUT3)
475 self.assertTimingAlmostEqual(get.elapsed, TIMEOUT3)
476
477 proc.join()
478
Antoine Pitrou4eb2b282010-11-02 23:51:30 +0000479 @classmethod
480 def _test_fork(cls, queue):
Benjamin Petersondfd79492008-06-13 19:13:39 +0000481 for i in range(10, 20):
482 queue.put(i)
483 # note that at this point the items may only be buffered, so the
484 # process cannot shutdown until the feeder thread has finished
485 # pushing items onto the pipe.
486
487 def test_fork(self):
488 # Old versions of Queue would fail to create a new feeder
489 # thread for a forked process if the original process had its
490 # own feeder thread. This test checks that this no longer
491 # happens.
492
493 queue = self.Queue()
494
495 # put items on queue so that main process starts a feeder thread
496 for i in range(10):
497 queue.put(i)
498
499 # wait to make sure thread starts before we fork a new process
500 time.sleep(DELTA)
501
502 # fork process
503 p = self.Process(target=self._test_fork, args=(queue,))
504 p.start()
505
506 # check that all expected items are in the queue
507 for i in range(20):
508 self.assertEqual(queue.get(), i)
509 self.assertRaises(Queue.Empty, queue.get, False)
510
511 p.join()
512
513 def test_qsize(self):
514 q = self.Queue()
515 try:
516 self.assertEqual(q.qsize(), 0)
517 except NotImplementedError:
518 return
519 q.put(1)
520 self.assertEqual(q.qsize(), 1)
521 q.put(5)
522 self.assertEqual(q.qsize(), 2)
523 q.get()
524 self.assertEqual(q.qsize(), 1)
525 q.get()
526 self.assertEqual(q.qsize(), 0)
527
Antoine Pitrou4eb2b282010-11-02 23:51:30 +0000528 @classmethod
529 def _test_task_done(cls, q):
Benjamin Petersondfd79492008-06-13 19:13:39 +0000530 for obj in iter(q.get, None):
531 time.sleep(DELTA)
532 q.task_done()
533
534 def test_task_done(self):
535 queue = self.JoinableQueue()
536
537 if sys.version_info < (2, 5) and not hasattr(queue, 'task_done'):
Florent Xicluna36b9fbb2010-03-24 19:33:25 +0000538 self.skipTest("requires 'queue.task_done()' method")
Benjamin Petersondfd79492008-06-13 19:13:39 +0000539
540 workers = [self.Process(target=self._test_task_done, args=(queue,))
541 for i in xrange(4)]
542
543 for p in workers:
544 p.start()
545
546 for i in xrange(10):
547 queue.put(i)
548
549 queue.join()
550
551 for p in workers:
552 queue.put(None)
553
554 for p in workers:
555 p.join()
556
557#
558#
559#
560
561class _TestLock(BaseTestCase):
562
563 def test_lock(self):
564 lock = self.Lock()
565 self.assertEqual(lock.acquire(), True)
566 self.assertEqual(lock.acquire(False), False)
567 self.assertEqual(lock.release(), None)
568 self.assertRaises((ValueError, threading.ThreadError), lock.release)
569
570 def test_rlock(self):
571 lock = self.RLock()
572 self.assertEqual(lock.acquire(), True)
573 self.assertEqual(lock.acquire(), True)
574 self.assertEqual(lock.acquire(), True)
575 self.assertEqual(lock.release(), None)
576 self.assertEqual(lock.release(), None)
577 self.assertEqual(lock.release(), None)
578 self.assertRaises((AssertionError, RuntimeError), lock.release)
579
Jesse Noller82eb5902009-03-30 23:29:31 +0000580 def test_lock_context(self):
581 with self.Lock():
582 pass
583
Benjamin Petersondfd79492008-06-13 19:13:39 +0000584
585class _TestSemaphore(BaseTestCase):
586
587 def _test_semaphore(self, sem):
588 self.assertReturnsIfImplemented(2, get_value, sem)
589 self.assertEqual(sem.acquire(), True)
590 self.assertReturnsIfImplemented(1, get_value, sem)
591 self.assertEqual(sem.acquire(), True)
592 self.assertReturnsIfImplemented(0, get_value, sem)
593 self.assertEqual(sem.acquire(False), False)
594 self.assertReturnsIfImplemented(0, get_value, sem)
595 self.assertEqual(sem.release(), None)
596 self.assertReturnsIfImplemented(1, get_value, sem)
597 self.assertEqual(sem.release(), None)
598 self.assertReturnsIfImplemented(2, get_value, sem)
599
600 def test_semaphore(self):
601 sem = self.Semaphore(2)
602 self._test_semaphore(sem)
603 self.assertEqual(sem.release(), None)
604 self.assertReturnsIfImplemented(3, get_value, sem)
605 self.assertEqual(sem.release(), None)
606 self.assertReturnsIfImplemented(4, get_value, sem)
607
608 def test_bounded_semaphore(self):
609 sem = self.BoundedSemaphore(2)
610 self._test_semaphore(sem)
611 # Currently fails on OS/X
612 #if HAVE_GETVALUE:
613 # self.assertRaises(ValueError, sem.release)
614 # self.assertReturnsIfImplemented(2, get_value, sem)
615
616 def test_timeout(self):
617 if self.TYPE != 'processes':
618 return
619
620 sem = self.Semaphore(0)
621 acquire = TimingWrapper(sem.acquire)
622
623 self.assertEqual(acquire(False), False)
624 self.assertTimingAlmostEqual(acquire.elapsed, 0.0)
625
626 self.assertEqual(acquire(False, None), False)
627 self.assertTimingAlmostEqual(acquire.elapsed, 0.0)
628
629 self.assertEqual(acquire(False, TIMEOUT1), False)
630 self.assertTimingAlmostEqual(acquire.elapsed, 0)
631
632 self.assertEqual(acquire(True, TIMEOUT2), False)
633 self.assertTimingAlmostEqual(acquire.elapsed, TIMEOUT2)
634
635 self.assertEqual(acquire(timeout=TIMEOUT3), False)
636 self.assertTimingAlmostEqual(acquire.elapsed, TIMEOUT3)
637
638
639class _TestCondition(BaseTestCase):
640
Antoine Pitrou4eb2b282010-11-02 23:51:30 +0000641 @classmethod
642 def f(cls, cond, sleeping, woken, timeout=None):
Benjamin Petersondfd79492008-06-13 19:13:39 +0000643 cond.acquire()
644 sleeping.release()
645 cond.wait(timeout)
646 woken.release()
647 cond.release()
648
649 def check_invariant(self, cond):
650 # this is only supposed to succeed when there are no sleepers
651 if self.TYPE == 'processes':
652 try:
653 sleepers = (cond._sleeping_count.get_value() -
654 cond._woken_count.get_value())
655 self.assertEqual(sleepers, 0)
656 self.assertEqual(cond._wait_semaphore.get_value(), 0)
657 except NotImplementedError:
658 pass
659
660 def test_notify(self):
661 cond = self.Condition()
662 sleeping = self.Semaphore(0)
663 woken = self.Semaphore(0)
664
665 p = self.Process(target=self.f, args=(cond, sleeping, woken))
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000666 p.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +0000667 p.start()
668
669 p = threading.Thread(target=self.f, args=(cond, sleeping, woken))
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000670 p.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +0000671 p.start()
672
673 # wait for both children to start sleeping
674 sleeping.acquire()
675 sleeping.acquire()
676
677 # check no process/thread has woken up
678 time.sleep(DELTA)
679 self.assertReturnsIfImplemented(0, get_value, woken)
680
681 # wake up one process/thread
682 cond.acquire()
683 cond.notify()
684 cond.release()
685
686 # check one process/thread has woken up
687 time.sleep(DELTA)
688 self.assertReturnsIfImplemented(1, get_value, woken)
689
690 # wake up another
691 cond.acquire()
692 cond.notify()
693 cond.release()
694
695 # check other has woken up
696 time.sleep(DELTA)
697 self.assertReturnsIfImplemented(2, get_value, woken)
698
699 # check state is not mucked up
700 self.check_invariant(cond)
701 p.join()
702
703 def test_notify_all(self):
704 cond = self.Condition()
705 sleeping = self.Semaphore(0)
706 woken = self.Semaphore(0)
707
708 # start some threads/processes which will timeout
709 for i in range(3):
710 p = self.Process(target=self.f,
711 args=(cond, sleeping, woken, TIMEOUT1))
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000712 p.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +0000713 p.start()
714
715 t = threading.Thread(target=self.f,
716 args=(cond, sleeping, woken, TIMEOUT1))
Benjamin Petersona9b22222008-08-18 18:01:43 +0000717 t.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +0000718 t.start()
719
720 # wait for them all to sleep
721 for i in xrange(6):
722 sleeping.acquire()
723
724 # check they have all timed out
725 for i in xrange(6):
726 woken.acquire()
727 self.assertReturnsIfImplemented(0, get_value, woken)
728
729 # check state is not mucked up
730 self.check_invariant(cond)
731
732 # start some more threads/processes
733 for i in range(3):
734 p = self.Process(target=self.f, args=(cond, sleeping, woken))
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000735 p.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +0000736 p.start()
737
738 t = threading.Thread(target=self.f, args=(cond, sleeping, woken))
Benjamin Petersona9b22222008-08-18 18:01:43 +0000739 t.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +0000740 t.start()
741
742 # wait for them to all sleep
743 for i in xrange(6):
744 sleeping.acquire()
745
746 # check no process/thread has woken up
747 time.sleep(DELTA)
748 self.assertReturnsIfImplemented(0, get_value, woken)
749
750 # wake them all up
751 cond.acquire()
752 cond.notify_all()
753 cond.release()
754
755 # check they have all woken
756 time.sleep(DELTA)
757 self.assertReturnsIfImplemented(6, get_value, woken)
758
759 # check state is not mucked up
760 self.check_invariant(cond)
761
762 def test_timeout(self):
763 cond = self.Condition()
764 wait = TimingWrapper(cond.wait)
765 cond.acquire()
766 res = wait(TIMEOUT1)
767 cond.release()
768 self.assertEqual(res, None)
769 self.assertTimingAlmostEqual(wait.elapsed, TIMEOUT1)
770
771
772class _TestEvent(BaseTestCase):
773
Antoine Pitrou4eb2b282010-11-02 23:51:30 +0000774 @classmethod
775 def _test_event(cls, event):
Benjamin Petersondfd79492008-06-13 19:13:39 +0000776 time.sleep(TIMEOUT2)
777 event.set()
778
779 def test_event(self):
780 event = self.Event()
781 wait = TimingWrapper(event.wait)
782
Ezio Melottic2077b02011-03-16 12:34:31 +0200783 # Removed temporarily, due to API shear, this does not
Benjamin Petersondfd79492008-06-13 19:13:39 +0000784 # work with threading._Event objects. is_set == isSet
Jesse Noller02cb0eb2009-04-01 03:45:50 +0000785 self.assertEqual(event.is_set(), False)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000786
Jesse Noller02cb0eb2009-04-01 03:45:50 +0000787 # Removed, threading.Event.wait() will return the value of the __flag
788 # instead of None. API Shear with the semaphore backed mp.Event
789 self.assertEqual(wait(0.0), False)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000790 self.assertTimingAlmostEqual(wait.elapsed, 0.0)
Jesse Noller02cb0eb2009-04-01 03:45:50 +0000791 self.assertEqual(wait(TIMEOUT1), False)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000792 self.assertTimingAlmostEqual(wait.elapsed, TIMEOUT1)
793
794 event.set()
795
796 # See note above on the API differences
Jesse Noller02cb0eb2009-04-01 03:45:50 +0000797 self.assertEqual(event.is_set(), True)
798 self.assertEqual(wait(), True)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000799 self.assertTimingAlmostEqual(wait.elapsed, 0.0)
Jesse Noller02cb0eb2009-04-01 03:45:50 +0000800 self.assertEqual(wait(TIMEOUT1), True)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000801 self.assertTimingAlmostEqual(wait.elapsed, 0.0)
802 # self.assertEqual(event.is_set(), True)
803
804 event.clear()
805
806 #self.assertEqual(event.is_set(), False)
807
808 self.Process(target=self._test_event, args=(event,)).start()
Jesse Noller02cb0eb2009-04-01 03:45:50 +0000809 self.assertEqual(wait(), True)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000810
811#
812#
813#
814
815class _TestValue(BaseTestCase):
816
Florent Xicluna36b9fbb2010-03-24 19:33:25 +0000817 ALLOWED_TYPES = ('processes',)
818
Benjamin Petersondfd79492008-06-13 19:13:39 +0000819 codes_values = [
820 ('i', 4343, 24234),
821 ('d', 3.625, -4.25),
822 ('h', -232, 234),
823 ('c', latin('x'), latin('y'))
824 ]
825
Antoine Pitrou55d935a2010-11-22 16:35:57 +0000826 def setUp(self):
827 if not HAS_SHAREDCTYPES:
828 self.skipTest("requires multiprocessing.sharedctypes")
829
Antoine Pitrou4eb2b282010-11-02 23:51:30 +0000830 @classmethod
831 def _test(cls, values):
832 for sv, cv in zip(values, cls.codes_values):
Benjamin Petersondfd79492008-06-13 19:13:39 +0000833 sv.value = cv[2]
834
835
836 def test_value(self, raw=False):
Benjamin Petersondfd79492008-06-13 19:13:39 +0000837 if raw:
838 values = [self.RawValue(code, value)
839 for code, value, _ in self.codes_values]
840 else:
841 values = [self.Value(code, value)
842 for code, value, _ in self.codes_values]
843
844 for sv, cv in zip(values, self.codes_values):
845 self.assertEqual(sv.value, cv[1])
846
847 proc = self.Process(target=self._test, args=(values,))
848 proc.start()
849 proc.join()
850
851 for sv, cv in zip(values, self.codes_values):
852 self.assertEqual(sv.value, cv[2])
853
854 def test_rawvalue(self):
855 self.test_value(raw=True)
856
857 def test_getobj_getlock(self):
Benjamin Petersondfd79492008-06-13 19:13:39 +0000858 val1 = self.Value('i', 5)
859 lock1 = val1.get_lock()
860 obj1 = val1.get_obj()
861
862 val2 = self.Value('i', 5, lock=None)
863 lock2 = val2.get_lock()
864 obj2 = val2.get_obj()
865
866 lock = self.Lock()
867 val3 = self.Value('i', 5, lock=lock)
868 lock3 = val3.get_lock()
869 obj3 = val3.get_obj()
870 self.assertEqual(lock, lock3)
871
Jesse Noller6ab22152009-01-18 02:45:38 +0000872 arr4 = self.Value('i', 5, lock=False)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000873 self.assertFalse(hasattr(arr4, 'get_lock'))
874 self.assertFalse(hasattr(arr4, 'get_obj'))
875
Jesse Noller6ab22152009-01-18 02:45:38 +0000876 self.assertRaises(AttributeError, self.Value, 'i', 5, lock='navalue')
877
878 arr5 = self.RawValue('i', 5)
879 self.assertFalse(hasattr(arr5, 'get_lock'))
880 self.assertFalse(hasattr(arr5, 'get_obj'))
881
Benjamin Petersondfd79492008-06-13 19:13:39 +0000882
883class _TestArray(BaseTestCase):
884
Florent Xicluna36b9fbb2010-03-24 19:33:25 +0000885 ALLOWED_TYPES = ('processes',)
886
Antoine Pitrou4eb2b282010-11-02 23:51:30 +0000887 @classmethod
888 def f(cls, seq):
Benjamin Petersondfd79492008-06-13 19:13:39 +0000889 for i in range(1, len(seq)):
890 seq[i] += seq[i-1]
891
Florent Xicluna36b9fbb2010-03-24 19:33:25 +0000892 @unittest.skipIf(c_int is None, "requires _ctypes")
Benjamin Petersondfd79492008-06-13 19:13:39 +0000893 def test_array(self, raw=False):
Benjamin Petersondfd79492008-06-13 19:13:39 +0000894 seq = [680, 626, 934, 821, 150, 233, 548, 982, 714, 831]
895 if raw:
896 arr = self.RawArray('i', seq)
897 else:
898 arr = self.Array('i', seq)
899
900 self.assertEqual(len(arr), len(seq))
901 self.assertEqual(arr[3], seq[3])
902 self.assertEqual(list(arr[2:7]), list(seq[2:7]))
903
904 arr[4:8] = seq[4:8] = array.array('i', [1, 2, 3, 4])
905
906 self.assertEqual(list(arr[:]), seq)
907
908 self.f(seq)
909
910 p = self.Process(target=self.f, args=(arr,))
911 p.start()
912 p.join()
913
914 self.assertEqual(list(arr[:]), seq)
915
Florent Xicluna36b9fbb2010-03-24 19:33:25 +0000916 @unittest.skipIf(c_int is None, "requires _ctypes")
Mark Dickinsond3cb2f62011-03-26 10:02:37 +0000917 def test_array_from_size(self):
918 size = 10
919 # Test for zeroing (see issue #11675).
920 # The repetition below strengthens the test by increasing the chances
921 # of previously allocated non-zero memory being used for the new array
922 # on the 2nd and 3rd loops.
923 for _ in range(3):
924 arr = self.Array('i', size)
925 self.assertEqual(len(arr), size)
926 self.assertEqual(list(arr), [0] * size)
927 arr[:] = range(10)
928 self.assertEqual(list(arr), range(10))
929 del arr
930
931 @unittest.skipIf(c_int is None, "requires _ctypes")
Benjamin Petersondfd79492008-06-13 19:13:39 +0000932 def test_rawarray(self):
933 self.test_array(raw=True)
934
Florent Xicluna36b9fbb2010-03-24 19:33:25 +0000935 @unittest.skipIf(c_int is None, "requires _ctypes")
Mark Dickinsonf9e9a6f2011-03-25 22:01:06 +0000936 def test_array_accepts_long(self):
937 arr = self.Array('i', 10L)
938 self.assertEqual(len(arr), 10)
939 raw_arr = self.RawArray('i', 10L)
940 self.assertEqual(len(raw_arr), 10)
941
942 @unittest.skipIf(c_int is None, "requires _ctypes")
Benjamin Petersondfd79492008-06-13 19:13:39 +0000943 def test_getobj_getlock_obj(self):
Benjamin Petersondfd79492008-06-13 19:13:39 +0000944 arr1 = self.Array('i', range(10))
945 lock1 = arr1.get_lock()
946 obj1 = arr1.get_obj()
947
948 arr2 = self.Array('i', range(10), lock=None)
949 lock2 = arr2.get_lock()
950 obj2 = arr2.get_obj()
951
952 lock = self.Lock()
953 arr3 = self.Array('i', range(10), lock=lock)
954 lock3 = arr3.get_lock()
955 obj3 = arr3.get_obj()
956 self.assertEqual(lock, lock3)
957
Jesse Noller6ab22152009-01-18 02:45:38 +0000958 arr4 = self.Array('i', range(10), lock=False)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000959 self.assertFalse(hasattr(arr4, 'get_lock'))
960 self.assertFalse(hasattr(arr4, 'get_obj'))
Jesse Noller6ab22152009-01-18 02:45:38 +0000961 self.assertRaises(AttributeError,
962 self.Array, 'i', range(10), lock='notalock')
963
964 arr5 = self.RawArray('i', range(10))
965 self.assertFalse(hasattr(arr5, 'get_lock'))
966 self.assertFalse(hasattr(arr5, 'get_obj'))
Benjamin Petersondfd79492008-06-13 19:13:39 +0000967
968#
969#
970#
971
972class _TestContainers(BaseTestCase):
973
974 ALLOWED_TYPES = ('manager',)
975
976 def test_list(self):
977 a = self.list(range(10))
978 self.assertEqual(a[:], range(10))
979
980 b = self.list()
981 self.assertEqual(b[:], [])
982
983 b.extend(range(5))
984 self.assertEqual(b[:], range(5))
985
986 self.assertEqual(b[2], 2)
987 self.assertEqual(b[2:10], [2,3,4])
988
989 b *= 2
990 self.assertEqual(b[:], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4])
991
992 self.assertEqual(b + [5, 6], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4, 5, 6])
993
994 self.assertEqual(a[:], range(10))
995
996 d = [a, b]
997 e = self.list(d)
998 self.assertEqual(
999 e[:],
1000 [[0, 1, 2, 3, 4, 5, 6, 7, 8, 9], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4]]
1001 )
1002
1003 f = self.list([a])
1004 a.append('hello')
1005 self.assertEqual(f[:], [[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 'hello']])
1006
1007 def test_dict(self):
1008 d = self.dict()
1009 indices = range(65, 70)
1010 for i in indices:
1011 d[i] = chr(i)
1012 self.assertEqual(d.copy(), dict((i, chr(i)) for i in indices))
1013 self.assertEqual(sorted(d.keys()), indices)
1014 self.assertEqual(sorted(d.values()), [chr(i) for i in indices])
1015 self.assertEqual(sorted(d.items()), [(i, chr(i)) for i in indices])
1016
1017 def test_namespace(self):
1018 n = self.Namespace()
1019 n.name = 'Bob'
1020 n.job = 'Builder'
1021 n._hidden = 'hidden'
1022 self.assertEqual((n.name, n.job), ('Bob', 'Builder'))
1023 del n.job
1024 self.assertEqual(str(n), "Namespace(name='Bob')")
1025 self.assertTrue(hasattr(n, 'name'))
1026 self.assertTrue(not hasattr(n, 'job'))
1027
1028#
1029#
1030#
1031
1032def sqr(x, wait=0.0):
1033 time.sleep(wait)
1034 return x*x
Benjamin Petersondfd79492008-06-13 19:13:39 +00001035class _TestPool(BaseTestCase):
1036
1037 def test_apply(self):
1038 papply = self.pool.apply
1039 self.assertEqual(papply(sqr, (5,)), sqr(5))
1040 self.assertEqual(papply(sqr, (), {'x':3}), sqr(x=3))
1041
1042 def test_map(self):
1043 pmap = self.pool.map
1044 self.assertEqual(pmap(sqr, range(10)), map(sqr, range(10)))
1045 self.assertEqual(pmap(sqr, range(100), chunksize=20),
1046 map(sqr, range(100)))
1047
Jesse Noller7530e472009-07-16 14:23:04 +00001048 def test_map_chunksize(self):
1049 try:
1050 self.pool.map_async(sqr, [], chunksize=1).get(timeout=TIMEOUT1)
1051 except multiprocessing.TimeoutError:
1052 self.fail("pool.map_async with chunksize stalled on null list")
1053
Benjamin Petersondfd79492008-06-13 19:13:39 +00001054 def test_async(self):
1055 res = self.pool.apply_async(sqr, (7, TIMEOUT1,))
1056 get = TimingWrapper(res.get)
1057 self.assertEqual(get(), 49)
1058 self.assertTimingAlmostEqual(get.elapsed, TIMEOUT1)
1059
1060 def test_async_timeout(self):
1061 res = self.pool.apply_async(sqr, (6, TIMEOUT2 + 0.2))
1062 get = TimingWrapper(res.get)
1063 self.assertRaises(multiprocessing.TimeoutError, get, timeout=TIMEOUT2)
1064 self.assertTimingAlmostEqual(get.elapsed, TIMEOUT2)
1065
1066 def test_imap(self):
1067 it = self.pool.imap(sqr, range(10))
1068 self.assertEqual(list(it), map(sqr, range(10)))
1069
1070 it = self.pool.imap(sqr, range(10))
1071 for i in range(10):
1072 self.assertEqual(it.next(), i*i)
1073 self.assertRaises(StopIteration, it.next)
1074
1075 it = self.pool.imap(sqr, range(1000), chunksize=100)
1076 for i in range(1000):
1077 self.assertEqual(it.next(), i*i)
1078 self.assertRaises(StopIteration, it.next)
1079
1080 def test_imap_unordered(self):
1081 it = self.pool.imap_unordered(sqr, range(1000))
1082 self.assertEqual(sorted(it), map(sqr, range(1000)))
1083
1084 it = self.pool.imap_unordered(sqr, range(1000), chunksize=53)
1085 self.assertEqual(sorted(it), map(sqr, range(1000)))
1086
1087 def test_make_pool(self):
1088 p = multiprocessing.Pool(3)
1089 self.assertEqual(3, len(p._pool))
1090 p.close()
1091 p.join()
1092
1093 def test_terminate(self):
1094 if self.TYPE == 'manager':
1095 # On Unix a forked process increfs each shared object to
1096 # which its parent process held a reference. If the
1097 # forked process gets terminated then there is likely to
1098 # be a reference leak. So to prevent
1099 # _TestZZZNumberOfObjects from failing we skip this test
1100 # when using a manager.
1101 return
1102
1103 result = self.pool.map_async(
1104 time.sleep, [0.1 for i in range(10000)], chunksize=1
1105 )
1106 self.pool.terminate()
1107 join = TimingWrapper(self.pool.join)
1108 join()
1109 self.assertTrue(join.elapsed < 0.2)
Jesse Noller654ade32010-01-27 03:05:57 +00001110
1111class _TestPoolWorkerLifetime(BaseTestCase):
1112
1113 ALLOWED_TYPES = ('processes', )
1114 def test_pool_worker_lifetime(self):
1115 p = multiprocessing.Pool(3, maxtasksperchild=10)
1116 self.assertEqual(3, len(p._pool))
1117 origworkerpids = [w.pid for w in p._pool]
1118 # Run many tasks so each worker gets replaced (hopefully)
1119 results = []
1120 for i in range(100):
1121 results.append(p.apply_async(sqr, (i, )))
1122 # Fetch the results and verify we got the right answers,
1123 # also ensuring all the tasks have completed.
1124 for (j, res) in enumerate(results):
1125 self.assertEqual(res.get(), sqr(j))
1126 # Refill the pool
1127 p._repopulate_pool()
Florent Xicluna3bc5cb72010-03-04 15:58:54 +00001128 # Wait until all workers are alive
Antoine Pitrouc2b0d762011-04-06 22:54:14 +02001129 # (countdown * DELTA = 5 seconds max startup process time)
1130 countdown = 50
Florent Xicluna3bc5cb72010-03-04 15:58:54 +00001131 while countdown and not all(w.is_alive() for w in p._pool):
1132 countdown -= 1
1133 time.sleep(DELTA)
Jesse Noller654ade32010-01-27 03:05:57 +00001134 finalworkerpids = [w.pid for w in p._pool]
Florent Xicluna3bc5cb72010-03-04 15:58:54 +00001135 # All pids should be assigned. See issue #7805.
1136 self.assertNotIn(None, origworkerpids)
1137 self.assertNotIn(None, finalworkerpids)
1138 # Finally, check that the worker pids have changed
Jesse Noller654ade32010-01-27 03:05:57 +00001139 self.assertNotEqual(sorted(origworkerpids), sorted(finalworkerpids))
1140 p.close()
1141 p.join()
1142
Benjamin Petersondfd79492008-06-13 19:13:39 +00001143#
1144# Test that manager has expected number of shared objects left
1145#
1146
1147class _TestZZZNumberOfObjects(BaseTestCase):
1148 # Because test cases are sorted alphabetically, this one will get
1149 # run after all the other tests for the manager. It tests that
1150 # there have been no "reference leaks" for the manager's shared
1151 # objects. Note the comment in _TestPool.test_terminate().
1152 ALLOWED_TYPES = ('manager',)
1153
1154 def test_number_of_objects(self):
1155 EXPECTED_NUMBER = 1 # the pool object is still alive
1156 multiprocessing.active_children() # discard dead process objs
1157 gc.collect() # do garbage collection
1158 refs = self.manager._number_of_objects()
Jesse Noller7314b382009-01-21 02:08:17 +00001159 debug_info = self.manager._debug_info()
Benjamin Petersondfd79492008-06-13 19:13:39 +00001160 if refs != EXPECTED_NUMBER:
Jesse Noller7fb96402008-07-17 21:01:05 +00001161 print self.manager._debug_info()
Jesse Noller7314b382009-01-21 02:08:17 +00001162 print debug_info
Benjamin Petersondfd79492008-06-13 19:13:39 +00001163
1164 self.assertEqual(refs, EXPECTED_NUMBER)
1165
1166#
1167# Test of creating a customized manager class
1168#
1169
1170from multiprocessing.managers import BaseManager, BaseProxy, RemoteError
1171
1172class FooBar(object):
1173 def f(self):
1174 return 'f()'
1175 def g(self):
1176 raise ValueError
1177 def _h(self):
1178 return '_h()'
1179
1180def baz():
1181 for i in xrange(10):
1182 yield i*i
1183
1184class IteratorProxy(BaseProxy):
1185 _exposed_ = ('next', '__next__')
1186 def __iter__(self):
1187 return self
1188 def next(self):
1189 return self._callmethod('next')
1190 def __next__(self):
1191 return self._callmethod('__next__')
1192
1193class MyManager(BaseManager):
1194 pass
1195
1196MyManager.register('Foo', callable=FooBar)
1197MyManager.register('Bar', callable=FooBar, exposed=('f', '_h'))
1198MyManager.register('baz', callable=baz, proxytype=IteratorProxy)
1199
1200
1201class _TestMyManager(BaseTestCase):
1202
1203 ALLOWED_TYPES = ('manager',)
1204
1205 def test_mymanager(self):
1206 manager = MyManager()
1207 manager.start()
1208
1209 foo = manager.Foo()
1210 bar = manager.Bar()
1211 baz = manager.baz()
1212
1213 foo_methods = [name for name in ('f', 'g', '_h') if hasattr(foo, name)]
1214 bar_methods = [name for name in ('f', 'g', '_h') if hasattr(bar, name)]
1215
1216 self.assertEqual(foo_methods, ['f', 'g'])
1217 self.assertEqual(bar_methods, ['f', '_h'])
1218
1219 self.assertEqual(foo.f(), 'f()')
1220 self.assertRaises(ValueError, foo.g)
1221 self.assertEqual(foo._callmethod('f'), 'f()')
1222 self.assertRaises(RemoteError, foo._callmethod, '_h')
1223
1224 self.assertEqual(bar.f(), 'f()')
1225 self.assertEqual(bar._h(), '_h()')
1226 self.assertEqual(bar._callmethod('f'), 'f()')
1227 self.assertEqual(bar._callmethod('_h'), '_h()')
1228
1229 self.assertEqual(list(baz), [i*i for i in range(10)])
1230
1231 manager.shutdown()
1232
1233#
1234# Test of connecting to a remote server and using xmlrpclib for serialization
1235#
1236
1237_queue = Queue.Queue()
1238def get_queue():
1239 return _queue
1240
1241class QueueManager(BaseManager):
1242 '''manager class used by server process'''
1243QueueManager.register('get_queue', callable=get_queue)
1244
1245class QueueManager2(BaseManager):
1246 '''manager class which specifies the same interface as QueueManager'''
1247QueueManager2.register('get_queue')
1248
1249
1250SERIALIZER = 'xmlrpclib'
1251
1252class _TestRemoteManager(BaseTestCase):
1253
1254 ALLOWED_TYPES = ('manager',)
1255
Antoine Pitrou4eb2b282010-11-02 23:51:30 +00001256 @classmethod
1257 def _putter(cls, address, authkey):
Benjamin Petersondfd79492008-06-13 19:13:39 +00001258 manager = QueueManager2(
1259 address=address, authkey=authkey, serializer=SERIALIZER
1260 )
1261 manager.connect()
1262 queue = manager.get_queue()
1263 queue.put(('hello world', None, True, 2.25))
1264
1265 def test_remote(self):
1266 authkey = os.urandom(32)
1267
1268 manager = QueueManager(
1269 address=('localhost', 0), authkey=authkey, serializer=SERIALIZER
1270 )
1271 manager.start()
1272
1273 p = self.Process(target=self._putter, args=(manager.address, authkey))
1274 p.start()
1275
1276 manager2 = QueueManager2(
1277 address=manager.address, authkey=authkey, serializer=SERIALIZER
1278 )
1279 manager2.connect()
1280 queue = manager2.get_queue()
1281
1282 # Note that xmlrpclib will deserialize object as a list not a tuple
1283 self.assertEqual(queue.get(), ['hello world', None, True, 2.25])
1284
1285 # Because we are using xmlrpclib for serialization instead of
1286 # pickle this will cause a serialization error.
1287 self.assertRaises(Exception, queue.put, time.sleep)
1288
1289 # Make queue finalizer run before the server is stopped
1290 del queue
1291 manager.shutdown()
1292
Jesse Noller459a6482009-03-30 15:50:42 +00001293class _TestManagerRestart(BaseTestCase):
1294
Antoine Pitrou4eb2b282010-11-02 23:51:30 +00001295 @classmethod
1296 def _putter(cls, address, authkey):
Jesse Noller459a6482009-03-30 15:50:42 +00001297 manager = QueueManager(
1298 address=address, authkey=authkey, serializer=SERIALIZER)
1299 manager.connect()
1300 queue = manager.get_queue()
1301 queue.put('hello world')
1302
1303 def test_rapid_restart(self):
1304 authkey = os.urandom(32)
1305 manager = QueueManager(
Antoine Pitrou54f9f832010-04-30 23:08:48 +00001306 address=('localhost', 0), authkey=authkey, serializer=SERIALIZER)
Brian Curtin87d86e02010-11-01 05:15:55 +00001307 srvr = manager.get_server()
1308 addr = srvr.address
1309 # Close the connection.Listener socket which gets opened as a part
1310 # of manager.get_server(). It's not needed for the test.
1311 srvr.listener.close()
Jesse Noller459a6482009-03-30 15:50:42 +00001312 manager.start()
1313
1314 p = self.Process(target=self._putter, args=(manager.address, authkey))
1315 p.start()
1316 queue = manager.get_queue()
1317 self.assertEqual(queue.get(), 'hello world')
Jesse Noller019ce772009-03-30 21:53:29 +00001318 del queue
Jesse Noller459a6482009-03-30 15:50:42 +00001319 manager.shutdown()
1320 manager = QueueManager(
Antoine Pitrou54f9f832010-04-30 23:08:48 +00001321 address=addr, authkey=authkey, serializer=SERIALIZER)
Jesse Noller459a6482009-03-30 15:50:42 +00001322 manager.start()
Jesse Noller019ce772009-03-30 21:53:29 +00001323 manager.shutdown()
Jesse Noller459a6482009-03-30 15:50:42 +00001324
Benjamin Petersondfd79492008-06-13 19:13:39 +00001325#
1326#
1327#
1328
1329SENTINEL = latin('')
1330
1331class _TestConnection(BaseTestCase):
1332
1333 ALLOWED_TYPES = ('processes', 'threads')
1334
Antoine Pitrou4eb2b282010-11-02 23:51:30 +00001335 @classmethod
1336 def _echo(cls, conn):
Benjamin Petersondfd79492008-06-13 19:13:39 +00001337 for msg in iter(conn.recv_bytes, SENTINEL):
1338 conn.send_bytes(msg)
1339 conn.close()
1340
1341 def test_connection(self):
1342 conn, child_conn = self.Pipe()
1343
1344 p = self.Process(target=self._echo, args=(child_conn,))
Jesse Noller5bc9f4c2008-08-19 19:06:19 +00001345 p.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +00001346 p.start()
1347
1348 seq = [1, 2.25, None]
1349 msg = latin('hello world')
1350 longmsg = msg * 10
1351 arr = array.array('i', range(4))
1352
1353 if self.TYPE == 'processes':
1354 self.assertEqual(type(conn.fileno()), int)
1355
1356 self.assertEqual(conn.send(seq), None)
1357 self.assertEqual(conn.recv(), seq)
1358
1359 self.assertEqual(conn.send_bytes(msg), None)
1360 self.assertEqual(conn.recv_bytes(), msg)
1361
1362 if self.TYPE == 'processes':
1363 buffer = array.array('i', [0]*10)
1364 expected = list(arr) + [0] * (10 - len(arr))
1365 self.assertEqual(conn.send_bytes(arr), None)
1366 self.assertEqual(conn.recv_bytes_into(buffer),
1367 len(arr) * buffer.itemsize)
1368 self.assertEqual(list(buffer), expected)
1369
1370 buffer = array.array('i', [0]*10)
1371 expected = [0] * 3 + list(arr) + [0] * (10 - 3 - len(arr))
1372 self.assertEqual(conn.send_bytes(arr), None)
1373 self.assertEqual(conn.recv_bytes_into(buffer, 3 * buffer.itemsize),
1374 len(arr) * buffer.itemsize)
1375 self.assertEqual(list(buffer), expected)
1376
1377 buffer = bytearray(latin(' ' * 40))
1378 self.assertEqual(conn.send_bytes(longmsg), None)
1379 try:
1380 res = conn.recv_bytes_into(buffer)
1381 except multiprocessing.BufferTooShort, e:
1382 self.assertEqual(e.args, (longmsg,))
1383 else:
1384 self.fail('expected BufferTooShort, got %s' % res)
1385
1386 poll = TimingWrapper(conn.poll)
1387
1388 self.assertEqual(poll(), False)
1389 self.assertTimingAlmostEqual(poll.elapsed, 0)
1390
1391 self.assertEqual(poll(TIMEOUT1), False)
1392 self.assertTimingAlmostEqual(poll.elapsed, TIMEOUT1)
1393
1394 conn.send(None)
1395
1396 self.assertEqual(poll(TIMEOUT1), True)
1397 self.assertTimingAlmostEqual(poll.elapsed, 0)
1398
1399 self.assertEqual(conn.recv(), None)
1400
1401 really_big_msg = latin('X') * (1024 * 1024 * 16) # 16Mb
1402 conn.send_bytes(really_big_msg)
1403 self.assertEqual(conn.recv_bytes(), really_big_msg)
1404
1405 conn.send_bytes(SENTINEL) # tell child to quit
1406 child_conn.close()
1407
1408 if self.TYPE == 'processes':
1409 self.assertEqual(conn.readable, True)
1410 self.assertEqual(conn.writable, True)
1411 self.assertRaises(EOFError, conn.recv)
1412 self.assertRaises(EOFError, conn.recv_bytes)
1413
1414 p.join()
1415
1416 def test_duplex_false(self):
1417 reader, writer = self.Pipe(duplex=False)
1418 self.assertEqual(writer.send(1), None)
1419 self.assertEqual(reader.recv(), 1)
1420 if self.TYPE == 'processes':
1421 self.assertEqual(reader.readable, True)
1422 self.assertEqual(reader.writable, False)
1423 self.assertEqual(writer.readable, False)
1424 self.assertEqual(writer.writable, True)
1425 self.assertRaises(IOError, reader.send, 2)
1426 self.assertRaises(IOError, writer.recv)
1427 self.assertRaises(IOError, writer.poll)
1428
1429 def test_spawn_close(self):
1430 # We test that a pipe connection can be closed by parent
1431 # process immediately after child is spawned. On Windows this
1432 # would have sometimes failed on old versions because
1433 # child_conn would be closed before the child got a chance to
1434 # duplicate it.
1435 conn, child_conn = self.Pipe()
1436
1437 p = self.Process(target=self._echo, args=(child_conn,))
1438 p.start()
1439 child_conn.close() # this might complete before child initializes
1440
1441 msg = latin('hello')
1442 conn.send_bytes(msg)
1443 self.assertEqual(conn.recv_bytes(), msg)
1444
1445 conn.send_bytes(SENTINEL)
1446 conn.close()
1447 p.join()
1448
1449 def test_sendbytes(self):
1450 if self.TYPE != 'processes':
1451 return
1452
1453 msg = latin('abcdefghijklmnopqrstuvwxyz')
1454 a, b = self.Pipe()
1455
1456 a.send_bytes(msg)
1457 self.assertEqual(b.recv_bytes(), msg)
1458
1459 a.send_bytes(msg, 5)
1460 self.assertEqual(b.recv_bytes(), msg[5:])
1461
1462 a.send_bytes(msg, 7, 8)
1463 self.assertEqual(b.recv_bytes(), msg[7:7+8])
1464
1465 a.send_bytes(msg, 26)
1466 self.assertEqual(b.recv_bytes(), latin(''))
1467
1468 a.send_bytes(msg, 26, 0)
1469 self.assertEqual(b.recv_bytes(), latin(''))
1470
1471 self.assertRaises(ValueError, a.send_bytes, msg, 27)
1472
1473 self.assertRaises(ValueError, a.send_bytes, msg, 22, 5)
1474
1475 self.assertRaises(ValueError, a.send_bytes, msg, 26, 1)
1476
1477 self.assertRaises(ValueError, a.send_bytes, msg, -1)
1478
1479 self.assertRaises(ValueError, a.send_bytes, msg, 4, -1)
1480
Benjamin Petersondfd79492008-06-13 19:13:39 +00001481class _TestListenerClient(BaseTestCase):
1482
1483 ALLOWED_TYPES = ('processes', 'threads')
1484
Antoine Pitrou4eb2b282010-11-02 23:51:30 +00001485 @classmethod
1486 def _test(cls, address):
1487 conn = cls.connection.Client(address)
Benjamin Petersondfd79492008-06-13 19:13:39 +00001488 conn.send('hello')
1489 conn.close()
1490
1491 def test_listener_client(self):
1492 for family in self.connection.families:
1493 l = self.connection.Listener(family=family)
1494 p = self.Process(target=self._test, args=(l.address,))
Jesse Noller5bc9f4c2008-08-19 19:06:19 +00001495 p.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +00001496 p.start()
1497 conn = l.accept()
1498 self.assertEqual(conn.recv(), 'hello')
1499 p.join()
1500 l.close()
Benjamin Petersondfd79492008-06-13 19:13:39 +00001501#
1502# Test of sending connection and socket objects between processes
1503#
Benjamin Petersonda3a1b12008-06-16 20:52:48 +00001504"""
Benjamin Petersondfd79492008-06-13 19:13:39 +00001505class _TestPicklingConnections(BaseTestCase):
1506
1507 ALLOWED_TYPES = ('processes',)
1508
1509 def _listener(self, conn, families):
1510 for fam in families:
1511 l = self.connection.Listener(family=fam)
1512 conn.send(l.address)
1513 new_conn = l.accept()
1514 conn.send(new_conn)
1515
1516 if self.TYPE == 'processes':
1517 l = socket.socket()
1518 l.bind(('localhost', 0))
1519 conn.send(l.getsockname())
1520 l.listen(1)
1521 new_conn, addr = l.accept()
1522 conn.send(new_conn)
1523
1524 conn.recv()
1525
1526 def _remote(self, conn):
1527 for (address, msg) in iter(conn.recv, None):
1528 client = self.connection.Client(address)
1529 client.send(msg.upper())
1530 client.close()
1531
1532 if self.TYPE == 'processes':
1533 address, msg = conn.recv()
1534 client = socket.socket()
1535 client.connect(address)
1536 client.sendall(msg.upper())
1537 client.close()
1538
1539 conn.close()
1540
1541 def test_pickling(self):
1542 try:
1543 multiprocessing.allow_connection_pickling()
1544 except ImportError:
1545 return
1546
1547 families = self.connection.families
1548
1549 lconn, lconn0 = self.Pipe()
1550 lp = self.Process(target=self._listener, args=(lconn0, families))
1551 lp.start()
1552 lconn0.close()
1553
1554 rconn, rconn0 = self.Pipe()
1555 rp = self.Process(target=self._remote, args=(rconn0,))
1556 rp.start()
1557 rconn0.close()
1558
1559 for fam in families:
1560 msg = ('This connection uses family %s' % fam).encode('ascii')
1561 address = lconn.recv()
1562 rconn.send((address, msg))
1563 new_conn = lconn.recv()
1564 self.assertEqual(new_conn.recv(), msg.upper())
1565
1566 rconn.send(None)
1567
1568 if self.TYPE == 'processes':
1569 msg = latin('This connection uses a normal socket')
1570 address = lconn.recv()
1571 rconn.send((address, msg))
1572 if hasattr(socket, 'fromfd'):
1573 new_conn = lconn.recv()
1574 self.assertEqual(new_conn.recv(100), msg.upper())
1575 else:
1576 # XXX On Windows with Py2.6 need to backport fromfd()
1577 discard = lconn.recv_bytes()
1578
1579 lconn.send(None)
1580
1581 rconn.close()
1582 lconn.close()
1583
1584 lp.join()
1585 rp.join()
Benjamin Petersonda3a1b12008-06-16 20:52:48 +00001586"""
Benjamin Petersondfd79492008-06-13 19:13:39 +00001587#
1588#
1589#
1590
1591class _TestHeap(BaseTestCase):
1592
1593 ALLOWED_TYPES = ('processes',)
1594
1595 def test_heap(self):
1596 iterations = 5000
1597 maxblocks = 50
1598 blocks = []
1599
1600 # create and destroy lots of blocks of different sizes
1601 for i in xrange(iterations):
1602 size = int(random.lognormvariate(0, 1) * 1000)
1603 b = multiprocessing.heap.BufferWrapper(size)
1604 blocks.append(b)
1605 if len(blocks) > maxblocks:
1606 i = random.randrange(maxblocks)
1607 del blocks[i]
1608
1609 # get the heap object
1610 heap = multiprocessing.heap.BufferWrapper._heap
1611
1612 # verify the state of the heap
1613 all = []
1614 occupied = 0
1615 for L in heap._len_to_seq.values():
1616 for arena, start, stop in L:
1617 all.append((heap._arenas.index(arena), start, stop,
1618 stop-start, 'free'))
1619 for arena, start, stop in heap._allocated_blocks:
1620 all.append((heap._arenas.index(arena), start, stop,
1621 stop-start, 'occupied'))
1622 occupied += (stop-start)
1623
1624 all.sort()
1625
1626 for i in range(len(all)-1):
1627 (arena, start, stop) = all[i][:3]
1628 (narena, nstart, nstop) = all[i+1][:3]
1629 self.assertTrue((arena != narena and nstart == 0) or
1630 (stop == nstart))
1631
1632#
1633#
1634#
1635
Benjamin Petersondfd79492008-06-13 19:13:39 +00001636class _Foo(Structure):
1637 _fields_ = [
1638 ('x', c_int),
1639 ('y', c_double)
1640 ]
1641
1642class _TestSharedCTypes(BaseTestCase):
1643
1644 ALLOWED_TYPES = ('processes',)
1645
Antoine Pitrou55d935a2010-11-22 16:35:57 +00001646 def setUp(self):
1647 if not HAS_SHAREDCTYPES:
1648 self.skipTest("requires multiprocessing.sharedctypes")
1649
Antoine Pitrou4eb2b282010-11-02 23:51:30 +00001650 @classmethod
1651 def _double(cls, x, y, foo, arr, string):
Benjamin Petersondfd79492008-06-13 19:13:39 +00001652 x.value *= 2
1653 y.value *= 2
1654 foo.x *= 2
1655 foo.y *= 2
1656 string.value *= 2
1657 for i in range(len(arr)):
1658 arr[i] *= 2
1659
1660 def test_sharedctypes(self, lock=False):
Benjamin Petersondfd79492008-06-13 19:13:39 +00001661 x = Value('i', 7, lock=lock)
Georg Brandlbd564c32010-02-06 23:33:33 +00001662 y = Value(c_double, 1.0/3.0, lock=lock)
Benjamin Petersondfd79492008-06-13 19:13:39 +00001663 foo = Value(_Foo, 3, 2, lock=lock)
Georg Brandlbd564c32010-02-06 23:33:33 +00001664 arr = self.Array('d', range(10), lock=lock)
1665 string = self.Array('c', 20, lock=lock)
Brian Curtina06e9b82010-10-07 02:27:41 +00001666 string.value = latin('hello')
Benjamin Petersondfd79492008-06-13 19:13:39 +00001667
1668 p = self.Process(target=self._double, args=(x, y, foo, arr, string))
1669 p.start()
1670 p.join()
1671
1672 self.assertEqual(x.value, 14)
1673 self.assertAlmostEqual(y.value, 2.0/3.0)
1674 self.assertEqual(foo.x, 6)
1675 self.assertAlmostEqual(foo.y, 4.0)
1676 for i in range(10):
1677 self.assertAlmostEqual(arr[i], i*2)
1678 self.assertEqual(string.value, latin('hellohello'))
1679
1680 def test_synchronize(self):
1681 self.test_sharedctypes(lock=True)
1682
1683 def test_copy(self):
Benjamin Petersondfd79492008-06-13 19:13:39 +00001684 foo = _Foo(2, 5.0)
Brian Curtina06e9b82010-10-07 02:27:41 +00001685 bar = copy(foo)
Benjamin Petersondfd79492008-06-13 19:13:39 +00001686 foo.x = 0
1687 foo.y = 0
1688 self.assertEqual(bar.x, 2)
1689 self.assertAlmostEqual(bar.y, 5.0)
1690
1691#
1692#
1693#
1694
1695class _TestFinalize(BaseTestCase):
1696
1697 ALLOWED_TYPES = ('processes',)
1698
Antoine Pitrou4eb2b282010-11-02 23:51:30 +00001699 @classmethod
1700 def _test_finalize(cls, conn):
Benjamin Petersondfd79492008-06-13 19:13:39 +00001701 class Foo(object):
1702 pass
1703
1704 a = Foo()
1705 util.Finalize(a, conn.send, args=('a',))
1706 del a # triggers callback for a
1707
1708 b = Foo()
1709 close_b = util.Finalize(b, conn.send, args=('b',))
1710 close_b() # triggers callback for b
1711 close_b() # does nothing because callback has already been called
1712 del b # does nothing because callback has already been called
1713
1714 c = Foo()
1715 util.Finalize(c, conn.send, args=('c',))
1716
1717 d10 = Foo()
1718 util.Finalize(d10, conn.send, args=('d10',), exitpriority=1)
1719
1720 d01 = Foo()
1721 util.Finalize(d01, conn.send, args=('d01',), exitpriority=0)
1722 d02 = Foo()
1723 util.Finalize(d02, conn.send, args=('d02',), exitpriority=0)
1724 d03 = Foo()
1725 util.Finalize(d03, conn.send, args=('d03',), exitpriority=0)
1726
1727 util.Finalize(None, conn.send, args=('e',), exitpriority=-10)
1728
1729 util.Finalize(None, conn.send, args=('STOP',), exitpriority=-100)
1730
Ezio Melottic2077b02011-03-16 12:34:31 +02001731 # call multiprocessing's cleanup function then exit process without
Benjamin Petersondfd79492008-06-13 19:13:39 +00001732 # garbage collecting locals
1733 util._exit_function()
1734 conn.close()
1735 os._exit(0)
1736
1737 def test_finalize(self):
1738 conn, child_conn = self.Pipe()
1739
1740 p = self.Process(target=self._test_finalize, args=(child_conn,))
1741 p.start()
1742 p.join()
1743
1744 result = [obj for obj in iter(conn.recv, 'STOP')]
1745 self.assertEqual(result, ['a', 'b', 'd10', 'd03', 'd02', 'd01', 'e'])
1746
1747#
1748# Test that from ... import * works for each module
1749#
1750
1751class _TestImportStar(BaseTestCase):
1752
1753 ALLOWED_TYPES = ('processes',)
1754
1755 def test_import(self):
Florent Xicluna36b9fbb2010-03-24 19:33:25 +00001756 modules = [
Benjamin Petersondfd79492008-06-13 19:13:39 +00001757 'multiprocessing', 'multiprocessing.connection',
1758 'multiprocessing.heap', 'multiprocessing.managers',
1759 'multiprocessing.pool', 'multiprocessing.process',
Florent Xicluna36b9fbb2010-03-24 19:33:25 +00001760 'multiprocessing.reduction',
Benjamin Petersondfd79492008-06-13 19:13:39 +00001761 'multiprocessing.synchronize', 'multiprocessing.util'
Florent Xicluna36b9fbb2010-03-24 19:33:25 +00001762 ]
1763
1764 if c_int is not None:
1765 # This module requires _ctypes
1766 modules.append('multiprocessing.sharedctypes')
Benjamin Petersondfd79492008-06-13 19:13:39 +00001767
1768 for name in modules:
1769 __import__(name)
1770 mod = sys.modules[name]
1771
1772 for attr in getattr(mod, '__all__', ()):
1773 self.assertTrue(
1774 hasattr(mod, attr),
1775 '%r does not have attribute %r' % (mod, attr)
1776 )
1777
1778#
1779# Quick test that logging works -- does not test logging output
1780#
1781
1782class _TestLogging(BaseTestCase):
1783
1784 ALLOWED_TYPES = ('processes',)
1785
1786 def test_enable_logging(self):
1787 logger = multiprocessing.get_logger()
1788 logger.setLevel(util.SUBWARNING)
1789 self.assertTrue(logger is not None)
1790 logger.debug('this will not be printed')
1791 logger.info('nor will this')
1792 logger.setLevel(LOG_LEVEL)
1793
Antoine Pitrou4eb2b282010-11-02 23:51:30 +00001794 @classmethod
1795 def _test_level(cls, conn):
Benjamin Petersondfd79492008-06-13 19:13:39 +00001796 logger = multiprocessing.get_logger()
1797 conn.send(logger.getEffectiveLevel())
1798
1799 def test_level(self):
1800 LEVEL1 = 32
1801 LEVEL2 = 37
1802
1803 logger = multiprocessing.get_logger()
1804 root_logger = logging.getLogger()
1805 root_level = root_logger.level
1806
1807 reader, writer = multiprocessing.Pipe(duplex=False)
1808
1809 logger.setLevel(LEVEL1)
1810 self.Process(target=self._test_level, args=(writer,)).start()
1811 self.assertEqual(LEVEL1, reader.recv())
1812
1813 logger.setLevel(logging.NOTSET)
1814 root_logger.setLevel(LEVEL2)
1815 self.Process(target=self._test_level, args=(writer,)).start()
1816 self.assertEqual(LEVEL2, reader.recv())
1817
1818 root_logger.setLevel(root_level)
1819 logger.setLevel(level=LOG_LEVEL)
1820
Jesse Noller814d02d2009-11-21 14:38:23 +00001821
Jesse Noller9a03f2f2009-11-24 14:17:29 +00001822# class _TestLoggingProcessName(BaseTestCase):
1823#
1824# def handle(self, record):
1825# assert record.processName == multiprocessing.current_process().name
1826# self.__handled = True
1827#
1828# def test_logging(self):
1829# handler = logging.Handler()
1830# handler.handle = self.handle
1831# self.__handled = False
1832# # Bypass getLogger() and side-effects
1833# logger = logging.getLoggerClass()(
1834# 'multiprocessing.test.TestLoggingProcessName')
1835# logger.addHandler(handler)
1836# logger.propagate = False
1837#
1838# logger.warn('foo')
1839# assert self.__handled
Jesse Noller814d02d2009-11-21 14:38:23 +00001840
Benjamin Petersondfd79492008-06-13 19:13:39 +00001841#
Jesse Noller9a5b2ad2009-01-19 15:12:22 +00001842# Test to verify handle verification, see issue 3321
1843#
1844
1845class TestInvalidHandle(unittest.TestCase):
1846
Florent Xicluna36b9fbb2010-03-24 19:33:25 +00001847 @unittest.skipIf(WIN32, "skipped on Windows")
Jesse Noller9a5b2ad2009-01-19 15:12:22 +00001848 def test_invalid_handles(self):
Jesse Noller9a5b2ad2009-01-19 15:12:22 +00001849 conn = _multiprocessing.Connection(44977608)
1850 self.assertRaises(IOError, conn.poll)
1851 self.assertRaises(IOError, _multiprocessing.Connection, -1)
Florent Xicluna36b9fbb2010-03-24 19:33:25 +00001852
Jesse Noller9a5b2ad2009-01-19 15:12:22 +00001853#
Benjamin Petersondfd79492008-06-13 19:13:39 +00001854# Functions used to create test cases from the base ones in this module
1855#
1856
1857def get_attributes(Source, names):
1858 d = {}
1859 for name in names:
1860 obj = getattr(Source, name)
1861 if type(obj) == type(get_attributes):
1862 obj = staticmethod(obj)
1863 d[name] = obj
1864 return d
1865
1866def create_test_cases(Mixin, type):
1867 result = {}
1868 glob = globals()
Florent Xicluna36b9fbb2010-03-24 19:33:25 +00001869 Type = type.capitalize()
Benjamin Petersondfd79492008-06-13 19:13:39 +00001870
1871 for name in glob.keys():
1872 if name.startswith('_Test'):
1873 base = glob[name]
1874 if type in base.ALLOWED_TYPES:
1875 newname = 'With' + Type + name[1:]
1876 class Temp(base, unittest.TestCase, Mixin):
1877 pass
1878 result[newname] = Temp
1879 Temp.__name__ = newname
1880 Temp.__module__ = Mixin.__module__
1881 return result
1882
1883#
1884# Create test cases
1885#
1886
1887class ProcessesMixin(object):
1888 TYPE = 'processes'
1889 Process = multiprocessing.Process
1890 locals().update(get_attributes(multiprocessing, (
1891 'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
1892 'Condition', 'Event', 'Value', 'Array', 'RawValue',
1893 'RawArray', 'current_process', 'active_children', 'Pipe',
1894 'connection', 'JoinableQueue'
1895 )))
1896
1897testcases_processes = create_test_cases(ProcessesMixin, type='processes')
1898globals().update(testcases_processes)
1899
1900
1901class ManagerMixin(object):
1902 TYPE = 'manager'
1903 Process = multiprocessing.Process
1904 manager = object.__new__(multiprocessing.managers.SyncManager)
1905 locals().update(get_attributes(manager, (
1906 'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
1907 'Condition', 'Event', 'Value', 'Array', 'list', 'dict',
1908 'Namespace', 'JoinableQueue'
1909 )))
1910
1911testcases_manager = create_test_cases(ManagerMixin, type='manager')
1912globals().update(testcases_manager)
1913
1914
1915class ThreadsMixin(object):
1916 TYPE = 'threads'
1917 Process = multiprocessing.dummy.Process
1918 locals().update(get_attributes(multiprocessing.dummy, (
1919 'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
1920 'Condition', 'Event', 'Value', 'Array', 'current_process',
1921 'active_children', 'Pipe', 'connection', 'dict', 'list',
1922 'Namespace', 'JoinableQueue'
1923 )))
1924
1925testcases_threads = create_test_cases(ThreadsMixin, type='threads')
1926globals().update(testcases_threads)
1927
Neal Norwitz0c519b32008-08-25 01:50:24 +00001928class OtherTest(unittest.TestCase):
1929 # TODO: add more tests for deliver/answer challenge.
1930 def test_deliver_challenge_auth_failure(self):
1931 class _FakeConnection(object):
1932 def recv_bytes(self, size):
Neal Norwitz2a7767a2008-08-25 03:03:25 +00001933 return b'something bogus'
Neal Norwitz0c519b32008-08-25 01:50:24 +00001934 def send_bytes(self, data):
1935 pass
1936 self.assertRaises(multiprocessing.AuthenticationError,
1937 multiprocessing.connection.deliver_challenge,
1938 _FakeConnection(), b'abc')
1939
1940 def test_answer_challenge_auth_failure(self):
1941 class _FakeConnection(object):
1942 def __init__(self):
1943 self.count = 0
1944 def recv_bytes(self, size):
1945 self.count += 1
1946 if self.count == 1:
1947 return multiprocessing.connection.CHALLENGE
1948 elif self.count == 2:
Neal Norwitz2a7767a2008-08-25 03:03:25 +00001949 return b'something bogus'
1950 return b''
Neal Norwitz0c519b32008-08-25 01:50:24 +00001951 def send_bytes(self, data):
1952 pass
1953 self.assertRaises(multiprocessing.AuthenticationError,
1954 multiprocessing.connection.answer_challenge,
1955 _FakeConnection(), b'abc')
1956
Jesse Noller7152f6d2009-04-02 05:17:26 +00001957#
1958# Test Manager.start()/Pool.__init__() initializer feature - see issue 5585
1959#
1960
1961def initializer(ns):
1962 ns.test += 1
1963
1964class TestInitializers(unittest.TestCase):
1965 def setUp(self):
1966 self.mgr = multiprocessing.Manager()
1967 self.ns = self.mgr.Namespace()
1968 self.ns.test = 0
1969
1970 def tearDown(self):
1971 self.mgr.shutdown()
1972
1973 def test_manager_initializer(self):
1974 m = multiprocessing.managers.SyncManager()
1975 self.assertRaises(TypeError, m.start, 1)
1976 m.start(initializer, (self.ns,))
1977 self.assertEqual(self.ns.test, 1)
1978 m.shutdown()
1979
1980 def test_pool_initializer(self):
1981 self.assertRaises(TypeError, multiprocessing.Pool, initializer=1)
1982 p = multiprocessing.Pool(1, initializer, (self.ns,))
1983 p.close()
1984 p.join()
1985 self.assertEqual(self.ns.test, 1)
1986
Jesse Noller1b90efb2009-06-30 17:11:52 +00001987#
1988# Issue 5155, 5313, 5331: Test process in processes
1989# Verifies os.close(sys.stdin.fileno) vs. sys.stdin.close() behavior
1990#
1991
1992def _ThisSubProcess(q):
1993 try:
1994 item = q.get(block=False)
1995 except Queue.Empty:
1996 pass
1997
1998def _TestProcess(q):
1999 queue = multiprocessing.Queue()
2000 subProc = multiprocessing.Process(target=_ThisSubProcess, args=(queue,))
2001 subProc.start()
2002 subProc.join()
2003
2004def _afunc(x):
2005 return x*x
2006
2007def pool_in_process():
2008 pool = multiprocessing.Pool(processes=4)
2009 x = pool.map(_afunc, [1, 2, 3, 4, 5, 6, 7])
2010
2011class _file_like(object):
2012 def __init__(self, delegate):
2013 self._delegate = delegate
2014 self._pid = None
2015
2016 @property
2017 def cache(self):
2018 pid = os.getpid()
2019 # There are no race conditions since fork keeps only the running thread
2020 if pid != self._pid:
2021 self._pid = pid
2022 self._cache = []
2023 return self._cache
2024
2025 def write(self, data):
2026 self.cache.append(data)
2027
2028 def flush(self):
2029 self._delegate.write(''.join(self.cache))
2030 self._cache = []
2031
2032class TestStdinBadfiledescriptor(unittest.TestCase):
2033
2034 def test_queue_in_process(self):
2035 queue = multiprocessing.Queue()
2036 proc = multiprocessing.Process(target=_TestProcess, args=(queue,))
2037 proc.start()
2038 proc.join()
2039
2040 def test_pool_in_process(self):
2041 p = multiprocessing.Process(target=pool_in_process)
2042 p.start()
2043 p.join()
2044
2045 def test_flushing(self):
2046 sio = StringIO()
2047 flike = _file_like(sio)
2048 flike.write('foo')
2049 proc = multiprocessing.Process(target=lambda: flike.flush())
2050 flike.flush()
2051 assert sio.getvalue() == 'foo'
2052
2053testcases_other = [OtherTest, TestInvalidHandle, TestInitializers,
2054 TestStdinBadfiledescriptor]
Neal Norwitz0c519b32008-08-25 01:50:24 +00002055
Benjamin Petersondfd79492008-06-13 19:13:39 +00002056#
2057#
2058#
2059
2060def test_main(run=None):
Jesse Noller18623822008-06-18 13:29:52 +00002061 if sys.platform.startswith("linux"):
2062 try:
2063 lock = multiprocessing.RLock()
2064 except OSError:
Benjamin Petersonbec087f2009-03-26 21:10:30 +00002065 raise unittest.SkipTest("OSError raises on RLock creation, see issue 3111!")
Benjamin Petersoned77f2e2008-06-17 22:40:44 +00002066
Benjamin Petersondfd79492008-06-13 19:13:39 +00002067 if run is None:
2068 from test.test_support import run_unittest as run
2069
2070 util.get_temp_dir() # creates temp directory for use by all processes
2071
2072 multiprocessing.get_logger().setLevel(LOG_LEVEL)
2073
Jesse Noller146b7ab2008-07-02 16:44:09 +00002074 ProcessesMixin.pool = multiprocessing.Pool(4)
2075 ThreadsMixin.pool = multiprocessing.dummy.Pool(4)
2076 ManagerMixin.manager.__init__()
2077 ManagerMixin.manager.start()
2078 ManagerMixin.pool = ManagerMixin.manager.Pool(4)
Benjamin Petersondfd79492008-06-13 19:13:39 +00002079
2080 testcases = (
Jesse Noller146b7ab2008-07-02 16:44:09 +00002081 sorted(testcases_processes.values(), key=lambda tc:tc.__name__) +
2082 sorted(testcases_threads.values(), key=lambda tc:tc.__name__) +
Neal Norwitz0c519b32008-08-25 01:50:24 +00002083 sorted(testcases_manager.values(), key=lambda tc:tc.__name__) +
2084 testcases_other
Benjamin Petersondfd79492008-06-13 19:13:39 +00002085 )
2086
2087 loadTestsFromTestCase = unittest.defaultTestLoader.loadTestsFromTestCase
2088 suite = unittest.TestSuite(loadTestsFromTestCase(tc) for tc in testcases)
Nick Coghlan13623662010-04-10 14:24:36 +00002089 # (ncoghlan): Whether or not sys.exc_clear is executed by the threading
2090 # module during these tests is at least platform dependent and possibly
Nick Coghlan14459d52010-04-10 15:01:54 +00002091 # non-deterministic on any given platform. So we don't mind if the listed
Nick Coghlan13623662010-04-10 14:24:36 +00002092 # warnings aren't actually raised.
Florent Xicluna07627882010-03-21 01:14:24 +00002093 with test_support.check_py3k_warnings(
Nick Coghlan13623662010-04-10 14:24:36 +00002094 (".+__(get|set)slice__ has been removed", DeprecationWarning),
2095 (r"sys.exc_clear\(\) not supported", DeprecationWarning),
2096 quiet=True):
Florent Xicluna07627882010-03-21 01:14:24 +00002097 run(suite)
Benjamin Petersondfd79492008-06-13 19:13:39 +00002098
Jesse Noller146b7ab2008-07-02 16:44:09 +00002099 ThreadsMixin.pool.terminate()
2100 ProcessesMixin.pool.terminate()
2101 ManagerMixin.pool.terminate()
2102 ManagerMixin.manager.shutdown()
Benjamin Petersondfd79492008-06-13 19:13:39 +00002103
Jesse Noller146b7ab2008-07-02 16:44:09 +00002104 del ProcessesMixin.pool, ThreadsMixin.pool, ManagerMixin.pool
Benjamin Petersondfd79492008-06-13 19:13:39 +00002105
2106def main():
2107 test_main(unittest.TextTestRunner(verbosity=2).run)
2108
2109if __name__ == '__main__':
2110 main()