blob: 0df2f7803bbeaa94f7491d54dde8468361a56909 [file] [log] [blame]
Jesse Noller76cf55f2008-07-02 16:56:51 +00001#!/usr/bin/env python
2
Benjamin Petersondfd79492008-06-13 19:13:39 +00003#
4# Unit tests for the multiprocessing package
5#
6
7import unittest
Benjamin Petersondfd79492008-06-13 19:13:39 +00008import Queue
9import time
10import sys
11import os
12import gc
13import signal
14import array
Benjamin Petersondfd79492008-06-13 19:13:39 +000015import socket
16import random
17import logging
Mark Dickinsonc4920e82009-11-20 19:30:22 +000018from test import test_support
Jesse Noller1b90efb2009-06-30 17:11:52 +000019from StringIO import StringIO
R. David Murray3db8a342009-03-30 23:05:48 +000020_multiprocessing = test_support.import_module('_multiprocessing')
Ezio Melottic2077b02011-03-16 12:34:31 +020021# import threading after _multiprocessing to raise a more relevant error
Victor Stinner613b4cf2010-04-27 21:56:26 +000022# message: "No module named _multiprocessing". _multiprocessing is not compiled
23# without thread support.
24import threading
R. David Murray3db8a342009-03-30 23:05:48 +000025
Jesse Noller37040cd2008-09-30 00:15:45 +000026# Work around broken sem_open implementations
R. David Murray3db8a342009-03-30 23:05:48 +000027test_support.import_module('multiprocessing.synchronize')
Jesse Noller37040cd2008-09-30 00:15:45 +000028
Benjamin Petersondfd79492008-06-13 19:13:39 +000029import multiprocessing.dummy
30import multiprocessing.connection
31import multiprocessing.managers
32import multiprocessing.heap
Benjamin Petersondfd79492008-06-13 19:13:39 +000033import multiprocessing.pool
Benjamin Petersondfd79492008-06-13 19:13:39 +000034
35from multiprocessing import util
36
Brian Curtina06e9b82010-10-07 02:27:41 +000037try:
38 from multiprocessing.sharedctypes import Value, copy
39 HAS_SHAREDCTYPES = True
40except ImportError:
41 HAS_SHAREDCTYPES = False
42
Benjamin Petersondfd79492008-06-13 19:13:39 +000043#
44#
45#
46
Benjamin Petersone79edf52008-07-13 18:34:58 +000047latin = str
Benjamin Petersondfd79492008-06-13 19:13:39 +000048
Benjamin Petersondfd79492008-06-13 19:13:39 +000049#
50# Constants
51#
52
53LOG_LEVEL = util.SUBWARNING
Jesse Noller654ade32010-01-27 03:05:57 +000054#LOG_LEVEL = logging.DEBUG
Benjamin Petersondfd79492008-06-13 19:13:39 +000055
56DELTA = 0.1
57CHECK_TIMINGS = False # making true makes tests take a lot longer
58 # and can sometimes cause some non-serious
59 # failures because some calls block a bit
60 # longer than expected
61if CHECK_TIMINGS:
62 TIMEOUT1, TIMEOUT2, TIMEOUT3 = 0.82, 0.35, 1.4
63else:
64 TIMEOUT1, TIMEOUT2, TIMEOUT3 = 0.1, 0.1, 0.1
65
66HAVE_GETVALUE = not getattr(_multiprocessing,
67 'HAVE_BROKEN_SEM_GETVALUE', False)
68
Jesse Noller9a5b2ad2009-01-19 15:12:22 +000069WIN32 = (sys.platform == "win32")
70
Benjamin Petersondfd79492008-06-13 19:13:39 +000071#
Florent Xicluna36b9fbb2010-03-24 19:33:25 +000072# Some tests require ctypes
73#
74
75try:
Nick Coghlan13623662010-04-10 14:24:36 +000076 from ctypes import Structure, c_int, c_double
Florent Xicluna36b9fbb2010-03-24 19:33:25 +000077except ImportError:
78 Structure = object
79 c_int = c_double = None
80
81#
Benjamin Petersondfd79492008-06-13 19:13:39 +000082# Creates a wrapper for a function which records the time it takes to finish
83#
84
85class TimingWrapper(object):
86
87 def __init__(self, func):
88 self.func = func
89 self.elapsed = None
90
91 def __call__(self, *args, **kwds):
92 t = time.time()
93 try:
94 return self.func(*args, **kwds)
95 finally:
96 self.elapsed = time.time() - t
97
98#
99# Base class for test cases
100#
101
102class BaseTestCase(object):
103
104 ALLOWED_TYPES = ('processes', 'manager', 'threads')
105
106 def assertTimingAlmostEqual(self, a, b):
107 if CHECK_TIMINGS:
108 self.assertAlmostEqual(a, b, 1)
109
110 def assertReturnsIfImplemented(self, value, func, *args):
111 try:
112 res = func(*args)
113 except NotImplementedError:
114 pass
115 else:
116 return self.assertEqual(value, res)
117
Antoine Pitrou4eb2b282010-11-02 23:51:30 +0000118 # For the sanity of Windows users, rather than crashing or freezing in
119 # multiple ways.
120 def __reduce__(self, *args):
121 raise NotImplementedError("shouldn't try to pickle a test case")
122
123 __reduce_ex__ = __reduce__
124
Benjamin Petersondfd79492008-06-13 19:13:39 +0000125#
126# Return the value of a semaphore
127#
128
129def get_value(self):
130 try:
131 return self.get_value()
132 except AttributeError:
133 try:
134 return self._Semaphore__value
135 except AttributeError:
136 try:
137 return self._value
138 except AttributeError:
139 raise NotImplementedError
140
141#
142# Testcases
143#
144
145class _TestProcess(BaseTestCase):
146
147 ALLOWED_TYPES = ('processes', 'threads')
148
149 def test_current(self):
150 if self.TYPE == 'threads':
151 return
152
153 current = self.current_process()
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000154 authkey = current.authkey
Benjamin Petersondfd79492008-06-13 19:13:39 +0000155
156 self.assertTrue(current.is_alive())
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000157 self.assertTrue(not current.daemon)
Ezio Melottib0f5adc2010-01-24 16:58:36 +0000158 self.assertIsInstance(authkey, bytes)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000159 self.assertTrue(len(authkey) > 0)
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000160 self.assertEqual(current.ident, os.getpid())
161 self.assertEqual(current.exitcode, None)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000162
Antoine Pitrou4eb2b282010-11-02 23:51:30 +0000163 @classmethod
164 def _test(cls, q, *args, **kwds):
165 current = cls.current_process()
Benjamin Petersondfd79492008-06-13 19:13:39 +0000166 q.put(args)
167 q.put(kwds)
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000168 q.put(current.name)
Antoine Pitrou4eb2b282010-11-02 23:51:30 +0000169 if cls.TYPE != 'threads':
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000170 q.put(bytes(current.authkey))
Benjamin Petersondfd79492008-06-13 19:13:39 +0000171 q.put(current.pid)
172
173 def test_process(self):
174 q = self.Queue(1)
175 e = self.Event()
176 args = (q, 1, 2)
177 kwargs = {'hello':23, 'bye':2.54}
178 name = 'SomeProcess'
179 p = self.Process(
180 target=self._test, args=args, kwargs=kwargs, name=name
181 )
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000182 p.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +0000183 current = self.current_process()
184
185 if self.TYPE != 'threads':
Ezio Melotti2623a372010-11-21 13:34:58 +0000186 self.assertEqual(p.authkey, current.authkey)
187 self.assertEqual(p.is_alive(), False)
188 self.assertEqual(p.daemon, True)
Ezio Melottiaa980582010-01-23 23:04:36 +0000189 self.assertNotIn(p, self.active_children())
Benjamin Petersondfd79492008-06-13 19:13:39 +0000190 self.assertTrue(type(self.active_children()) is list)
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000191 self.assertEqual(p.exitcode, None)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000192
193 p.start()
194
Ezio Melotti2623a372010-11-21 13:34:58 +0000195 self.assertEqual(p.exitcode, None)
196 self.assertEqual(p.is_alive(), True)
Ezio Melottiaa980582010-01-23 23:04:36 +0000197 self.assertIn(p, self.active_children())
Benjamin Petersondfd79492008-06-13 19:13:39 +0000198
Ezio Melotti2623a372010-11-21 13:34:58 +0000199 self.assertEqual(q.get(), args[1:])
200 self.assertEqual(q.get(), kwargs)
201 self.assertEqual(q.get(), p.name)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000202 if self.TYPE != 'threads':
Ezio Melotti2623a372010-11-21 13:34:58 +0000203 self.assertEqual(q.get(), current.authkey)
204 self.assertEqual(q.get(), p.pid)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000205
206 p.join()
207
Ezio Melotti2623a372010-11-21 13:34:58 +0000208 self.assertEqual(p.exitcode, 0)
209 self.assertEqual(p.is_alive(), False)
Ezio Melottiaa980582010-01-23 23:04:36 +0000210 self.assertNotIn(p, self.active_children())
Benjamin Petersondfd79492008-06-13 19:13:39 +0000211
Antoine Pitrou4eb2b282010-11-02 23:51:30 +0000212 @classmethod
213 def _test_terminate(cls):
Benjamin Petersondfd79492008-06-13 19:13:39 +0000214 time.sleep(1000)
215
216 def test_terminate(self):
217 if self.TYPE == 'threads':
218 return
219
220 p = self.Process(target=self._test_terminate)
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000221 p.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +0000222 p.start()
223
224 self.assertEqual(p.is_alive(), True)
Ezio Melottiaa980582010-01-23 23:04:36 +0000225 self.assertIn(p, self.active_children())
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000226 self.assertEqual(p.exitcode, None)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000227
228 p.terminate()
229
230 join = TimingWrapper(p.join)
231 self.assertEqual(join(), None)
232 self.assertTimingAlmostEqual(join.elapsed, 0.0)
233
234 self.assertEqual(p.is_alive(), False)
Ezio Melottiaa980582010-01-23 23:04:36 +0000235 self.assertNotIn(p, self.active_children())
Benjamin Petersondfd79492008-06-13 19:13:39 +0000236
237 p.join()
238
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000239 # XXX sometimes get p.exitcode == 0 on Windows ...
240 #self.assertEqual(p.exitcode, -signal.SIGTERM)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000241
242 def test_cpu_count(self):
243 try:
244 cpus = multiprocessing.cpu_count()
245 except NotImplementedError:
246 cpus = 1
247 self.assertTrue(type(cpus) is int)
248 self.assertTrue(cpus >= 1)
249
250 def test_active_children(self):
251 self.assertEqual(type(self.active_children()), list)
252
253 p = self.Process(target=time.sleep, args=(DELTA,))
Ezio Melottiaa980582010-01-23 23:04:36 +0000254 self.assertNotIn(p, self.active_children())
Benjamin Petersondfd79492008-06-13 19:13:39 +0000255
256 p.start()
Ezio Melottiaa980582010-01-23 23:04:36 +0000257 self.assertIn(p, self.active_children())
Benjamin Petersondfd79492008-06-13 19:13:39 +0000258
259 p.join()
Ezio Melottiaa980582010-01-23 23:04:36 +0000260 self.assertNotIn(p, self.active_children())
Benjamin Petersondfd79492008-06-13 19:13:39 +0000261
Antoine Pitrou4eb2b282010-11-02 23:51:30 +0000262 @classmethod
263 def _test_recursion(cls, wconn, id):
Benjamin Petersondfd79492008-06-13 19:13:39 +0000264 from multiprocessing import forking
265 wconn.send(id)
266 if len(id) < 2:
267 for i in range(2):
Antoine Pitrou4eb2b282010-11-02 23:51:30 +0000268 p = cls.Process(
269 target=cls._test_recursion, args=(wconn, id+[i])
Benjamin Petersondfd79492008-06-13 19:13:39 +0000270 )
271 p.start()
272 p.join()
273
274 def test_recursion(self):
275 rconn, wconn = self.Pipe(duplex=False)
276 self._test_recursion(wconn, [])
277
278 time.sleep(DELTA)
279 result = []
280 while rconn.poll():
281 result.append(rconn.recv())
282
283 expected = [
284 [],
285 [0],
286 [0, 0],
287 [0, 1],
288 [1],
289 [1, 0],
290 [1, 1]
291 ]
292 self.assertEqual(result, expected)
293
294#
295#
296#
297
298class _UpperCaser(multiprocessing.Process):
299
300 def __init__(self):
301 multiprocessing.Process.__init__(self)
302 self.child_conn, self.parent_conn = multiprocessing.Pipe()
303
304 def run(self):
305 self.parent_conn.close()
306 for s in iter(self.child_conn.recv, None):
307 self.child_conn.send(s.upper())
308 self.child_conn.close()
309
310 def submit(self, s):
311 assert type(s) is str
312 self.parent_conn.send(s)
313 return self.parent_conn.recv()
314
315 def stop(self):
316 self.parent_conn.send(None)
317 self.parent_conn.close()
318 self.child_conn.close()
319
320class _TestSubclassingProcess(BaseTestCase):
321
322 ALLOWED_TYPES = ('processes',)
323
324 def test_subclassing(self):
325 uppercaser = _UpperCaser()
326 uppercaser.start()
327 self.assertEqual(uppercaser.submit('hello'), 'HELLO')
328 self.assertEqual(uppercaser.submit('world'), 'WORLD')
329 uppercaser.stop()
330 uppercaser.join()
331
332#
333#
334#
335
336def queue_empty(q):
337 if hasattr(q, 'empty'):
338 return q.empty()
339 else:
340 return q.qsize() == 0
341
342def queue_full(q, maxsize):
343 if hasattr(q, 'full'):
344 return q.full()
345 else:
346 return q.qsize() == maxsize
347
348
349class _TestQueue(BaseTestCase):
350
351
Antoine Pitrou4eb2b282010-11-02 23:51:30 +0000352 @classmethod
353 def _test_put(cls, queue, child_can_start, parent_can_continue):
Benjamin Petersondfd79492008-06-13 19:13:39 +0000354 child_can_start.wait()
355 for i in range(6):
356 queue.get()
357 parent_can_continue.set()
358
359 def test_put(self):
360 MAXSIZE = 6
361 queue = self.Queue(maxsize=MAXSIZE)
362 child_can_start = self.Event()
363 parent_can_continue = self.Event()
364
365 proc = self.Process(
366 target=self._test_put,
367 args=(queue, child_can_start, parent_can_continue)
368 )
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000369 proc.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +0000370 proc.start()
371
372 self.assertEqual(queue_empty(queue), True)
373 self.assertEqual(queue_full(queue, MAXSIZE), False)
374
375 queue.put(1)
376 queue.put(2, True)
377 queue.put(3, True, None)
378 queue.put(4, False)
379 queue.put(5, False, None)
380 queue.put_nowait(6)
381
382 # the values may be in buffer but not yet in pipe so sleep a bit
383 time.sleep(DELTA)
384
385 self.assertEqual(queue_empty(queue), False)
386 self.assertEqual(queue_full(queue, MAXSIZE), True)
387
388 put = TimingWrapper(queue.put)
389 put_nowait = TimingWrapper(queue.put_nowait)
390
391 self.assertRaises(Queue.Full, put, 7, False)
392 self.assertTimingAlmostEqual(put.elapsed, 0)
393
394 self.assertRaises(Queue.Full, put, 7, False, None)
395 self.assertTimingAlmostEqual(put.elapsed, 0)
396
397 self.assertRaises(Queue.Full, put_nowait, 7)
398 self.assertTimingAlmostEqual(put_nowait.elapsed, 0)
399
400 self.assertRaises(Queue.Full, put, 7, True, TIMEOUT1)
401 self.assertTimingAlmostEqual(put.elapsed, TIMEOUT1)
402
403 self.assertRaises(Queue.Full, put, 7, False, TIMEOUT2)
404 self.assertTimingAlmostEqual(put.elapsed, 0)
405
406 self.assertRaises(Queue.Full, put, 7, True, timeout=TIMEOUT3)
407 self.assertTimingAlmostEqual(put.elapsed, TIMEOUT3)
408
409 child_can_start.set()
410 parent_can_continue.wait()
411
412 self.assertEqual(queue_empty(queue), True)
413 self.assertEqual(queue_full(queue, MAXSIZE), False)
414
415 proc.join()
416
Antoine Pitrou4eb2b282010-11-02 23:51:30 +0000417 @classmethod
418 def _test_get(cls, queue, child_can_start, parent_can_continue):
Benjamin Petersondfd79492008-06-13 19:13:39 +0000419 child_can_start.wait()
Benjamin Petersonda3a1b12008-06-16 20:52:48 +0000420 #queue.put(1)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000421 queue.put(2)
422 queue.put(3)
423 queue.put(4)
424 queue.put(5)
425 parent_can_continue.set()
426
427 def test_get(self):
428 queue = self.Queue()
429 child_can_start = self.Event()
430 parent_can_continue = self.Event()
431
432 proc = self.Process(
433 target=self._test_get,
434 args=(queue, child_can_start, parent_can_continue)
435 )
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000436 proc.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +0000437 proc.start()
438
439 self.assertEqual(queue_empty(queue), True)
440
441 child_can_start.set()
442 parent_can_continue.wait()
443
444 time.sleep(DELTA)
445 self.assertEqual(queue_empty(queue), False)
446
Benjamin Petersonda3a1b12008-06-16 20:52:48 +0000447 # Hangs unexpectedly, remove for now
448 #self.assertEqual(queue.get(), 1)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000449 self.assertEqual(queue.get(True, None), 2)
450 self.assertEqual(queue.get(True), 3)
451 self.assertEqual(queue.get(timeout=1), 4)
452 self.assertEqual(queue.get_nowait(), 5)
453
454 self.assertEqual(queue_empty(queue), True)
455
456 get = TimingWrapper(queue.get)
457 get_nowait = TimingWrapper(queue.get_nowait)
458
459 self.assertRaises(Queue.Empty, get, False)
460 self.assertTimingAlmostEqual(get.elapsed, 0)
461
462 self.assertRaises(Queue.Empty, get, False, None)
463 self.assertTimingAlmostEqual(get.elapsed, 0)
464
465 self.assertRaises(Queue.Empty, get_nowait)
466 self.assertTimingAlmostEqual(get_nowait.elapsed, 0)
467
468 self.assertRaises(Queue.Empty, get, True, TIMEOUT1)
469 self.assertTimingAlmostEqual(get.elapsed, TIMEOUT1)
470
471 self.assertRaises(Queue.Empty, get, False, TIMEOUT2)
472 self.assertTimingAlmostEqual(get.elapsed, 0)
473
474 self.assertRaises(Queue.Empty, get, timeout=TIMEOUT3)
475 self.assertTimingAlmostEqual(get.elapsed, TIMEOUT3)
476
477 proc.join()
478
Antoine Pitrou4eb2b282010-11-02 23:51:30 +0000479 @classmethod
480 def _test_fork(cls, queue):
Benjamin Petersondfd79492008-06-13 19:13:39 +0000481 for i in range(10, 20):
482 queue.put(i)
483 # note that at this point the items may only be buffered, so the
484 # process cannot shutdown until the feeder thread has finished
485 # pushing items onto the pipe.
486
487 def test_fork(self):
488 # Old versions of Queue would fail to create a new feeder
489 # thread for a forked process if the original process had its
490 # own feeder thread. This test checks that this no longer
491 # happens.
492
493 queue = self.Queue()
494
495 # put items on queue so that main process starts a feeder thread
496 for i in range(10):
497 queue.put(i)
498
499 # wait to make sure thread starts before we fork a new process
500 time.sleep(DELTA)
501
502 # fork process
503 p = self.Process(target=self._test_fork, args=(queue,))
504 p.start()
505
506 # check that all expected items are in the queue
507 for i in range(20):
508 self.assertEqual(queue.get(), i)
509 self.assertRaises(Queue.Empty, queue.get, False)
510
511 p.join()
512
513 def test_qsize(self):
514 q = self.Queue()
515 try:
516 self.assertEqual(q.qsize(), 0)
517 except NotImplementedError:
518 return
519 q.put(1)
520 self.assertEqual(q.qsize(), 1)
521 q.put(5)
522 self.assertEqual(q.qsize(), 2)
523 q.get()
524 self.assertEqual(q.qsize(), 1)
525 q.get()
526 self.assertEqual(q.qsize(), 0)
527
Antoine Pitrou4eb2b282010-11-02 23:51:30 +0000528 @classmethod
529 def _test_task_done(cls, q):
Benjamin Petersondfd79492008-06-13 19:13:39 +0000530 for obj in iter(q.get, None):
531 time.sleep(DELTA)
532 q.task_done()
533
534 def test_task_done(self):
535 queue = self.JoinableQueue()
536
537 if sys.version_info < (2, 5) and not hasattr(queue, 'task_done'):
Florent Xicluna36b9fbb2010-03-24 19:33:25 +0000538 self.skipTest("requires 'queue.task_done()' method")
Benjamin Petersondfd79492008-06-13 19:13:39 +0000539
540 workers = [self.Process(target=self._test_task_done, args=(queue,))
541 for i in xrange(4)]
542
543 for p in workers:
544 p.start()
545
546 for i in xrange(10):
547 queue.put(i)
548
549 queue.join()
550
551 for p in workers:
552 queue.put(None)
553
554 for p in workers:
555 p.join()
556
557#
558#
559#
560
561class _TestLock(BaseTestCase):
562
563 def test_lock(self):
564 lock = self.Lock()
565 self.assertEqual(lock.acquire(), True)
566 self.assertEqual(lock.acquire(False), False)
567 self.assertEqual(lock.release(), None)
568 self.assertRaises((ValueError, threading.ThreadError), lock.release)
569
570 def test_rlock(self):
571 lock = self.RLock()
572 self.assertEqual(lock.acquire(), True)
573 self.assertEqual(lock.acquire(), True)
574 self.assertEqual(lock.acquire(), True)
575 self.assertEqual(lock.release(), None)
576 self.assertEqual(lock.release(), None)
577 self.assertEqual(lock.release(), None)
578 self.assertRaises((AssertionError, RuntimeError), lock.release)
579
Jesse Noller82eb5902009-03-30 23:29:31 +0000580 def test_lock_context(self):
581 with self.Lock():
582 pass
583
Benjamin Petersondfd79492008-06-13 19:13:39 +0000584
585class _TestSemaphore(BaseTestCase):
586
587 def _test_semaphore(self, sem):
588 self.assertReturnsIfImplemented(2, get_value, sem)
589 self.assertEqual(sem.acquire(), True)
590 self.assertReturnsIfImplemented(1, get_value, sem)
591 self.assertEqual(sem.acquire(), True)
592 self.assertReturnsIfImplemented(0, get_value, sem)
593 self.assertEqual(sem.acquire(False), False)
594 self.assertReturnsIfImplemented(0, get_value, sem)
595 self.assertEqual(sem.release(), None)
596 self.assertReturnsIfImplemented(1, get_value, sem)
597 self.assertEqual(sem.release(), None)
598 self.assertReturnsIfImplemented(2, get_value, sem)
599
600 def test_semaphore(self):
601 sem = self.Semaphore(2)
602 self._test_semaphore(sem)
603 self.assertEqual(sem.release(), None)
604 self.assertReturnsIfImplemented(3, get_value, sem)
605 self.assertEqual(sem.release(), None)
606 self.assertReturnsIfImplemented(4, get_value, sem)
607
608 def test_bounded_semaphore(self):
609 sem = self.BoundedSemaphore(2)
610 self._test_semaphore(sem)
611 # Currently fails on OS/X
612 #if HAVE_GETVALUE:
613 # self.assertRaises(ValueError, sem.release)
614 # self.assertReturnsIfImplemented(2, get_value, sem)
615
616 def test_timeout(self):
617 if self.TYPE != 'processes':
618 return
619
620 sem = self.Semaphore(0)
621 acquire = TimingWrapper(sem.acquire)
622
623 self.assertEqual(acquire(False), False)
624 self.assertTimingAlmostEqual(acquire.elapsed, 0.0)
625
626 self.assertEqual(acquire(False, None), False)
627 self.assertTimingAlmostEqual(acquire.elapsed, 0.0)
628
629 self.assertEqual(acquire(False, TIMEOUT1), False)
630 self.assertTimingAlmostEqual(acquire.elapsed, 0)
631
632 self.assertEqual(acquire(True, TIMEOUT2), False)
633 self.assertTimingAlmostEqual(acquire.elapsed, TIMEOUT2)
634
635 self.assertEqual(acquire(timeout=TIMEOUT3), False)
636 self.assertTimingAlmostEqual(acquire.elapsed, TIMEOUT3)
637
638
639class _TestCondition(BaseTestCase):
640
Antoine Pitrou4eb2b282010-11-02 23:51:30 +0000641 @classmethod
642 def f(cls, cond, sleeping, woken, timeout=None):
Benjamin Petersondfd79492008-06-13 19:13:39 +0000643 cond.acquire()
644 sleeping.release()
645 cond.wait(timeout)
646 woken.release()
647 cond.release()
648
649 def check_invariant(self, cond):
650 # this is only supposed to succeed when there are no sleepers
651 if self.TYPE == 'processes':
652 try:
653 sleepers = (cond._sleeping_count.get_value() -
654 cond._woken_count.get_value())
655 self.assertEqual(sleepers, 0)
656 self.assertEqual(cond._wait_semaphore.get_value(), 0)
657 except NotImplementedError:
658 pass
659
660 def test_notify(self):
661 cond = self.Condition()
662 sleeping = self.Semaphore(0)
663 woken = self.Semaphore(0)
664
665 p = self.Process(target=self.f, args=(cond, sleeping, woken))
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000666 p.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +0000667 p.start()
668
669 p = threading.Thread(target=self.f, args=(cond, sleeping, woken))
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000670 p.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +0000671 p.start()
672
673 # wait for both children to start sleeping
674 sleeping.acquire()
675 sleeping.acquire()
676
677 # check no process/thread has woken up
678 time.sleep(DELTA)
679 self.assertReturnsIfImplemented(0, get_value, woken)
680
681 # wake up one process/thread
682 cond.acquire()
683 cond.notify()
684 cond.release()
685
686 # check one process/thread has woken up
687 time.sleep(DELTA)
688 self.assertReturnsIfImplemented(1, get_value, woken)
689
690 # wake up another
691 cond.acquire()
692 cond.notify()
693 cond.release()
694
695 # check other has woken up
696 time.sleep(DELTA)
697 self.assertReturnsIfImplemented(2, get_value, woken)
698
699 # check state is not mucked up
700 self.check_invariant(cond)
701 p.join()
702
703 def test_notify_all(self):
704 cond = self.Condition()
705 sleeping = self.Semaphore(0)
706 woken = self.Semaphore(0)
707
708 # start some threads/processes which will timeout
709 for i in range(3):
710 p = self.Process(target=self.f,
711 args=(cond, sleeping, woken, TIMEOUT1))
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000712 p.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +0000713 p.start()
714
715 t = threading.Thread(target=self.f,
716 args=(cond, sleeping, woken, TIMEOUT1))
Benjamin Petersona9b22222008-08-18 18:01:43 +0000717 t.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +0000718 t.start()
719
720 # wait for them all to sleep
721 for i in xrange(6):
722 sleeping.acquire()
723
724 # check they have all timed out
725 for i in xrange(6):
726 woken.acquire()
727 self.assertReturnsIfImplemented(0, get_value, woken)
728
729 # check state is not mucked up
730 self.check_invariant(cond)
731
732 # start some more threads/processes
733 for i in range(3):
734 p = self.Process(target=self.f, args=(cond, sleeping, woken))
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000735 p.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +0000736 p.start()
737
738 t = threading.Thread(target=self.f, args=(cond, sleeping, woken))
Benjamin Petersona9b22222008-08-18 18:01:43 +0000739 t.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +0000740 t.start()
741
742 # wait for them to all sleep
743 for i in xrange(6):
744 sleeping.acquire()
745
746 # check no process/thread has woken up
747 time.sleep(DELTA)
748 self.assertReturnsIfImplemented(0, get_value, woken)
749
750 # wake them all up
751 cond.acquire()
752 cond.notify_all()
753 cond.release()
754
755 # check they have all woken
756 time.sleep(DELTA)
757 self.assertReturnsIfImplemented(6, get_value, woken)
758
759 # check state is not mucked up
760 self.check_invariant(cond)
761
762 def test_timeout(self):
763 cond = self.Condition()
764 wait = TimingWrapper(cond.wait)
765 cond.acquire()
766 res = wait(TIMEOUT1)
767 cond.release()
768 self.assertEqual(res, None)
769 self.assertTimingAlmostEqual(wait.elapsed, TIMEOUT1)
770
771
772class _TestEvent(BaseTestCase):
773
Antoine Pitrou4eb2b282010-11-02 23:51:30 +0000774 @classmethod
775 def _test_event(cls, event):
Benjamin Petersondfd79492008-06-13 19:13:39 +0000776 time.sleep(TIMEOUT2)
777 event.set()
778
779 def test_event(self):
780 event = self.Event()
781 wait = TimingWrapper(event.wait)
782
Ezio Melottic2077b02011-03-16 12:34:31 +0200783 # Removed temporarily, due to API shear, this does not
Benjamin Petersondfd79492008-06-13 19:13:39 +0000784 # work with threading._Event objects. is_set == isSet
Jesse Noller02cb0eb2009-04-01 03:45:50 +0000785 self.assertEqual(event.is_set(), False)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000786
Jesse Noller02cb0eb2009-04-01 03:45:50 +0000787 # Removed, threading.Event.wait() will return the value of the __flag
788 # instead of None. API Shear with the semaphore backed mp.Event
789 self.assertEqual(wait(0.0), False)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000790 self.assertTimingAlmostEqual(wait.elapsed, 0.0)
Jesse Noller02cb0eb2009-04-01 03:45:50 +0000791 self.assertEqual(wait(TIMEOUT1), False)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000792 self.assertTimingAlmostEqual(wait.elapsed, TIMEOUT1)
793
794 event.set()
795
796 # See note above on the API differences
Jesse Noller02cb0eb2009-04-01 03:45:50 +0000797 self.assertEqual(event.is_set(), True)
798 self.assertEqual(wait(), True)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000799 self.assertTimingAlmostEqual(wait.elapsed, 0.0)
Jesse Noller02cb0eb2009-04-01 03:45:50 +0000800 self.assertEqual(wait(TIMEOUT1), True)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000801 self.assertTimingAlmostEqual(wait.elapsed, 0.0)
802 # self.assertEqual(event.is_set(), True)
803
804 event.clear()
805
806 #self.assertEqual(event.is_set(), False)
807
808 self.Process(target=self._test_event, args=(event,)).start()
Jesse Noller02cb0eb2009-04-01 03:45:50 +0000809 self.assertEqual(wait(), True)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000810
811#
812#
813#
814
815class _TestValue(BaseTestCase):
816
Florent Xicluna36b9fbb2010-03-24 19:33:25 +0000817 ALLOWED_TYPES = ('processes',)
818
Benjamin Petersondfd79492008-06-13 19:13:39 +0000819 codes_values = [
820 ('i', 4343, 24234),
821 ('d', 3.625, -4.25),
822 ('h', -232, 234),
823 ('c', latin('x'), latin('y'))
824 ]
825
Antoine Pitrou55d935a2010-11-22 16:35:57 +0000826 def setUp(self):
827 if not HAS_SHAREDCTYPES:
828 self.skipTest("requires multiprocessing.sharedctypes")
829
Antoine Pitrou4eb2b282010-11-02 23:51:30 +0000830 @classmethod
831 def _test(cls, values):
832 for sv, cv in zip(values, cls.codes_values):
Benjamin Petersondfd79492008-06-13 19:13:39 +0000833 sv.value = cv[2]
834
835
836 def test_value(self, raw=False):
Benjamin Petersondfd79492008-06-13 19:13:39 +0000837 if raw:
838 values = [self.RawValue(code, value)
839 for code, value, _ in self.codes_values]
840 else:
841 values = [self.Value(code, value)
842 for code, value, _ in self.codes_values]
843
844 for sv, cv in zip(values, self.codes_values):
845 self.assertEqual(sv.value, cv[1])
846
847 proc = self.Process(target=self._test, args=(values,))
848 proc.start()
849 proc.join()
850
851 for sv, cv in zip(values, self.codes_values):
852 self.assertEqual(sv.value, cv[2])
853
854 def test_rawvalue(self):
855 self.test_value(raw=True)
856
857 def test_getobj_getlock(self):
Benjamin Petersondfd79492008-06-13 19:13:39 +0000858 val1 = self.Value('i', 5)
859 lock1 = val1.get_lock()
860 obj1 = val1.get_obj()
861
862 val2 = self.Value('i', 5, lock=None)
863 lock2 = val2.get_lock()
864 obj2 = val2.get_obj()
865
866 lock = self.Lock()
867 val3 = self.Value('i', 5, lock=lock)
868 lock3 = val3.get_lock()
869 obj3 = val3.get_obj()
870 self.assertEqual(lock, lock3)
871
Jesse Noller6ab22152009-01-18 02:45:38 +0000872 arr4 = self.Value('i', 5, lock=False)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000873 self.assertFalse(hasattr(arr4, 'get_lock'))
874 self.assertFalse(hasattr(arr4, 'get_obj'))
875
Jesse Noller6ab22152009-01-18 02:45:38 +0000876 self.assertRaises(AttributeError, self.Value, 'i', 5, lock='navalue')
877
878 arr5 = self.RawValue('i', 5)
879 self.assertFalse(hasattr(arr5, 'get_lock'))
880 self.assertFalse(hasattr(arr5, 'get_obj'))
881
Benjamin Petersondfd79492008-06-13 19:13:39 +0000882
883class _TestArray(BaseTestCase):
884
Florent Xicluna36b9fbb2010-03-24 19:33:25 +0000885 ALLOWED_TYPES = ('processes',)
886
Antoine Pitrou4eb2b282010-11-02 23:51:30 +0000887 @classmethod
888 def f(cls, seq):
Benjamin Petersondfd79492008-06-13 19:13:39 +0000889 for i in range(1, len(seq)):
890 seq[i] += seq[i-1]
891
Florent Xicluna36b9fbb2010-03-24 19:33:25 +0000892 @unittest.skipIf(c_int is None, "requires _ctypes")
Benjamin Petersondfd79492008-06-13 19:13:39 +0000893 def test_array(self, raw=False):
Benjamin Petersondfd79492008-06-13 19:13:39 +0000894 seq = [680, 626, 934, 821, 150, 233, 548, 982, 714, 831]
895 if raw:
896 arr = self.RawArray('i', seq)
897 else:
898 arr = self.Array('i', seq)
899
900 self.assertEqual(len(arr), len(seq))
901 self.assertEqual(arr[3], seq[3])
902 self.assertEqual(list(arr[2:7]), list(seq[2:7]))
903
904 arr[4:8] = seq[4:8] = array.array('i', [1, 2, 3, 4])
905
906 self.assertEqual(list(arr[:]), seq)
907
908 self.f(seq)
909
910 p = self.Process(target=self.f, args=(arr,))
911 p.start()
912 p.join()
913
914 self.assertEqual(list(arr[:]), seq)
915
Florent Xicluna36b9fbb2010-03-24 19:33:25 +0000916 @unittest.skipIf(c_int is None, "requires _ctypes")
Mark Dickinsond3cb2f62011-03-26 10:02:37 +0000917 def test_array_from_size(self):
918 size = 10
919 # Test for zeroing (see issue #11675).
920 # The repetition below strengthens the test by increasing the chances
921 # of previously allocated non-zero memory being used for the new array
922 # on the 2nd and 3rd loops.
923 for _ in range(3):
924 arr = self.Array('i', size)
925 self.assertEqual(len(arr), size)
926 self.assertEqual(list(arr), [0] * size)
927 arr[:] = range(10)
928 self.assertEqual(list(arr), range(10))
929 del arr
930
931 @unittest.skipIf(c_int is None, "requires _ctypes")
Benjamin Petersondfd79492008-06-13 19:13:39 +0000932 def test_rawarray(self):
933 self.test_array(raw=True)
934
Florent Xicluna36b9fbb2010-03-24 19:33:25 +0000935 @unittest.skipIf(c_int is None, "requires _ctypes")
Mark Dickinsonf9e9a6f2011-03-25 22:01:06 +0000936 def test_array_accepts_long(self):
937 arr = self.Array('i', 10L)
938 self.assertEqual(len(arr), 10)
939 raw_arr = self.RawArray('i', 10L)
940 self.assertEqual(len(raw_arr), 10)
941
942 @unittest.skipIf(c_int is None, "requires _ctypes")
Benjamin Petersondfd79492008-06-13 19:13:39 +0000943 def test_getobj_getlock_obj(self):
Benjamin Petersondfd79492008-06-13 19:13:39 +0000944 arr1 = self.Array('i', range(10))
945 lock1 = arr1.get_lock()
946 obj1 = arr1.get_obj()
947
948 arr2 = self.Array('i', range(10), lock=None)
949 lock2 = arr2.get_lock()
950 obj2 = arr2.get_obj()
951
952 lock = self.Lock()
953 arr3 = self.Array('i', range(10), lock=lock)
954 lock3 = arr3.get_lock()
955 obj3 = arr3.get_obj()
956 self.assertEqual(lock, lock3)
957
Jesse Noller6ab22152009-01-18 02:45:38 +0000958 arr4 = self.Array('i', range(10), lock=False)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000959 self.assertFalse(hasattr(arr4, 'get_lock'))
960 self.assertFalse(hasattr(arr4, 'get_obj'))
Jesse Noller6ab22152009-01-18 02:45:38 +0000961 self.assertRaises(AttributeError,
962 self.Array, 'i', range(10), lock='notalock')
963
964 arr5 = self.RawArray('i', range(10))
965 self.assertFalse(hasattr(arr5, 'get_lock'))
966 self.assertFalse(hasattr(arr5, 'get_obj'))
Benjamin Petersondfd79492008-06-13 19:13:39 +0000967
968#
969#
970#
971
972class _TestContainers(BaseTestCase):
973
974 ALLOWED_TYPES = ('manager',)
975
976 def test_list(self):
977 a = self.list(range(10))
978 self.assertEqual(a[:], range(10))
979
980 b = self.list()
981 self.assertEqual(b[:], [])
982
983 b.extend(range(5))
984 self.assertEqual(b[:], range(5))
985
986 self.assertEqual(b[2], 2)
987 self.assertEqual(b[2:10], [2,3,4])
988
989 b *= 2
990 self.assertEqual(b[:], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4])
991
992 self.assertEqual(b + [5, 6], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4, 5, 6])
993
994 self.assertEqual(a[:], range(10))
995
996 d = [a, b]
997 e = self.list(d)
998 self.assertEqual(
999 e[:],
1000 [[0, 1, 2, 3, 4, 5, 6, 7, 8, 9], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4]]
1001 )
1002
1003 f = self.list([a])
1004 a.append('hello')
1005 self.assertEqual(f[:], [[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 'hello']])
1006
1007 def test_dict(self):
1008 d = self.dict()
1009 indices = range(65, 70)
1010 for i in indices:
1011 d[i] = chr(i)
1012 self.assertEqual(d.copy(), dict((i, chr(i)) for i in indices))
1013 self.assertEqual(sorted(d.keys()), indices)
1014 self.assertEqual(sorted(d.values()), [chr(i) for i in indices])
1015 self.assertEqual(sorted(d.items()), [(i, chr(i)) for i in indices])
1016
1017 def test_namespace(self):
1018 n = self.Namespace()
1019 n.name = 'Bob'
1020 n.job = 'Builder'
1021 n._hidden = 'hidden'
1022 self.assertEqual((n.name, n.job), ('Bob', 'Builder'))
1023 del n.job
1024 self.assertEqual(str(n), "Namespace(name='Bob')")
1025 self.assertTrue(hasattr(n, 'name'))
1026 self.assertTrue(not hasattr(n, 'job'))
1027
1028#
1029#
1030#
1031
1032def sqr(x, wait=0.0):
1033 time.sleep(wait)
1034 return x*x
Benjamin Petersondfd79492008-06-13 19:13:39 +00001035class _TestPool(BaseTestCase):
1036
1037 def test_apply(self):
1038 papply = self.pool.apply
1039 self.assertEqual(papply(sqr, (5,)), sqr(5))
1040 self.assertEqual(papply(sqr, (), {'x':3}), sqr(x=3))
1041
1042 def test_map(self):
1043 pmap = self.pool.map
1044 self.assertEqual(pmap(sqr, range(10)), map(sqr, range(10)))
1045 self.assertEqual(pmap(sqr, range(100), chunksize=20),
1046 map(sqr, range(100)))
1047
Jesse Noller7530e472009-07-16 14:23:04 +00001048 def test_map_chunksize(self):
1049 try:
1050 self.pool.map_async(sqr, [], chunksize=1).get(timeout=TIMEOUT1)
1051 except multiprocessing.TimeoutError:
1052 self.fail("pool.map_async with chunksize stalled on null list")
1053
Benjamin Petersondfd79492008-06-13 19:13:39 +00001054 def test_async(self):
1055 res = self.pool.apply_async(sqr, (7, TIMEOUT1,))
1056 get = TimingWrapper(res.get)
1057 self.assertEqual(get(), 49)
1058 self.assertTimingAlmostEqual(get.elapsed, TIMEOUT1)
1059
1060 def test_async_timeout(self):
1061 res = self.pool.apply_async(sqr, (6, TIMEOUT2 + 0.2))
1062 get = TimingWrapper(res.get)
1063 self.assertRaises(multiprocessing.TimeoutError, get, timeout=TIMEOUT2)
1064 self.assertTimingAlmostEqual(get.elapsed, TIMEOUT2)
1065
1066 def test_imap(self):
1067 it = self.pool.imap(sqr, range(10))
1068 self.assertEqual(list(it), map(sqr, range(10)))
1069
1070 it = self.pool.imap(sqr, range(10))
1071 for i in range(10):
1072 self.assertEqual(it.next(), i*i)
1073 self.assertRaises(StopIteration, it.next)
1074
1075 it = self.pool.imap(sqr, range(1000), chunksize=100)
1076 for i in range(1000):
1077 self.assertEqual(it.next(), i*i)
1078 self.assertRaises(StopIteration, it.next)
1079
1080 def test_imap_unordered(self):
1081 it = self.pool.imap_unordered(sqr, range(1000))
1082 self.assertEqual(sorted(it), map(sqr, range(1000)))
1083
1084 it = self.pool.imap_unordered(sqr, range(1000), chunksize=53)
1085 self.assertEqual(sorted(it), map(sqr, range(1000)))
1086
1087 def test_make_pool(self):
1088 p = multiprocessing.Pool(3)
1089 self.assertEqual(3, len(p._pool))
1090 p.close()
1091 p.join()
1092
1093 def test_terminate(self):
1094 if self.TYPE == 'manager':
1095 # On Unix a forked process increfs each shared object to
1096 # which its parent process held a reference. If the
1097 # forked process gets terminated then there is likely to
1098 # be a reference leak. So to prevent
1099 # _TestZZZNumberOfObjects from failing we skip this test
1100 # when using a manager.
1101 return
1102
1103 result = self.pool.map_async(
1104 time.sleep, [0.1 for i in range(10000)], chunksize=1
1105 )
1106 self.pool.terminate()
1107 join = TimingWrapper(self.pool.join)
1108 join()
1109 self.assertTrue(join.elapsed < 0.2)
Jesse Noller654ade32010-01-27 03:05:57 +00001110
1111class _TestPoolWorkerLifetime(BaseTestCase):
1112
1113 ALLOWED_TYPES = ('processes', )
1114 def test_pool_worker_lifetime(self):
1115 p = multiprocessing.Pool(3, maxtasksperchild=10)
1116 self.assertEqual(3, len(p._pool))
1117 origworkerpids = [w.pid for w in p._pool]
1118 # Run many tasks so each worker gets replaced (hopefully)
1119 results = []
1120 for i in range(100):
1121 results.append(p.apply_async(sqr, (i, )))
1122 # Fetch the results and verify we got the right answers,
1123 # also ensuring all the tasks have completed.
1124 for (j, res) in enumerate(results):
1125 self.assertEqual(res.get(), sqr(j))
1126 # Refill the pool
1127 p._repopulate_pool()
Florent Xicluna3bc5cb72010-03-04 15:58:54 +00001128 # Wait until all workers are alive
1129 countdown = 5
1130 while countdown and not all(w.is_alive() for w in p._pool):
1131 countdown -= 1
1132 time.sleep(DELTA)
Jesse Noller654ade32010-01-27 03:05:57 +00001133 finalworkerpids = [w.pid for w in p._pool]
Florent Xicluna3bc5cb72010-03-04 15:58:54 +00001134 # All pids should be assigned. See issue #7805.
1135 self.assertNotIn(None, origworkerpids)
1136 self.assertNotIn(None, finalworkerpids)
1137 # Finally, check that the worker pids have changed
Jesse Noller654ade32010-01-27 03:05:57 +00001138 self.assertNotEqual(sorted(origworkerpids), sorted(finalworkerpids))
1139 p.close()
1140 p.join()
1141
Benjamin Petersondfd79492008-06-13 19:13:39 +00001142#
1143# Test that manager has expected number of shared objects left
1144#
1145
1146class _TestZZZNumberOfObjects(BaseTestCase):
1147 # Because test cases are sorted alphabetically, this one will get
1148 # run after all the other tests for the manager. It tests that
1149 # there have been no "reference leaks" for the manager's shared
1150 # objects. Note the comment in _TestPool.test_terminate().
1151 ALLOWED_TYPES = ('manager',)
1152
1153 def test_number_of_objects(self):
1154 EXPECTED_NUMBER = 1 # the pool object is still alive
1155 multiprocessing.active_children() # discard dead process objs
1156 gc.collect() # do garbage collection
1157 refs = self.manager._number_of_objects()
Jesse Noller7314b382009-01-21 02:08:17 +00001158 debug_info = self.manager._debug_info()
Benjamin Petersondfd79492008-06-13 19:13:39 +00001159 if refs != EXPECTED_NUMBER:
Jesse Noller7fb96402008-07-17 21:01:05 +00001160 print self.manager._debug_info()
Jesse Noller7314b382009-01-21 02:08:17 +00001161 print debug_info
Benjamin Petersondfd79492008-06-13 19:13:39 +00001162
1163 self.assertEqual(refs, EXPECTED_NUMBER)
1164
1165#
1166# Test of creating a customized manager class
1167#
1168
1169from multiprocessing.managers import BaseManager, BaseProxy, RemoteError
1170
1171class FooBar(object):
1172 def f(self):
1173 return 'f()'
1174 def g(self):
1175 raise ValueError
1176 def _h(self):
1177 return '_h()'
1178
1179def baz():
1180 for i in xrange(10):
1181 yield i*i
1182
1183class IteratorProxy(BaseProxy):
1184 _exposed_ = ('next', '__next__')
1185 def __iter__(self):
1186 return self
1187 def next(self):
1188 return self._callmethod('next')
1189 def __next__(self):
1190 return self._callmethod('__next__')
1191
1192class MyManager(BaseManager):
1193 pass
1194
1195MyManager.register('Foo', callable=FooBar)
1196MyManager.register('Bar', callable=FooBar, exposed=('f', '_h'))
1197MyManager.register('baz', callable=baz, proxytype=IteratorProxy)
1198
1199
1200class _TestMyManager(BaseTestCase):
1201
1202 ALLOWED_TYPES = ('manager',)
1203
1204 def test_mymanager(self):
1205 manager = MyManager()
1206 manager.start()
1207
1208 foo = manager.Foo()
1209 bar = manager.Bar()
1210 baz = manager.baz()
1211
1212 foo_methods = [name for name in ('f', 'g', '_h') if hasattr(foo, name)]
1213 bar_methods = [name for name in ('f', 'g', '_h') if hasattr(bar, name)]
1214
1215 self.assertEqual(foo_methods, ['f', 'g'])
1216 self.assertEqual(bar_methods, ['f', '_h'])
1217
1218 self.assertEqual(foo.f(), 'f()')
1219 self.assertRaises(ValueError, foo.g)
1220 self.assertEqual(foo._callmethod('f'), 'f()')
1221 self.assertRaises(RemoteError, foo._callmethod, '_h')
1222
1223 self.assertEqual(bar.f(), 'f()')
1224 self.assertEqual(bar._h(), '_h()')
1225 self.assertEqual(bar._callmethod('f'), 'f()')
1226 self.assertEqual(bar._callmethod('_h'), '_h()')
1227
1228 self.assertEqual(list(baz), [i*i for i in range(10)])
1229
1230 manager.shutdown()
1231
1232#
1233# Test of connecting to a remote server and using xmlrpclib for serialization
1234#
1235
1236_queue = Queue.Queue()
1237def get_queue():
1238 return _queue
1239
1240class QueueManager(BaseManager):
1241 '''manager class used by server process'''
1242QueueManager.register('get_queue', callable=get_queue)
1243
1244class QueueManager2(BaseManager):
1245 '''manager class which specifies the same interface as QueueManager'''
1246QueueManager2.register('get_queue')
1247
1248
1249SERIALIZER = 'xmlrpclib'
1250
1251class _TestRemoteManager(BaseTestCase):
1252
1253 ALLOWED_TYPES = ('manager',)
1254
Antoine Pitrou4eb2b282010-11-02 23:51:30 +00001255 @classmethod
1256 def _putter(cls, address, authkey):
Benjamin Petersondfd79492008-06-13 19:13:39 +00001257 manager = QueueManager2(
1258 address=address, authkey=authkey, serializer=SERIALIZER
1259 )
1260 manager.connect()
1261 queue = manager.get_queue()
1262 queue.put(('hello world', None, True, 2.25))
1263
1264 def test_remote(self):
1265 authkey = os.urandom(32)
1266
1267 manager = QueueManager(
1268 address=('localhost', 0), authkey=authkey, serializer=SERIALIZER
1269 )
1270 manager.start()
1271
1272 p = self.Process(target=self._putter, args=(manager.address, authkey))
1273 p.start()
1274
1275 manager2 = QueueManager2(
1276 address=manager.address, authkey=authkey, serializer=SERIALIZER
1277 )
1278 manager2.connect()
1279 queue = manager2.get_queue()
1280
1281 # Note that xmlrpclib will deserialize object as a list not a tuple
1282 self.assertEqual(queue.get(), ['hello world', None, True, 2.25])
1283
1284 # Because we are using xmlrpclib for serialization instead of
1285 # pickle this will cause a serialization error.
1286 self.assertRaises(Exception, queue.put, time.sleep)
1287
1288 # Make queue finalizer run before the server is stopped
1289 del queue
1290 manager.shutdown()
1291
Jesse Noller459a6482009-03-30 15:50:42 +00001292class _TestManagerRestart(BaseTestCase):
1293
Antoine Pitrou4eb2b282010-11-02 23:51:30 +00001294 @classmethod
1295 def _putter(cls, address, authkey):
Jesse Noller459a6482009-03-30 15:50:42 +00001296 manager = QueueManager(
1297 address=address, authkey=authkey, serializer=SERIALIZER)
1298 manager.connect()
1299 queue = manager.get_queue()
1300 queue.put('hello world')
1301
1302 def test_rapid_restart(self):
1303 authkey = os.urandom(32)
1304 manager = QueueManager(
Antoine Pitrou54f9f832010-04-30 23:08:48 +00001305 address=('localhost', 0), authkey=authkey, serializer=SERIALIZER)
Brian Curtin87d86e02010-11-01 05:15:55 +00001306 srvr = manager.get_server()
1307 addr = srvr.address
1308 # Close the connection.Listener socket which gets opened as a part
1309 # of manager.get_server(). It's not needed for the test.
1310 srvr.listener.close()
Jesse Noller459a6482009-03-30 15:50:42 +00001311 manager.start()
1312
1313 p = self.Process(target=self._putter, args=(manager.address, authkey))
1314 p.start()
1315 queue = manager.get_queue()
1316 self.assertEqual(queue.get(), 'hello world')
Jesse Noller019ce772009-03-30 21:53:29 +00001317 del queue
Jesse Noller459a6482009-03-30 15:50:42 +00001318 manager.shutdown()
1319 manager = QueueManager(
Antoine Pitrou54f9f832010-04-30 23:08:48 +00001320 address=addr, authkey=authkey, serializer=SERIALIZER)
Jesse Noller459a6482009-03-30 15:50:42 +00001321 manager.start()
Jesse Noller019ce772009-03-30 21:53:29 +00001322 manager.shutdown()
Jesse Noller459a6482009-03-30 15:50:42 +00001323
Benjamin Petersondfd79492008-06-13 19:13:39 +00001324#
1325#
1326#
1327
1328SENTINEL = latin('')
1329
1330class _TestConnection(BaseTestCase):
1331
1332 ALLOWED_TYPES = ('processes', 'threads')
1333
Antoine Pitrou4eb2b282010-11-02 23:51:30 +00001334 @classmethod
1335 def _echo(cls, conn):
Benjamin Petersondfd79492008-06-13 19:13:39 +00001336 for msg in iter(conn.recv_bytes, SENTINEL):
1337 conn.send_bytes(msg)
1338 conn.close()
1339
1340 def test_connection(self):
1341 conn, child_conn = self.Pipe()
1342
1343 p = self.Process(target=self._echo, args=(child_conn,))
Jesse Noller5bc9f4c2008-08-19 19:06:19 +00001344 p.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +00001345 p.start()
1346
1347 seq = [1, 2.25, None]
1348 msg = latin('hello world')
1349 longmsg = msg * 10
1350 arr = array.array('i', range(4))
1351
1352 if self.TYPE == 'processes':
1353 self.assertEqual(type(conn.fileno()), int)
1354
1355 self.assertEqual(conn.send(seq), None)
1356 self.assertEqual(conn.recv(), seq)
1357
1358 self.assertEqual(conn.send_bytes(msg), None)
1359 self.assertEqual(conn.recv_bytes(), msg)
1360
1361 if self.TYPE == 'processes':
1362 buffer = array.array('i', [0]*10)
1363 expected = list(arr) + [0] * (10 - len(arr))
1364 self.assertEqual(conn.send_bytes(arr), None)
1365 self.assertEqual(conn.recv_bytes_into(buffer),
1366 len(arr) * buffer.itemsize)
1367 self.assertEqual(list(buffer), expected)
1368
1369 buffer = array.array('i', [0]*10)
1370 expected = [0] * 3 + list(arr) + [0] * (10 - 3 - len(arr))
1371 self.assertEqual(conn.send_bytes(arr), None)
1372 self.assertEqual(conn.recv_bytes_into(buffer, 3 * buffer.itemsize),
1373 len(arr) * buffer.itemsize)
1374 self.assertEqual(list(buffer), expected)
1375
1376 buffer = bytearray(latin(' ' * 40))
1377 self.assertEqual(conn.send_bytes(longmsg), None)
1378 try:
1379 res = conn.recv_bytes_into(buffer)
1380 except multiprocessing.BufferTooShort, e:
1381 self.assertEqual(e.args, (longmsg,))
1382 else:
1383 self.fail('expected BufferTooShort, got %s' % res)
1384
1385 poll = TimingWrapper(conn.poll)
1386
1387 self.assertEqual(poll(), False)
1388 self.assertTimingAlmostEqual(poll.elapsed, 0)
1389
1390 self.assertEqual(poll(TIMEOUT1), False)
1391 self.assertTimingAlmostEqual(poll.elapsed, TIMEOUT1)
1392
1393 conn.send(None)
1394
1395 self.assertEqual(poll(TIMEOUT1), True)
1396 self.assertTimingAlmostEqual(poll.elapsed, 0)
1397
1398 self.assertEqual(conn.recv(), None)
1399
1400 really_big_msg = latin('X') * (1024 * 1024 * 16) # 16Mb
1401 conn.send_bytes(really_big_msg)
1402 self.assertEqual(conn.recv_bytes(), really_big_msg)
1403
1404 conn.send_bytes(SENTINEL) # tell child to quit
1405 child_conn.close()
1406
1407 if self.TYPE == 'processes':
1408 self.assertEqual(conn.readable, True)
1409 self.assertEqual(conn.writable, True)
1410 self.assertRaises(EOFError, conn.recv)
1411 self.assertRaises(EOFError, conn.recv_bytes)
1412
1413 p.join()
1414
1415 def test_duplex_false(self):
1416 reader, writer = self.Pipe(duplex=False)
1417 self.assertEqual(writer.send(1), None)
1418 self.assertEqual(reader.recv(), 1)
1419 if self.TYPE == 'processes':
1420 self.assertEqual(reader.readable, True)
1421 self.assertEqual(reader.writable, False)
1422 self.assertEqual(writer.readable, False)
1423 self.assertEqual(writer.writable, True)
1424 self.assertRaises(IOError, reader.send, 2)
1425 self.assertRaises(IOError, writer.recv)
1426 self.assertRaises(IOError, writer.poll)
1427
1428 def test_spawn_close(self):
1429 # We test that a pipe connection can be closed by parent
1430 # process immediately after child is spawned. On Windows this
1431 # would have sometimes failed on old versions because
1432 # child_conn would be closed before the child got a chance to
1433 # duplicate it.
1434 conn, child_conn = self.Pipe()
1435
1436 p = self.Process(target=self._echo, args=(child_conn,))
1437 p.start()
1438 child_conn.close() # this might complete before child initializes
1439
1440 msg = latin('hello')
1441 conn.send_bytes(msg)
1442 self.assertEqual(conn.recv_bytes(), msg)
1443
1444 conn.send_bytes(SENTINEL)
1445 conn.close()
1446 p.join()
1447
1448 def test_sendbytes(self):
1449 if self.TYPE != 'processes':
1450 return
1451
1452 msg = latin('abcdefghijklmnopqrstuvwxyz')
1453 a, b = self.Pipe()
1454
1455 a.send_bytes(msg)
1456 self.assertEqual(b.recv_bytes(), msg)
1457
1458 a.send_bytes(msg, 5)
1459 self.assertEqual(b.recv_bytes(), msg[5:])
1460
1461 a.send_bytes(msg, 7, 8)
1462 self.assertEqual(b.recv_bytes(), msg[7:7+8])
1463
1464 a.send_bytes(msg, 26)
1465 self.assertEqual(b.recv_bytes(), latin(''))
1466
1467 a.send_bytes(msg, 26, 0)
1468 self.assertEqual(b.recv_bytes(), latin(''))
1469
1470 self.assertRaises(ValueError, a.send_bytes, msg, 27)
1471
1472 self.assertRaises(ValueError, a.send_bytes, msg, 22, 5)
1473
1474 self.assertRaises(ValueError, a.send_bytes, msg, 26, 1)
1475
1476 self.assertRaises(ValueError, a.send_bytes, msg, -1)
1477
1478 self.assertRaises(ValueError, a.send_bytes, msg, 4, -1)
1479
Benjamin Petersondfd79492008-06-13 19:13:39 +00001480class _TestListenerClient(BaseTestCase):
1481
1482 ALLOWED_TYPES = ('processes', 'threads')
1483
Antoine Pitrou4eb2b282010-11-02 23:51:30 +00001484 @classmethod
1485 def _test(cls, address):
1486 conn = cls.connection.Client(address)
Benjamin Petersondfd79492008-06-13 19:13:39 +00001487 conn.send('hello')
1488 conn.close()
1489
1490 def test_listener_client(self):
1491 for family in self.connection.families:
1492 l = self.connection.Listener(family=family)
1493 p = self.Process(target=self._test, args=(l.address,))
Jesse Noller5bc9f4c2008-08-19 19:06:19 +00001494 p.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +00001495 p.start()
1496 conn = l.accept()
1497 self.assertEqual(conn.recv(), 'hello')
1498 p.join()
1499 l.close()
Benjamin Petersondfd79492008-06-13 19:13:39 +00001500#
1501# Test of sending connection and socket objects between processes
1502#
Benjamin Petersonda3a1b12008-06-16 20:52:48 +00001503"""
Benjamin Petersondfd79492008-06-13 19:13:39 +00001504class _TestPicklingConnections(BaseTestCase):
1505
1506 ALLOWED_TYPES = ('processes',)
1507
1508 def _listener(self, conn, families):
1509 for fam in families:
1510 l = self.connection.Listener(family=fam)
1511 conn.send(l.address)
1512 new_conn = l.accept()
1513 conn.send(new_conn)
1514
1515 if self.TYPE == 'processes':
1516 l = socket.socket()
1517 l.bind(('localhost', 0))
1518 conn.send(l.getsockname())
1519 l.listen(1)
1520 new_conn, addr = l.accept()
1521 conn.send(new_conn)
1522
1523 conn.recv()
1524
1525 def _remote(self, conn):
1526 for (address, msg) in iter(conn.recv, None):
1527 client = self.connection.Client(address)
1528 client.send(msg.upper())
1529 client.close()
1530
1531 if self.TYPE == 'processes':
1532 address, msg = conn.recv()
1533 client = socket.socket()
1534 client.connect(address)
1535 client.sendall(msg.upper())
1536 client.close()
1537
1538 conn.close()
1539
1540 def test_pickling(self):
1541 try:
1542 multiprocessing.allow_connection_pickling()
1543 except ImportError:
1544 return
1545
1546 families = self.connection.families
1547
1548 lconn, lconn0 = self.Pipe()
1549 lp = self.Process(target=self._listener, args=(lconn0, families))
1550 lp.start()
1551 lconn0.close()
1552
1553 rconn, rconn0 = self.Pipe()
1554 rp = self.Process(target=self._remote, args=(rconn0,))
1555 rp.start()
1556 rconn0.close()
1557
1558 for fam in families:
1559 msg = ('This connection uses family %s' % fam).encode('ascii')
1560 address = lconn.recv()
1561 rconn.send((address, msg))
1562 new_conn = lconn.recv()
1563 self.assertEqual(new_conn.recv(), msg.upper())
1564
1565 rconn.send(None)
1566
1567 if self.TYPE == 'processes':
1568 msg = latin('This connection uses a normal socket')
1569 address = lconn.recv()
1570 rconn.send((address, msg))
1571 if hasattr(socket, 'fromfd'):
1572 new_conn = lconn.recv()
1573 self.assertEqual(new_conn.recv(100), msg.upper())
1574 else:
1575 # XXX On Windows with Py2.6 need to backport fromfd()
1576 discard = lconn.recv_bytes()
1577
1578 lconn.send(None)
1579
1580 rconn.close()
1581 lconn.close()
1582
1583 lp.join()
1584 rp.join()
Benjamin Petersonda3a1b12008-06-16 20:52:48 +00001585"""
Benjamin Petersondfd79492008-06-13 19:13:39 +00001586#
1587#
1588#
1589
1590class _TestHeap(BaseTestCase):
1591
1592 ALLOWED_TYPES = ('processes',)
1593
1594 def test_heap(self):
1595 iterations = 5000
1596 maxblocks = 50
1597 blocks = []
1598
1599 # create and destroy lots of blocks of different sizes
1600 for i in xrange(iterations):
1601 size = int(random.lognormvariate(0, 1) * 1000)
1602 b = multiprocessing.heap.BufferWrapper(size)
1603 blocks.append(b)
1604 if len(blocks) > maxblocks:
1605 i = random.randrange(maxblocks)
1606 del blocks[i]
1607
1608 # get the heap object
1609 heap = multiprocessing.heap.BufferWrapper._heap
1610
1611 # verify the state of the heap
1612 all = []
1613 occupied = 0
1614 for L in heap._len_to_seq.values():
1615 for arena, start, stop in L:
1616 all.append((heap._arenas.index(arena), start, stop,
1617 stop-start, 'free'))
1618 for arena, start, stop in heap._allocated_blocks:
1619 all.append((heap._arenas.index(arena), start, stop,
1620 stop-start, 'occupied'))
1621 occupied += (stop-start)
1622
1623 all.sort()
1624
1625 for i in range(len(all)-1):
1626 (arena, start, stop) = all[i][:3]
1627 (narena, nstart, nstop) = all[i+1][:3]
1628 self.assertTrue((arena != narena and nstart == 0) or
1629 (stop == nstart))
1630
1631#
1632#
1633#
1634
Benjamin Petersondfd79492008-06-13 19:13:39 +00001635class _Foo(Structure):
1636 _fields_ = [
1637 ('x', c_int),
1638 ('y', c_double)
1639 ]
1640
1641class _TestSharedCTypes(BaseTestCase):
1642
1643 ALLOWED_TYPES = ('processes',)
1644
Antoine Pitrou55d935a2010-11-22 16:35:57 +00001645 def setUp(self):
1646 if not HAS_SHAREDCTYPES:
1647 self.skipTest("requires multiprocessing.sharedctypes")
1648
Antoine Pitrou4eb2b282010-11-02 23:51:30 +00001649 @classmethod
1650 def _double(cls, x, y, foo, arr, string):
Benjamin Petersondfd79492008-06-13 19:13:39 +00001651 x.value *= 2
1652 y.value *= 2
1653 foo.x *= 2
1654 foo.y *= 2
1655 string.value *= 2
1656 for i in range(len(arr)):
1657 arr[i] *= 2
1658
1659 def test_sharedctypes(self, lock=False):
Benjamin Petersondfd79492008-06-13 19:13:39 +00001660 x = Value('i', 7, lock=lock)
Georg Brandlbd564c32010-02-06 23:33:33 +00001661 y = Value(c_double, 1.0/3.0, lock=lock)
Benjamin Petersondfd79492008-06-13 19:13:39 +00001662 foo = Value(_Foo, 3, 2, lock=lock)
Georg Brandlbd564c32010-02-06 23:33:33 +00001663 arr = self.Array('d', range(10), lock=lock)
1664 string = self.Array('c', 20, lock=lock)
Brian Curtina06e9b82010-10-07 02:27:41 +00001665 string.value = latin('hello')
Benjamin Petersondfd79492008-06-13 19:13:39 +00001666
1667 p = self.Process(target=self._double, args=(x, y, foo, arr, string))
1668 p.start()
1669 p.join()
1670
1671 self.assertEqual(x.value, 14)
1672 self.assertAlmostEqual(y.value, 2.0/3.0)
1673 self.assertEqual(foo.x, 6)
1674 self.assertAlmostEqual(foo.y, 4.0)
1675 for i in range(10):
1676 self.assertAlmostEqual(arr[i], i*2)
1677 self.assertEqual(string.value, latin('hellohello'))
1678
1679 def test_synchronize(self):
1680 self.test_sharedctypes(lock=True)
1681
1682 def test_copy(self):
Benjamin Petersondfd79492008-06-13 19:13:39 +00001683 foo = _Foo(2, 5.0)
Brian Curtina06e9b82010-10-07 02:27:41 +00001684 bar = copy(foo)
Benjamin Petersondfd79492008-06-13 19:13:39 +00001685 foo.x = 0
1686 foo.y = 0
1687 self.assertEqual(bar.x, 2)
1688 self.assertAlmostEqual(bar.y, 5.0)
1689
1690#
1691#
1692#
1693
1694class _TestFinalize(BaseTestCase):
1695
1696 ALLOWED_TYPES = ('processes',)
1697
Antoine Pitrou4eb2b282010-11-02 23:51:30 +00001698 @classmethod
1699 def _test_finalize(cls, conn):
Benjamin Petersondfd79492008-06-13 19:13:39 +00001700 class Foo(object):
1701 pass
1702
1703 a = Foo()
1704 util.Finalize(a, conn.send, args=('a',))
1705 del a # triggers callback for a
1706
1707 b = Foo()
1708 close_b = util.Finalize(b, conn.send, args=('b',))
1709 close_b() # triggers callback for b
1710 close_b() # does nothing because callback has already been called
1711 del b # does nothing because callback has already been called
1712
1713 c = Foo()
1714 util.Finalize(c, conn.send, args=('c',))
1715
1716 d10 = Foo()
1717 util.Finalize(d10, conn.send, args=('d10',), exitpriority=1)
1718
1719 d01 = Foo()
1720 util.Finalize(d01, conn.send, args=('d01',), exitpriority=0)
1721 d02 = Foo()
1722 util.Finalize(d02, conn.send, args=('d02',), exitpriority=0)
1723 d03 = Foo()
1724 util.Finalize(d03, conn.send, args=('d03',), exitpriority=0)
1725
1726 util.Finalize(None, conn.send, args=('e',), exitpriority=-10)
1727
1728 util.Finalize(None, conn.send, args=('STOP',), exitpriority=-100)
1729
Ezio Melottic2077b02011-03-16 12:34:31 +02001730 # call multiprocessing's cleanup function then exit process without
Benjamin Petersondfd79492008-06-13 19:13:39 +00001731 # garbage collecting locals
1732 util._exit_function()
1733 conn.close()
1734 os._exit(0)
1735
1736 def test_finalize(self):
1737 conn, child_conn = self.Pipe()
1738
1739 p = self.Process(target=self._test_finalize, args=(child_conn,))
1740 p.start()
1741 p.join()
1742
1743 result = [obj for obj in iter(conn.recv, 'STOP')]
1744 self.assertEqual(result, ['a', 'b', 'd10', 'd03', 'd02', 'd01', 'e'])
1745
1746#
1747# Test that from ... import * works for each module
1748#
1749
1750class _TestImportStar(BaseTestCase):
1751
1752 ALLOWED_TYPES = ('processes',)
1753
1754 def test_import(self):
Florent Xicluna36b9fbb2010-03-24 19:33:25 +00001755 modules = [
Benjamin Petersondfd79492008-06-13 19:13:39 +00001756 'multiprocessing', 'multiprocessing.connection',
1757 'multiprocessing.heap', 'multiprocessing.managers',
1758 'multiprocessing.pool', 'multiprocessing.process',
Florent Xicluna36b9fbb2010-03-24 19:33:25 +00001759 'multiprocessing.reduction',
Benjamin Petersondfd79492008-06-13 19:13:39 +00001760 'multiprocessing.synchronize', 'multiprocessing.util'
Florent Xicluna36b9fbb2010-03-24 19:33:25 +00001761 ]
1762
1763 if c_int is not None:
1764 # This module requires _ctypes
1765 modules.append('multiprocessing.sharedctypes')
Benjamin Petersondfd79492008-06-13 19:13:39 +00001766
1767 for name in modules:
1768 __import__(name)
1769 mod = sys.modules[name]
1770
1771 for attr in getattr(mod, '__all__', ()):
1772 self.assertTrue(
1773 hasattr(mod, attr),
1774 '%r does not have attribute %r' % (mod, attr)
1775 )
1776
1777#
1778# Quick test that logging works -- does not test logging output
1779#
1780
1781class _TestLogging(BaseTestCase):
1782
1783 ALLOWED_TYPES = ('processes',)
1784
1785 def test_enable_logging(self):
1786 logger = multiprocessing.get_logger()
1787 logger.setLevel(util.SUBWARNING)
1788 self.assertTrue(logger is not None)
1789 logger.debug('this will not be printed')
1790 logger.info('nor will this')
1791 logger.setLevel(LOG_LEVEL)
1792
Antoine Pitrou4eb2b282010-11-02 23:51:30 +00001793 @classmethod
1794 def _test_level(cls, conn):
Benjamin Petersondfd79492008-06-13 19:13:39 +00001795 logger = multiprocessing.get_logger()
1796 conn.send(logger.getEffectiveLevel())
1797
1798 def test_level(self):
1799 LEVEL1 = 32
1800 LEVEL2 = 37
1801
1802 logger = multiprocessing.get_logger()
1803 root_logger = logging.getLogger()
1804 root_level = root_logger.level
1805
1806 reader, writer = multiprocessing.Pipe(duplex=False)
1807
1808 logger.setLevel(LEVEL1)
1809 self.Process(target=self._test_level, args=(writer,)).start()
1810 self.assertEqual(LEVEL1, reader.recv())
1811
1812 logger.setLevel(logging.NOTSET)
1813 root_logger.setLevel(LEVEL2)
1814 self.Process(target=self._test_level, args=(writer,)).start()
1815 self.assertEqual(LEVEL2, reader.recv())
1816
1817 root_logger.setLevel(root_level)
1818 logger.setLevel(level=LOG_LEVEL)
1819
Jesse Noller814d02d2009-11-21 14:38:23 +00001820
Jesse Noller9a03f2f2009-11-24 14:17:29 +00001821# class _TestLoggingProcessName(BaseTestCase):
1822#
1823# def handle(self, record):
1824# assert record.processName == multiprocessing.current_process().name
1825# self.__handled = True
1826#
1827# def test_logging(self):
1828# handler = logging.Handler()
1829# handler.handle = self.handle
1830# self.__handled = False
1831# # Bypass getLogger() and side-effects
1832# logger = logging.getLoggerClass()(
1833# 'multiprocessing.test.TestLoggingProcessName')
1834# logger.addHandler(handler)
1835# logger.propagate = False
1836#
1837# logger.warn('foo')
1838# assert self.__handled
Jesse Noller814d02d2009-11-21 14:38:23 +00001839
Benjamin Petersondfd79492008-06-13 19:13:39 +00001840#
Jesse Noller9a5b2ad2009-01-19 15:12:22 +00001841# Test to verify handle verification, see issue 3321
1842#
1843
1844class TestInvalidHandle(unittest.TestCase):
1845
Florent Xicluna36b9fbb2010-03-24 19:33:25 +00001846 @unittest.skipIf(WIN32, "skipped on Windows")
Jesse Noller9a5b2ad2009-01-19 15:12:22 +00001847 def test_invalid_handles(self):
Jesse Noller9a5b2ad2009-01-19 15:12:22 +00001848 conn = _multiprocessing.Connection(44977608)
1849 self.assertRaises(IOError, conn.poll)
1850 self.assertRaises(IOError, _multiprocessing.Connection, -1)
Florent Xicluna36b9fbb2010-03-24 19:33:25 +00001851
Jesse Noller9a5b2ad2009-01-19 15:12:22 +00001852#
Benjamin Petersondfd79492008-06-13 19:13:39 +00001853# Functions used to create test cases from the base ones in this module
1854#
1855
1856def get_attributes(Source, names):
1857 d = {}
1858 for name in names:
1859 obj = getattr(Source, name)
1860 if type(obj) == type(get_attributes):
1861 obj = staticmethod(obj)
1862 d[name] = obj
1863 return d
1864
1865def create_test_cases(Mixin, type):
1866 result = {}
1867 glob = globals()
Florent Xicluna36b9fbb2010-03-24 19:33:25 +00001868 Type = type.capitalize()
Benjamin Petersondfd79492008-06-13 19:13:39 +00001869
1870 for name in glob.keys():
1871 if name.startswith('_Test'):
1872 base = glob[name]
1873 if type in base.ALLOWED_TYPES:
1874 newname = 'With' + Type + name[1:]
1875 class Temp(base, unittest.TestCase, Mixin):
1876 pass
1877 result[newname] = Temp
1878 Temp.__name__ = newname
1879 Temp.__module__ = Mixin.__module__
1880 return result
1881
1882#
1883# Create test cases
1884#
1885
1886class ProcessesMixin(object):
1887 TYPE = 'processes'
1888 Process = multiprocessing.Process
1889 locals().update(get_attributes(multiprocessing, (
1890 'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
1891 'Condition', 'Event', 'Value', 'Array', 'RawValue',
1892 'RawArray', 'current_process', 'active_children', 'Pipe',
1893 'connection', 'JoinableQueue'
1894 )))
1895
1896testcases_processes = create_test_cases(ProcessesMixin, type='processes')
1897globals().update(testcases_processes)
1898
1899
1900class ManagerMixin(object):
1901 TYPE = 'manager'
1902 Process = multiprocessing.Process
1903 manager = object.__new__(multiprocessing.managers.SyncManager)
1904 locals().update(get_attributes(manager, (
1905 'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
1906 'Condition', 'Event', 'Value', 'Array', 'list', 'dict',
1907 'Namespace', 'JoinableQueue'
1908 )))
1909
1910testcases_manager = create_test_cases(ManagerMixin, type='manager')
1911globals().update(testcases_manager)
1912
1913
1914class ThreadsMixin(object):
1915 TYPE = 'threads'
1916 Process = multiprocessing.dummy.Process
1917 locals().update(get_attributes(multiprocessing.dummy, (
1918 'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
1919 'Condition', 'Event', 'Value', 'Array', 'current_process',
1920 'active_children', 'Pipe', 'connection', 'dict', 'list',
1921 'Namespace', 'JoinableQueue'
1922 )))
1923
1924testcases_threads = create_test_cases(ThreadsMixin, type='threads')
1925globals().update(testcases_threads)
1926
Neal Norwitz0c519b32008-08-25 01:50:24 +00001927class OtherTest(unittest.TestCase):
1928 # TODO: add more tests for deliver/answer challenge.
1929 def test_deliver_challenge_auth_failure(self):
1930 class _FakeConnection(object):
1931 def recv_bytes(self, size):
Neal Norwitz2a7767a2008-08-25 03:03:25 +00001932 return b'something bogus'
Neal Norwitz0c519b32008-08-25 01:50:24 +00001933 def send_bytes(self, data):
1934 pass
1935 self.assertRaises(multiprocessing.AuthenticationError,
1936 multiprocessing.connection.deliver_challenge,
1937 _FakeConnection(), b'abc')
1938
1939 def test_answer_challenge_auth_failure(self):
1940 class _FakeConnection(object):
1941 def __init__(self):
1942 self.count = 0
1943 def recv_bytes(self, size):
1944 self.count += 1
1945 if self.count == 1:
1946 return multiprocessing.connection.CHALLENGE
1947 elif self.count == 2:
Neal Norwitz2a7767a2008-08-25 03:03:25 +00001948 return b'something bogus'
1949 return b''
Neal Norwitz0c519b32008-08-25 01:50:24 +00001950 def send_bytes(self, data):
1951 pass
1952 self.assertRaises(multiprocessing.AuthenticationError,
1953 multiprocessing.connection.answer_challenge,
1954 _FakeConnection(), b'abc')
1955
Jesse Noller7152f6d2009-04-02 05:17:26 +00001956#
1957# Test Manager.start()/Pool.__init__() initializer feature - see issue 5585
1958#
1959
1960def initializer(ns):
1961 ns.test += 1
1962
1963class TestInitializers(unittest.TestCase):
1964 def setUp(self):
1965 self.mgr = multiprocessing.Manager()
1966 self.ns = self.mgr.Namespace()
1967 self.ns.test = 0
1968
1969 def tearDown(self):
1970 self.mgr.shutdown()
1971
1972 def test_manager_initializer(self):
1973 m = multiprocessing.managers.SyncManager()
1974 self.assertRaises(TypeError, m.start, 1)
1975 m.start(initializer, (self.ns,))
1976 self.assertEqual(self.ns.test, 1)
1977 m.shutdown()
1978
1979 def test_pool_initializer(self):
1980 self.assertRaises(TypeError, multiprocessing.Pool, initializer=1)
1981 p = multiprocessing.Pool(1, initializer, (self.ns,))
1982 p.close()
1983 p.join()
1984 self.assertEqual(self.ns.test, 1)
1985
Jesse Noller1b90efb2009-06-30 17:11:52 +00001986#
1987# Issue 5155, 5313, 5331: Test process in processes
1988# Verifies os.close(sys.stdin.fileno) vs. sys.stdin.close() behavior
1989#
1990
1991def _ThisSubProcess(q):
1992 try:
1993 item = q.get(block=False)
1994 except Queue.Empty:
1995 pass
1996
1997def _TestProcess(q):
1998 queue = multiprocessing.Queue()
1999 subProc = multiprocessing.Process(target=_ThisSubProcess, args=(queue,))
2000 subProc.start()
2001 subProc.join()
2002
2003def _afunc(x):
2004 return x*x
2005
2006def pool_in_process():
2007 pool = multiprocessing.Pool(processes=4)
2008 x = pool.map(_afunc, [1, 2, 3, 4, 5, 6, 7])
2009
2010class _file_like(object):
2011 def __init__(self, delegate):
2012 self._delegate = delegate
2013 self._pid = None
2014
2015 @property
2016 def cache(self):
2017 pid = os.getpid()
2018 # There are no race conditions since fork keeps only the running thread
2019 if pid != self._pid:
2020 self._pid = pid
2021 self._cache = []
2022 return self._cache
2023
2024 def write(self, data):
2025 self.cache.append(data)
2026
2027 def flush(self):
2028 self._delegate.write(''.join(self.cache))
2029 self._cache = []
2030
2031class TestStdinBadfiledescriptor(unittest.TestCase):
2032
2033 def test_queue_in_process(self):
2034 queue = multiprocessing.Queue()
2035 proc = multiprocessing.Process(target=_TestProcess, args=(queue,))
2036 proc.start()
2037 proc.join()
2038
2039 def test_pool_in_process(self):
2040 p = multiprocessing.Process(target=pool_in_process)
2041 p.start()
2042 p.join()
2043
2044 def test_flushing(self):
2045 sio = StringIO()
2046 flike = _file_like(sio)
2047 flike.write('foo')
2048 proc = multiprocessing.Process(target=lambda: flike.flush())
2049 flike.flush()
2050 assert sio.getvalue() == 'foo'
2051
2052testcases_other = [OtherTest, TestInvalidHandle, TestInitializers,
2053 TestStdinBadfiledescriptor]
Neal Norwitz0c519b32008-08-25 01:50:24 +00002054
Benjamin Petersondfd79492008-06-13 19:13:39 +00002055#
2056#
2057#
2058
2059def test_main(run=None):
Jesse Noller18623822008-06-18 13:29:52 +00002060 if sys.platform.startswith("linux"):
2061 try:
2062 lock = multiprocessing.RLock()
2063 except OSError:
Benjamin Petersonbec087f2009-03-26 21:10:30 +00002064 raise unittest.SkipTest("OSError raises on RLock creation, see issue 3111!")
Benjamin Petersoned77f2e2008-06-17 22:40:44 +00002065
Benjamin Petersondfd79492008-06-13 19:13:39 +00002066 if run is None:
2067 from test.test_support import run_unittest as run
2068
2069 util.get_temp_dir() # creates temp directory for use by all processes
2070
2071 multiprocessing.get_logger().setLevel(LOG_LEVEL)
2072
Jesse Noller146b7ab2008-07-02 16:44:09 +00002073 ProcessesMixin.pool = multiprocessing.Pool(4)
2074 ThreadsMixin.pool = multiprocessing.dummy.Pool(4)
2075 ManagerMixin.manager.__init__()
2076 ManagerMixin.manager.start()
2077 ManagerMixin.pool = ManagerMixin.manager.Pool(4)
Benjamin Petersondfd79492008-06-13 19:13:39 +00002078
2079 testcases = (
Jesse Noller146b7ab2008-07-02 16:44:09 +00002080 sorted(testcases_processes.values(), key=lambda tc:tc.__name__) +
2081 sorted(testcases_threads.values(), key=lambda tc:tc.__name__) +
Neal Norwitz0c519b32008-08-25 01:50:24 +00002082 sorted(testcases_manager.values(), key=lambda tc:tc.__name__) +
2083 testcases_other
Benjamin Petersondfd79492008-06-13 19:13:39 +00002084 )
2085
2086 loadTestsFromTestCase = unittest.defaultTestLoader.loadTestsFromTestCase
2087 suite = unittest.TestSuite(loadTestsFromTestCase(tc) for tc in testcases)
Nick Coghlan13623662010-04-10 14:24:36 +00002088 # (ncoghlan): Whether or not sys.exc_clear is executed by the threading
2089 # module during these tests is at least platform dependent and possibly
Nick Coghlan14459d52010-04-10 15:01:54 +00002090 # non-deterministic on any given platform. So we don't mind if the listed
Nick Coghlan13623662010-04-10 14:24:36 +00002091 # warnings aren't actually raised.
Florent Xicluna07627882010-03-21 01:14:24 +00002092 with test_support.check_py3k_warnings(
Nick Coghlan13623662010-04-10 14:24:36 +00002093 (".+__(get|set)slice__ has been removed", DeprecationWarning),
2094 (r"sys.exc_clear\(\) not supported", DeprecationWarning),
2095 quiet=True):
Florent Xicluna07627882010-03-21 01:14:24 +00002096 run(suite)
Benjamin Petersondfd79492008-06-13 19:13:39 +00002097
Jesse Noller146b7ab2008-07-02 16:44:09 +00002098 ThreadsMixin.pool.terminate()
2099 ProcessesMixin.pool.terminate()
2100 ManagerMixin.pool.terminate()
2101 ManagerMixin.manager.shutdown()
Benjamin Petersondfd79492008-06-13 19:13:39 +00002102
Jesse Noller146b7ab2008-07-02 16:44:09 +00002103 del ProcessesMixin.pool, ThreadsMixin.pool, ManagerMixin.pool
Benjamin Petersondfd79492008-06-13 19:13:39 +00002104
2105def main():
2106 test_main(unittest.TextTestRunner(verbosity=2).run)
2107
2108if __name__ == '__main__':
2109 main()