blob: 78b002ef5762a7fb5cb694e23044497b92889454 [file] [log] [blame]
Jesse Noller76cf55f2008-07-02 16:56:51 +00001#!/usr/bin/env python
2
Benjamin Petersondfd79492008-06-13 19:13:39 +00003#
4# Unit tests for the multiprocessing package
5#
6
7import unittest
Benjamin Petersondfd79492008-06-13 19:13:39 +00008import Queue
9import time
10import sys
11import os
12import gc
13import signal
14import array
Benjamin Petersondfd79492008-06-13 19:13:39 +000015import socket
16import random
17import logging
Mark Dickinsonc4920e82009-11-20 19:30:22 +000018from test import test_support
Jesse Noller1b90efb2009-06-30 17:11:52 +000019from StringIO import StringIO
R. David Murray3db8a342009-03-30 23:05:48 +000020_multiprocessing = test_support.import_module('_multiprocessing')
Victor Stinner613b4cf2010-04-27 21:56:26 +000021# import threading after _multiprocessing to raise a more revelant error
22# message: "No module named _multiprocessing". _multiprocessing is not compiled
23# without thread support.
24import threading
R. David Murray3db8a342009-03-30 23:05:48 +000025
Jesse Noller37040cd2008-09-30 00:15:45 +000026# Work around broken sem_open implementations
R. David Murray3db8a342009-03-30 23:05:48 +000027test_support.import_module('multiprocessing.synchronize')
Jesse Noller37040cd2008-09-30 00:15:45 +000028
Benjamin Petersondfd79492008-06-13 19:13:39 +000029import multiprocessing.dummy
30import multiprocessing.connection
31import multiprocessing.managers
32import multiprocessing.heap
Benjamin Petersondfd79492008-06-13 19:13:39 +000033import multiprocessing.pool
Benjamin Petersondfd79492008-06-13 19:13:39 +000034
35from multiprocessing import util
36
Brian Curtina06e9b82010-10-07 02:27:41 +000037try:
38 from multiprocessing.sharedctypes import Value, copy
39 HAS_SHAREDCTYPES = True
40except ImportError:
41 HAS_SHAREDCTYPES = False
42
Benjamin Petersondfd79492008-06-13 19:13:39 +000043#
44#
45#
46
Benjamin Petersone79edf52008-07-13 18:34:58 +000047latin = str
Benjamin Petersondfd79492008-06-13 19:13:39 +000048
Benjamin Petersondfd79492008-06-13 19:13:39 +000049#
50# Constants
51#
52
53LOG_LEVEL = util.SUBWARNING
Jesse Noller654ade32010-01-27 03:05:57 +000054#LOG_LEVEL = logging.DEBUG
Benjamin Petersondfd79492008-06-13 19:13:39 +000055
56DELTA = 0.1
57CHECK_TIMINGS = False # making true makes tests take a lot longer
58 # and can sometimes cause some non-serious
59 # failures because some calls block a bit
60 # longer than expected
61if CHECK_TIMINGS:
62 TIMEOUT1, TIMEOUT2, TIMEOUT3 = 0.82, 0.35, 1.4
63else:
64 TIMEOUT1, TIMEOUT2, TIMEOUT3 = 0.1, 0.1, 0.1
65
66HAVE_GETVALUE = not getattr(_multiprocessing,
67 'HAVE_BROKEN_SEM_GETVALUE', False)
68
Jesse Noller9a5b2ad2009-01-19 15:12:22 +000069WIN32 = (sys.platform == "win32")
70
Benjamin Petersondfd79492008-06-13 19:13:39 +000071#
Florent Xicluna36b9fbb2010-03-24 19:33:25 +000072# Some tests require ctypes
73#
74
75try:
Nick Coghlan13623662010-04-10 14:24:36 +000076 from ctypes import Structure, c_int, c_double
Florent Xicluna36b9fbb2010-03-24 19:33:25 +000077except ImportError:
78 Structure = object
79 c_int = c_double = None
80
81#
Benjamin Petersondfd79492008-06-13 19:13:39 +000082# Creates a wrapper for a function which records the time it takes to finish
83#
84
85class TimingWrapper(object):
86
87 def __init__(self, func):
88 self.func = func
89 self.elapsed = None
90
91 def __call__(self, *args, **kwds):
92 t = time.time()
93 try:
94 return self.func(*args, **kwds)
95 finally:
96 self.elapsed = time.time() - t
97
98#
99# Base class for test cases
100#
101
102class BaseTestCase(object):
103
104 ALLOWED_TYPES = ('processes', 'manager', 'threads')
105
106 def assertTimingAlmostEqual(self, a, b):
107 if CHECK_TIMINGS:
108 self.assertAlmostEqual(a, b, 1)
109
110 def assertReturnsIfImplemented(self, value, func, *args):
111 try:
112 res = func(*args)
113 except NotImplementedError:
114 pass
115 else:
116 return self.assertEqual(value, res)
117
Antoine Pitrou4eb2b282010-11-02 23:51:30 +0000118 # For the sanity of Windows users, rather than crashing or freezing in
119 # multiple ways.
120 def __reduce__(self, *args):
121 raise NotImplementedError("shouldn't try to pickle a test case")
122
123 __reduce_ex__ = __reduce__
124
Benjamin Petersondfd79492008-06-13 19:13:39 +0000125#
126# Return the value of a semaphore
127#
128
129def get_value(self):
130 try:
131 return self.get_value()
132 except AttributeError:
133 try:
134 return self._Semaphore__value
135 except AttributeError:
136 try:
137 return self._value
138 except AttributeError:
139 raise NotImplementedError
140
141#
142# Testcases
143#
144
145class _TestProcess(BaseTestCase):
146
147 ALLOWED_TYPES = ('processes', 'threads')
148
149 def test_current(self):
150 if self.TYPE == 'threads':
151 return
152
153 current = self.current_process()
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000154 authkey = current.authkey
Benjamin Petersondfd79492008-06-13 19:13:39 +0000155
156 self.assertTrue(current.is_alive())
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000157 self.assertTrue(not current.daemon)
Ezio Melottib0f5adc2010-01-24 16:58:36 +0000158 self.assertIsInstance(authkey, bytes)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000159 self.assertTrue(len(authkey) > 0)
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000160 self.assertEqual(current.ident, os.getpid())
161 self.assertEqual(current.exitcode, None)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000162
Antoine Pitrou4eb2b282010-11-02 23:51:30 +0000163 @classmethod
164 def _test(cls, q, *args, **kwds):
165 current = cls.current_process()
Benjamin Petersondfd79492008-06-13 19:13:39 +0000166 q.put(args)
167 q.put(kwds)
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000168 q.put(current.name)
Antoine Pitrou4eb2b282010-11-02 23:51:30 +0000169 if cls.TYPE != 'threads':
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000170 q.put(bytes(current.authkey))
Benjamin Petersondfd79492008-06-13 19:13:39 +0000171 q.put(current.pid)
172
173 def test_process(self):
174 q = self.Queue(1)
175 e = self.Event()
176 args = (q, 1, 2)
177 kwargs = {'hello':23, 'bye':2.54}
178 name = 'SomeProcess'
179 p = self.Process(
180 target=self._test, args=args, kwargs=kwargs, name=name
181 )
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000182 p.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +0000183 current = self.current_process()
184
185 if self.TYPE != 'threads':
Ezio Melotti2623a372010-11-21 13:34:58 +0000186 self.assertEqual(p.authkey, current.authkey)
187 self.assertEqual(p.is_alive(), False)
188 self.assertEqual(p.daemon, True)
Ezio Melottiaa980582010-01-23 23:04:36 +0000189 self.assertNotIn(p, self.active_children())
Benjamin Petersondfd79492008-06-13 19:13:39 +0000190 self.assertTrue(type(self.active_children()) is list)
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000191 self.assertEqual(p.exitcode, None)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000192
193 p.start()
194
Ezio Melotti2623a372010-11-21 13:34:58 +0000195 self.assertEqual(p.exitcode, None)
196 self.assertEqual(p.is_alive(), True)
Ezio Melottiaa980582010-01-23 23:04:36 +0000197 self.assertIn(p, self.active_children())
Benjamin Petersondfd79492008-06-13 19:13:39 +0000198
Ezio Melotti2623a372010-11-21 13:34:58 +0000199 self.assertEqual(q.get(), args[1:])
200 self.assertEqual(q.get(), kwargs)
201 self.assertEqual(q.get(), p.name)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000202 if self.TYPE != 'threads':
Ezio Melotti2623a372010-11-21 13:34:58 +0000203 self.assertEqual(q.get(), current.authkey)
204 self.assertEqual(q.get(), p.pid)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000205
206 p.join()
207
Ezio Melotti2623a372010-11-21 13:34:58 +0000208 self.assertEqual(p.exitcode, 0)
209 self.assertEqual(p.is_alive(), False)
Ezio Melottiaa980582010-01-23 23:04:36 +0000210 self.assertNotIn(p, self.active_children())
Benjamin Petersondfd79492008-06-13 19:13:39 +0000211
Antoine Pitrou4eb2b282010-11-02 23:51:30 +0000212 @classmethod
213 def _test_terminate(cls):
Benjamin Petersondfd79492008-06-13 19:13:39 +0000214 time.sleep(1000)
215
216 def test_terminate(self):
217 if self.TYPE == 'threads':
218 return
219
220 p = self.Process(target=self._test_terminate)
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000221 p.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +0000222 p.start()
223
224 self.assertEqual(p.is_alive(), True)
Ezio Melottiaa980582010-01-23 23:04:36 +0000225 self.assertIn(p, self.active_children())
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000226 self.assertEqual(p.exitcode, None)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000227
228 p.terminate()
229
230 join = TimingWrapper(p.join)
231 self.assertEqual(join(), None)
232 self.assertTimingAlmostEqual(join.elapsed, 0.0)
233
234 self.assertEqual(p.is_alive(), False)
Ezio Melottiaa980582010-01-23 23:04:36 +0000235 self.assertNotIn(p, self.active_children())
Benjamin Petersondfd79492008-06-13 19:13:39 +0000236
237 p.join()
238
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000239 # XXX sometimes get p.exitcode == 0 on Windows ...
240 #self.assertEqual(p.exitcode, -signal.SIGTERM)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000241
242 def test_cpu_count(self):
243 try:
244 cpus = multiprocessing.cpu_count()
245 except NotImplementedError:
246 cpus = 1
247 self.assertTrue(type(cpus) is int)
248 self.assertTrue(cpus >= 1)
249
250 def test_active_children(self):
251 self.assertEqual(type(self.active_children()), list)
252
253 p = self.Process(target=time.sleep, args=(DELTA,))
Ezio Melottiaa980582010-01-23 23:04:36 +0000254 self.assertNotIn(p, self.active_children())
Benjamin Petersondfd79492008-06-13 19:13:39 +0000255
256 p.start()
Ezio Melottiaa980582010-01-23 23:04:36 +0000257 self.assertIn(p, self.active_children())
Benjamin Petersondfd79492008-06-13 19:13:39 +0000258
259 p.join()
Ezio Melottiaa980582010-01-23 23:04:36 +0000260 self.assertNotIn(p, self.active_children())
Benjamin Petersondfd79492008-06-13 19:13:39 +0000261
Antoine Pitrou4eb2b282010-11-02 23:51:30 +0000262 @classmethod
263 def _test_recursion(cls, wconn, id):
Benjamin Petersondfd79492008-06-13 19:13:39 +0000264 from multiprocessing import forking
265 wconn.send(id)
266 if len(id) < 2:
267 for i in range(2):
Antoine Pitrou4eb2b282010-11-02 23:51:30 +0000268 p = cls.Process(
269 target=cls._test_recursion, args=(wconn, id+[i])
Benjamin Petersondfd79492008-06-13 19:13:39 +0000270 )
271 p.start()
272 p.join()
273
274 def test_recursion(self):
275 rconn, wconn = self.Pipe(duplex=False)
276 self._test_recursion(wconn, [])
277
278 time.sleep(DELTA)
279 result = []
280 while rconn.poll():
281 result.append(rconn.recv())
282
283 expected = [
284 [],
285 [0],
286 [0, 0],
287 [0, 1],
288 [1],
289 [1, 0],
290 [1, 1]
291 ]
292 self.assertEqual(result, expected)
293
294#
295#
296#
297
298class _UpperCaser(multiprocessing.Process):
299
300 def __init__(self):
301 multiprocessing.Process.__init__(self)
302 self.child_conn, self.parent_conn = multiprocessing.Pipe()
303
304 def run(self):
305 self.parent_conn.close()
306 for s in iter(self.child_conn.recv, None):
307 self.child_conn.send(s.upper())
308 self.child_conn.close()
309
310 def submit(self, s):
311 assert type(s) is str
312 self.parent_conn.send(s)
313 return self.parent_conn.recv()
314
315 def stop(self):
316 self.parent_conn.send(None)
317 self.parent_conn.close()
318 self.child_conn.close()
319
320class _TestSubclassingProcess(BaseTestCase):
321
322 ALLOWED_TYPES = ('processes',)
323
324 def test_subclassing(self):
325 uppercaser = _UpperCaser()
326 uppercaser.start()
327 self.assertEqual(uppercaser.submit('hello'), 'HELLO')
328 self.assertEqual(uppercaser.submit('world'), 'WORLD')
329 uppercaser.stop()
330 uppercaser.join()
331
332#
333#
334#
335
336def queue_empty(q):
337 if hasattr(q, 'empty'):
338 return q.empty()
339 else:
340 return q.qsize() == 0
341
342def queue_full(q, maxsize):
343 if hasattr(q, 'full'):
344 return q.full()
345 else:
346 return q.qsize() == maxsize
347
348
349class _TestQueue(BaseTestCase):
350
351
Antoine Pitrou4eb2b282010-11-02 23:51:30 +0000352 @classmethod
353 def _test_put(cls, queue, child_can_start, parent_can_continue):
Benjamin Petersondfd79492008-06-13 19:13:39 +0000354 child_can_start.wait()
355 for i in range(6):
356 queue.get()
357 parent_can_continue.set()
358
359 def test_put(self):
360 MAXSIZE = 6
361 queue = self.Queue(maxsize=MAXSIZE)
362 child_can_start = self.Event()
363 parent_can_continue = self.Event()
364
365 proc = self.Process(
366 target=self._test_put,
367 args=(queue, child_can_start, parent_can_continue)
368 )
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000369 proc.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +0000370 proc.start()
371
372 self.assertEqual(queue_empty(queue), True)
373 self.assertEqual(queue_full(queue, MAXSIZE), False)
374
375 queue.put(1)
376 queue.put(2, True)
377 queue.put(3, True, None)
378 queue.put(4, False)
379 queue.put(5, False, None)
380 queue.put_nowait(6)
381
382 # the values may be in buffer but not yet in pipe so sleep a bit
383 time.sleep(DELTA)
384
385 self.assertEqual(queue_empty(queue), False)
386 self.assertEqual(queue_full(queue, MAXSIZE), True)
387
388 put = TimingWrapper(queue.put)
389 put_nowait = TimingWrapper(queue.put_nowait)
390
391 self.assertRaises(Queue.Full, put, 7, False)
392 self.assertTimingAlmostEqual(put.elapsed, 0)
393
394 self.assertRaises(Queue.Full, put, 7, False, None)
395 self.assertTimingAlmostEqual(put.elapsed, 0)
396
397 self.assertRaises(Queue.Full, put_nowait, 7)
398 self.assertTimingAlmostEqual(put_nowait.elapsed, 0)
399
400 self.assertRaises(Queue.Full, put, 7, True, TIMEOUT1)
401 self.assertTimingAlmostEqual(put.elapsed, TIMEOUT1)
402
403 self.assertRaises(Queue.Full, put, 7, False, TIMEOUT2)
404 self.assertTimingAlmostEqual(put.elapsed, 0)
405
406 self.assertRaises(Queue.Full, put, 7, True, timeout=TIMEOUT3)
407 self.assertTimingAlmostEqual(put.elapsed, TIMEOUT3)
408
409 child_can_start.set()
410 parent_can_continue.wait()
411
412 self.assertEqual(queue_empty(queue), True)
413 self.assertEqual(queue_full(queue, MAXSIZE), False)
414
415 proc.join()
416
Antoine Pitrou4eb2b282010-11-02 23:51:30 +0000417 @classmethod
418 def _test_get(cls, queue, child_can_start, parent_can_continue):
Benjamin Petersondfd79492008-06-13 19:13:39 +0000419 child_can_start.wait()
Benjamin Petersonda3a1b12008-06-16 20:52:48 +0000420 #queue.put(1)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000421 queue.put(2)
422 queue.put(3)
423 queue.put(4)
424 queue.put(5)
425 parent_can_continue.set()
426
427 def test_get(self):
428 queue = self.Queue()
429 child_can_start = self.Event()
430 parent_can_continue = self.Event()
431
432 proc = self.Process(
433 target=self._test_get,
434 args=(queue, child_can_start, parent_can_continue)
435 )
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000436 proc.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +0000437 proc.start()
438
439 self.assertEqual(queue_empty(queue), True)
440
441 child_can_start.set()
442 parent_can_continue.wait()
443
444 time.sleep(DELTA)
445 self.assertEqual(queue_empty(queue), False)
446
Benjamin Petersonda3a1b12008-06-16 20:52:48 +0000447 # Hangs unexpectedly, remove for now
448 #self.assertEqual(queue.get(), 1)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000449 self.assertEqual(queue.get(True, None), 2)
450 self.assertEqual(queue.get(True), 3)
451 self.assertEqual(queue.get(timeout=1), 4)
452 self.assertEqual(queue.get_nowait(), 5)
453
454 self.assertEqual(queue_empty(queue), True)
455
456 get = TimingWrapper(queue.get)
457 get_nowait = TimingWrapper(queue.get_nowait)
458
459 self.assertRaises(Queue.Empty, get, False)
460 self.assertTimingAlmostEqual(get.elapsed, 0)
461
462 self.assertRaises(Queue.Empty, get, False, None)
463 self.assertTimingAlmostEqual(get.elapsed, 0)
464
465 self.assertRaises(Queue.Empty, get_nowait)
466 self.assertTimingAlmostEqual(get_nowait.elapsed, 0)
467
468 self.assertRaises(Queue.Empty, get, True, TIMEOUT1)
469 self.assertTimingAlmostEqual(get.elapsed, TIMEOUT1)
470
471 self.assertRaises(Queue.Empty, get, False, TIMEOUT2)
472 self.assertTimingAlmostEqual(get.elapsed, 0)
473
474 self.assertRaises(Queue.Empty, get, timeout=TIMEOUT3)
475 self.assertTimingAlmostEqual(get.elapsed, TIMEOUT3)
476
477 proc.join()
478
Antoine Pitrou4eb2b282010-11-02 23:51:30 +0000479 @classmethod
480 def _test_fork(cls, queue):
Benjamin Petersondfd79492008-06-13 19:13:39 +0000481 for i in range(10, 20):
482 queue.put(i)
483 # note that at this point the items may only be buffered, so the
484 # process cannot shutdown until the feeder thread has finished
485 # pushing items onto the pipe.
486
487 def test_fork(self):
488 # Old versions of Queue would fail to create a new feeder
489 # thread for a forked process if the original process had its
490 # own feeder thread. This test checks that this no longer
491 # happens.
492
493 queue = self.Queue()
494
495 # put items on queue so that main process starts a feeder thread
496 for i in range(10):
497 queue.put(i)
498
499 # wait to make sure thread starts before we fork a new process
500 time.sleep(DELTA)
501
502 # fork process
503 p = self.Process(target=self._test_fork, args=(queue,))
504 p.start()
505
506 # check that all expected items are in the queue
507 for i in range(20):
508 self.assertEqual(queue.get(), i)
509 self.assertRaises(Queue.Empty, queue.get, False)
510
511 p.join()
512
513 def test_qsize(self):
514 q = self.Queue()
515 try:
516 self.assertEqual(q.qsize(), 0)
517 except NotImplementedError:
518 return
519 q.put(1)
520 self.assertEqual(q.qsize(), 1)
521 q.put(5)
522 self.assertEqual(q.qsize(), 2)
523 q.get()
524 self.assertEqual(q.qsize(), 1)
525 q.get()
526 self.assertEqual(q.qsize(), 0)
527
Antoine Pitrou4eb2b282010-11-02 23:51:30 +0000528 @classmethod
529 def _test_task_done(cls, q):
Benjamin Petersondfd79492008-06-13 19:13:39 +0000530 for obj in iter(q.get, None):
531 time.sleep(DELTA)
532 q.task_done()
533
534 def test_task_done(self):
535 queue = self.JoinableQueue()
536
537 if sys.version_info < (2, 5) and not hasattr(queue, 'task_done'):
Florent Xicluna36b9fbb2010-03-24 19:33:25 +0000538 self.skipTest("requires 'queue.task_done()' method")
Benjamin Petersondfd79492008-06-13 19:13:39 +0000539
540 workers = [self.Process(target=self._test_task_done, args=(queue,))
541 for i in xrange(4)]
542
543 for p in workers:
544 p.start()
545
546 for i in xrange(10):
547 queue.put(i)
548
549 queue.join()
550
551 for p in workers:
552 queue.put(None)
553
554 for p in workers:
555 p.join()
556
557#
558#
559#
560
561class _TestLock(BaseTestCase):
562
563 def test_lock(self):
564 lock = self.Lock()
565 self.assertEqual(lock.acquire(), True)
566 self.assertEqual(lock.acquire(False), False)
567 self.assertEqual(lock.release(), None)
568 self.assertRaises((ValueError, threading.ThreadError), lock.release)
569
570 def test_rlock(self):
571 lock = self.RLock()
572 self.assertEqual(lock.acquire(), True)
573 self.assertEqual(lock.acquire(), True)
574 self.assertEqual(lock.acquire(), True)
575 self.assertEqual(lock.release(), None)
576 self.assertEqual(lock.release(), None)
577 self.assertEqual(lock.release(), None)
578 self.assertRaises((AssertionError, RuntimeError), lock.release)
579
Jesse Noller82eb5902009-03-30 23:29:31 +0000580 def test_lock_context(self):
581 with self.Lock():
582 pass
583
Benjamin Petersondfd79492008-06-13 19:13:39 +0000584
585class _TestSemaphore(BaseTestCase):
586
587 def _test_semaphore(self, sem):
588 self.assertReturnsIfImplemented(2, get_value, sem)
589 self.assertEqual(sem.acquire(), True)
590 self.assertReturnsIfImplemented(1, get_value, sem)
591 self.assertEqual(sem.acquire(), True)
592 self.assertReturnsIfImplemented(0, get_value, sem)
593 self.assertEqual(sem.acquire(False), False)
594 self.assertReturnsIfImplemented(0, get_value, sem)
595 self.assertEqual(sem.release(), None)
596 self.assertReturnsIfImplemented(1, get_value, sem)
597 self.assertEqual(sem.release(), None)
598 self.assertReturnsIfImplemented(2, get_value, sem)
599
600 def test_semaphore(self):
601 sem = self.Semaphore(2)
602 self._test_semaphore(sem)
603 self.assertEqual(sem.release(), None)
604 self.assertReturnsIfImplemented(3, get_value, sem)
605 self.assertEqual(sem.release(), None)
606 self.assertReturnsIfImplemented(4, get_value, sem)
607
608 def test_bounded_semaphore(self):
609 sem = self.BoundedSemaphore(2)
610 self._test_semaphore(sem)
611 # Currently fails on OS/X
612 #if HAVE_GETVALUE:
613 # self.assertRaises(ValueError, sem.release)
614 # self.assertReturnsIfImplemented(2, get_value, sem)
615
616 def test_timeout(self):
617 if self.TYPE != 'processes':
618 return
619
620 sem = self.Semaphore(0)
621 acquire = TimingWrapper(sem.acquire)
622
623 self.assertEqual(acquire(False), False)
624 self.assertTimingAlmostEqual(acquire.elapsed, 0.0)
625
626 self.assertEqual(acquire(False, None), False)
627 self.assertTimingAlmostEqual(acquire.elapsed, 0.0)
628
629 self.assertEqual(acquire(False, TIMEOUT1), False)
630 self.assertTimingAlmostEqual(acquire.elapsed, 0)
631
632 self.assertEqual(acquire(True, TIMEOUT2), False)
633 self.assertTimingAlmostEqual(acquire.elapsed, TIMEOUT2)
634
635 self.assertEqual(acquire(timeout=TIMEOUT3), False)
636 self.assertTimingAlmostEqual(acquire.elapsed, TIMEOUT3)
637
638
639class _TestCondition(BaseTestCase):
640
Antoine Pitrou4eb2b282010-11-02 23:51:30 +0000641 @classmethod
642 def f(cls, cond, sleeping, woken, timeout=None):
Benjamin Petersondfd79492008-06-13 19:13:39 +0000643 cond.acquire()
644 sleeping.release()
645 cond.wait(timeout)
646 woken.release()
647 cond.release()
648
649 def check_invariant(self, cond):
650 # this is only supposed to succeed when there are no sleepers
651 if self.TYPE == 'processes':
652 try:
653 sleepers = (cond._sleeping_count.get_value() -
654 cond._woken_count.get_value())
655 self.assertEqual(sleepers, 0)
656 self.assertEqual(cond._wait_semaphore.get_value(), 0)
657 except NotImplementedError:
658 pass
659
660 def test_notify(self):
661 cond = self.Condition()
662 sleeping = self.Semaphore(0)
663 woken = self.Semaphore(0)
664
665 p = self.Process(target=self.f, args=(cond, sleeping, woken))
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000666 p.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +0000667 p.start()
668
669 p = threading.Thread(target=self.f, args=(cond, sleeping, woken))
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000670 p.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +0000671 p.start()
672
673 # wait for both children to start sleeping
674 sleeping.acquire()
675 sleeping.acquire()
676
677 # check no process/thread has woken up
678 time.sleep(DELTA)
679 self.assertReturnsIfImplemented(0, get_value, woken)
680
681 # wake up one process/thread
682 cond.acquire()
683 cond.notify()
684 cond.release()
685
686 # check one process/thread has woken up
687 time.sleep(DELTA)
688 self.assertReturnsIfImplemented(1, get_value, woken)
689
690 # wake up another
691 cond.acquire()
692 cond.notify()
693 cond.release()
694
695 # check other has woken up
696 time.sleep(DELTA)
697 self.assertReturnsIfImplemented(2, get_value, woken)
698
699 # check state is not mucked up
700 self.check_invariant(cond)
701 p.join()
702
703 def test_notify_all(self):
704 cond = self.Condition()
705 sleeping = self.Semaphore(0)
706 woken = self.Semaphore(0)
707
708 # start some threads/processes which will timeout
709 for i in range(3):
710 p = self.Process(target=self.f,
711 args=(cond, sleeping, woken, TIMEOUT1))
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000712 p.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +0000713 p.start()
714
715 t = threading.Thread(target=self.f,
716 args=(cond, sleeping, woken, TIMEOUT1))
Benjamin Petersona9b22222008-08-18 18:01:43 +0000717 t.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +0000718 t.start()
719
720 # wait for them all to sleep
721 for i in xrange(6):
722 sleeping.acquire()
723
724 # check they have all timed out
725 for i in xrange(6):
726 woken.acquire()
727 self.assertReturnsIfImplemented(0, get_value, woken)
728
729 # check state is not mucked up
730 self.check_invariant(cond)
731
732 # start some more threads/processes
733 for i in range(3):
734 p = self.Process(target=self.f, args=(cond, sleeping, woken))
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000735 p.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +0000736 p.start()
737
738 t = threading.Thread(target=self.f, args=(cond, sleeping, woken))
Benjamin Petersona9b22222008-08-18 18:01:43 +0000739 t.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +0000740 t.start()
741
742 # wait for them to all sleep
743 for i in xrange(6):
744 sleeping.acquire()
745
746 # check no process/thread has woken up
747 time.sleep(DELTA)
748 self.assertReturnsIfImplemented(0, get_value, woken)
749
750 # wake them all up
751 cond.acquire()
752 cond.notify_all()
753 cond.release()
754
755 # check they have all woken
756 time.sleep(DELTA)
757 self.assertReturnsIfImplemented(6, get_value, woken)
758
759 # check state is not mucked up
760 self.check_invariant(cond)
761
762 def test_timeout(self):
763 cond = self.Condition()
764 wait = TimingWrapper(cond.wait)
765 cond.acquire()
766 res = wait(TIMEOUT1)
767 cond.release()
768 self.assertEqual(res, None)
769 self.assertTimingAlmostEqual(wait.elapsed, TIMEOUT1)
770
771
772class _TestEvent(BaseTestCase):
773
Antoine Pitrou4eb2b282010-11-02 23:51:30 +0000774 @classmethod
775 def _test_event(cls, event):
Benjamin Petersondfd79492008-06-13 19:13:39 +0000776 time.sleep(TIMEOUT2)
777 event.set()
778
779 def test_event(self):
780 event = self.Event()
781 wait = TimingWrapper(event.wait)
782
783 # Removed temporaily, due to API shear, this does not
784 # work with threading._Event objects. is_set == isSet
Jesse Noller02cb0eb2009-04-01 03:45:50 +0000785 self.assertEqual(event.is_set(), False)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000786
Jesse Noller02cb0eb2009-04-01 03:45:50 +0000787 # Removed, threading.Event.wait() will return the value of the __flag
788 # instead of None. API Shear with the semaphore backed mp.Event
789 self.assertEqual(wait(0.0), False)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000790 self.assertTimingAlmostEqual(wait.elapsed, 0.0)
Jesse Noller02cb0eb2009-04-01 03:45:50 +0000791 self.assertEqual(wait(TIMEOUT1), False)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000792 self.assertTimingAlmostEqual(wait.elapsed, TIMEOUT1)
793
794 event.set()
795
796 # See note above on the API differences
Jesse Noller02cb0eb2009-04-01 03:45:50 +0000797 self.assertEqual(event.is_set(), True)
798 self.assertEqual(wait(), True)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000799 self.assertTimingAlmostEqual(wait.elapsed, 0.0)
Jesse Noller02cb0eb2009-04-01 03:45:50 +0000800 self.assertEqual(wait(TIMEOUT1), True)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000801 self.assertTimingAlmostEqual(wait.elapsed, 0.0)
802 # self.assertEqual(event.is_set(), True)
803
804 event.clear()
805
806 #self.assertEqual(event.is_set(), False)
807
808 self.Process(target=self._test_event, args=(event,)).start()
Jesse Noller02cb0eb2009-04-01 03:45:50 +0000809 self.assertEqual(wait(), True)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000810
811#
812#
813#
814
815class _TestValue(BaseTestCase):
816
Florent Xicluna36b9fbb2010-03-24 19:33:25 +0000817 ALLOWED_TYPES = ('processes',)
818
Benjamin Petersondfd79492008-06-13 19:13:39 +0000819 codes_values = [
820 ('i', 4343, 24234),
821 ('d', 3.625, -4.25),
822 ('h', -232, 234),
823 ('c', latin('x'), latin('y'))
824 ]
825
Antoine Pitrou55d935a2010-11-22 16:35:57 +0000826 def setUp(self):
827 if not HAS_SHAREDCTYPES:
828 self.skipTest("requires multiprocessing.sharedctypes")
829
Antoine Pitrou4eb2b282010-11-02 23:51:30 +0000830 @classmethod
831 def _test(cls, values):
832 for sv, cv in zip(values, cls.codes_values):
Benjamin Petersondfd79492008-06-13 19:13:39 +0000833 sv.value = cv[2]
834
835
836 def test_value(self, raw=False):
Benjamin Petersondfd79492008-06-13 19:13:39 +0000837 if raw:
838 values = [self.RawValue(code, value)
839 for code, value, _ in self.codes_values]
840 else:
841 values = [self.Value(code, value)
842 for code, value, _ in self.codes_values]
843
844 for sv, cv in zip(values, self.codes_values):
845 self.assertEqual(sv.value, cv[1])
846
847 proc = self.Process(target=self._test, args=(values,))
848 proc.start()
849 proc.join()
850
851 for sv, cv in zip(values, self.codes_values):
852 self.assertEqual(sv.value, cv[2])
853
854 def test_rawvalue(self):
855 self.test_value(raw=True)
856
857 def test_getobj_getlock(self):
Benjamin Petersondfd79492008-06-13 19:13:39 +0000858 val1 = self.Value('i', 5)
859 lock1 = val1.get_lock()
860 obj1 = val1.get_obj()
861
862 val2 = self.Value('i', 5, lock=None)
863 lock2 = val2.get_lock()
864 obj2 = val2.get_obj()
865
866 lock = self.Lock()
867 val3 = self.Value('i', 5, lock=lock)
868 lock3 = val3.get_lock()
869 obj3 = val3.get_obj()
870 self.assertEqual(lock, lock3)
871
Jesse Noller6ab22152009-01-18 02:45:38 +0000872 arr4 = self.Value('i', 5, lock=False)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000873 self.assertFalse(hasattr(arr4, 'get_lock'))
874 self.assertFalse(hasattr(arr4, 'get_obj'))
875
Jesse Noller6ab22152009-01-18 02:45:38 +0000876 self.assertRaises(AttributeError, self.Value, 'i', 5, lock='navalue')
877
878 arr5 = self.RawValue('i', 5)
879 self.assertFalse(hasattr(arr5, 'get_lock'))
880 self.assertFalse(hasattr(arr5, 'get_obj'))
881
Benjamin Petersondfd79492008-06-13 19:13:39 +0000882
883class _TestArray(BaseTestCase):
884
Florent Xicluna36b9fbb2010-03-24 19:33:25 +0000885 ALLOWED_TYPES = ('processes',)
886
Antoine Pitrou4eb2b282010-11-02 23:51:30 +0000887 @classmethod
888 def f(cls, seq):
Benjamin Petersondfd79492008-06-13 19:13:39 +0000889 for i in range(1, len(seq)):
890 seq[i] += seq[i-1]
891
Florent Xicluna36b9fbb2010-03-24 19:33:25 +0000892 @unittest.skipIf(c_int is None, "requires _ctypes")
Benjamin Petersondfd79492008-06-13 19:13:39 +0000893 def test_array(self, raw=False):
Benjamin Petersondfd79492008-06-13 19:13:39 +0000894 seq = [680, 626, 934, 821, 150, 233, 548, 982, 714, 831]
895 if raw:
896 arr = self.RawArray('i', seq)
897 else:
898 arr = self.Array('i', seq)
899
900 self.assertEqual(len(arr), len(seq))
901 self.assertEqual(arr[3], seq[3])
902 self.assertEqual(list(arr[2:7]), list(seq[2:7]))
903
904 arr[4:8] = seq[4:8] = array.array('i', [1, 2, 3, 4])
905
906 self.assertEqual(list(arr[:]), seq)
907
908 self.f(seq)
909
910 p = self.Process(target=self.f, args=(arr,))
911 p.start()
912 p.join()
913
914 self.assertEqual(list(arr[:]), seq)
915
Florent Xicluna36b9fbb2010-03-24 19:33:25 +0000916 @unittest.skipIf(c_int is None, "requires _ctypes")
Benjamin Petersondfd79492008-06-13 19:13:39 +0000917 def test_rawarray(self):
918 self.test_array(raw=True)
919
Florent Xicluna36b9fbb2010-03-24 19:33:25 +0000920 @unittest.skipIf(c_int is None, "requires _ctypes")
Benjamin Petersondfd79492008-06-13 19:13:39 +0000921 def test_getobj_getlock_obj(self):
Benjamin Petersondfd79492008-06-13 19:13:39 +0000922 arr1 = self.Array('i', range(10))
923 lock1 = arr1.get_lock()
924 obj1 = arr1.get_obj()
925
926 arr2 = self.Array('i', range(10), lock=None)
927 lock2 = arr2.get_lock()
928 obj2 = arr2.get_obj()
929
930 lock = self.Lock()
931 arr3 = self.Array('i', range(10), lock=lock)
932 lock3 = arr3.get_lock()
933 obj3 = arr3.get_obj()
934 self.assertEqual(lock, lock3)
935
Jesse Noller6ab22152009-01-18 02:45:38 +0000936 arr4 = self.Array('i', range(10), lock=False)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000937 self.assertFalse(hasattr(arr4, 'get_lock'))
938 self.assertFalse(hasattr(arr4, 'get_obj'))
Jesse Noller6ab22152009-01-18 02:45:38 +0000939 self.assertRaises(AttributeError,
940 self.Array, 'i', range(10), lock='notalock')
941
942 arr5 = self.RawArray('i', range(10))
943 self.assertFalse(hasattr(arr5, 'get_lock'))
944 self.assertFalse(hasattr(arr5, 'get_obj'))
Benjamin Petersondfd79492008-06-13 19:13:39 +0000945
946#
947#
948#
949
950class _TestContainers(BaseTestCase):
951
952 ALLOWED_TYPES = ('manager',)
953
954 def test_list(self):
955 a = self.list(range(10))
956 self.assertEqual(a[:], range(10))
957
958 b = self.list()
959 self.assertEqual(b[:], [])
960
961 b.extend(range(5))
962 self.assertEqual(b[:], range(5))
963
964 self.assertEqual(b[2], 2)
965 self.assertEqual(b[2:10], [2,3,4])
966
967 b *= 2
968 self.assertEqual(b[:], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4])
969
970 self.assertEqual(b + [5, 6], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4, 5, 6])
971
972 self.assertEqual(a[:], range(10))
973
974 d = [a, b]
975 e = self.list(d)
976 self.assertEqual(
977 e[:],
978 [[0, 1, 2, 3, 4, 5, 6, 7, 8, 9], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4]]
979 )
980
981 f = self.list([a])
982 a.append('hello')
983 self.assertEqual(f[:], [[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 'hello']])
984
985 def test_dict(self):
986 d = self.dict()
987 indices = range(65, 70)
988 for i in indices:
989 d[i] = chr(i)
990 self.assertEqual(d.copy(), dict((i, chr(i)) for i in indices))
991 self.assertEqual(sorted(d.keys()), indices)
992 self.assertEqual(sorted(d.values()), [chr(i) for i in indices])
993 self.assertEqual(sorted(d.items()), [(i, chr(i)) for i in indices])
994
995 def test_namespace(self):
996 n = self.Namespace()
997 n.name = 'Bob'
998 n.job = 'Builder'
999 n._hidden = 'hidden'
1000 self.assertEqual((n.name, n.job), ('Bob', 'Builder'))
1001 del n.job
1002 self.assertEqual(str(n), "Namespace(name='Bob')")
1003 self.assertTrue(hasattr(n, 'name'))
1004 self.assertTrue(not hasattr(n, 'job'))
1005
1006#
1007#
1008#
1009
1010def sqr(x, wait=0.0):
1011 time.sleep(wait)
1012 return x*x
Benjamin Petersondfd79492008-06-13 19:13:39 +00001013class _TestPool(BaseTestCase):
1014
1015 def test_apply(self):
1016 papply = self.pool.apply
1017 self.assertEqual(papply(sqr, (5,)), sqr(5))
1018 self.assertEqual(papply(sqr, (), {'x':3}), sqr(x=3))
1019
1020 def test_map(self):
1021 pmap = self.pool.map
1022 self.assertEqual(pmap(sqr, range(10)), map(sqr, range(10)))
1023 self.assertEqual(pmap(sqr, range(100), chunksize=20),
1024 map(sqr, range(100)))
1025
Jesse Noller7530e472009-07-16 14:23:04 +00001026 def test_map_chunksize(self):
1027 try:
1028 self.pool.map_async(sqr, [], chunksize=1).get(timeout=TIMEOUT1)
1029 except multiprocessing.TimeoutError:
1030 self.fail("pool.map_async with chunksize stalled on null list")
1031
Benjamin Petersondfd79492008-06-13 19:13:39 +00001032 def test_async(self):
1033 res = self.pool.apply_async(sqr, (7, TIMEOUT1,))
1034 get = TimingWrapper(res.get)
1035 self.assertEqual(get(), 49)
1036 self.assertTimingAlmostEqual(get.elapsed, TIMEOUT1)
1037
1038 def test_async_timeout(self):
1039 res = self.pool.apply_async(sqr, (6, TIMEOUT2 + 0.2))
1040 get = TimingWrapper(res.get)
1041 self.assertRaises(multiprocessing.TimeoutError, get, timeout=TIMEOUT2)
1042 self.assertTimingAlmostEqual(get.elapsed, TIMEOUT2)
1043
1044 def test_imap(self):
1045 it = self.pool.imap(sqr, range(10))
1046 self.assertEqual(list(it), map(sqr, range(10)))
1047
1048 it = self.pool.imap(sqr, range(10))
1049 for i in range(10):
1050 self.assertEqual(it.next(), i*i)
1051 self.assertRaises(StopIteration, it.next)
1052
1053 it = self.pool.imap(sqr, range(1000), chunksize=100)
1054 for i in range(1000):
1055 self.assertEqual(it.next(), i*i)
1056 self.assertRaises(StopIteration, it.next)
1057
1058 def test_imap_unordered(self):
1059 it = self.pool.imap_unordered(sqr, range(1000))
1060 self.assertEqual(sorted(it), map(sqr, range(1000)))
1061
1062 it = self.pool.imap_unordered(sqr, range(1000), chunksize=53)
1063 self.assertEqual(sorted(it), map(sqr, range(1000)))
1064
1065 def test_make_pool(self):
1066 p = multiprocessing.Pool(3)
1067 self.assertEqual(3, len(p._pool))
1068 p.close()
1069 p.join()
1070
1071 def test_terminate(self):
1072 if self.TYPE == 'manager':
1073 # On Unix a forked process increfs each shared object to
1074 # which its parent process held a reference. If the
1075 # forked process gets terminated then there is likely to
1076 # be a reference leak. So to prevent
1077 # _TestZZZNumberOfObjects from failing we skip this test
1078 # when using a manager.
1079 return
1080
1081 result = self.pool.map_async(
1082 time.sleep, [0.1 for i in range(10000)], chunksize=1
1083 )
1084 self.pool.terminate()
1085 join = TimingWrapper(self.pool.join)
1086 join()
1087 self.assertTrue(join.elapsed < 0.2)
Jesse Noller654ade32010-01-27 03:05:57 +00001088
1089class _TestPoolWorkerLifetime(BaseTestCase):
1090
1091 ALLOWED_TYPES = ('processes', )
1092 def test_pool_worker_lifetime(self):
1093 p = multiprocessing.Pool(3, maxtasksperchild=10)
1094 self.assertEqual(3, len(p._pool))
1095 origworkerpids = [w.pid for w in p._pool]
1096 # Run many tasks so each worker gets replaced (hopefully)
1097 results = []
1098 for i in range(100):
1099 results.append(p.apply_async(sqr, (i, )))
1100 # Fetch the results and verify we got the right answers,
1101 # also ensuring all the tasks have completed.
1102 for (j, res) in enumerate(results):
1103 self.assertEqual(res.get(), sqr(j))
1104 # Refill the pool
1105 p._repopulate_pool()
Florent Xicluna3bc5cb72010-03-04 15:58:54 +00001106 # Wait until all workers are alive
1107 countdown = 5
1108 while countdown and not all(w.is_alive() for w in p._pool):
1109 countdown -= 1
1110 time.sleep(DELTA)
Jesse Noller654ade32010-01-27 03:05:57 +00001111 finalworkerpids = [w.pid for w in p._pool]
Florent Xicluna3bc5cb72010-03-04 15:58:54 +00001112 # All pids should be assigned. See issue #7805.
1113 self.assertNotIn(None, origworkerpids)
1114 self.assertNotIn(None, finalworkerpids)
1115 # Finally, check that the worker pids have changed
Jesse Noller654ade32010-01-27 03:05:57 +00001116 self.assertNotEqual(sorted(origworkerpids), sorted(finalworkerpids))
1117 p.close()
1118 p.join()
1119
Benjamin Petersondfd79492008-06-13 19:13:39 +00001120#
1121# Test that manager has expected number of shared objects left
1122#
1123
1124class _TestZZZNumberOfObjects(BaseTestCase):
1125 # Because test cases are sorted alphabetically, this one will get
1126 # run after all the other tests for the manager. It tests that
1127 # there have been no "reference leaks" for the manager's shared
1128 # objects. Note the comment in _TestPool.test_terminate().
1129 ALLOWED_TYPES = ('manager',)
1130
1131 def test_number_of_objects(self):
1132 EXPECTED_NUMBER = 1 # the pool object is still alive
1133 multiprocessing.active_children() # discard dead process objs
1134 gc.collect() # do garbage collection
1135 refs = self.manager._number_of_objects()
Jesse Noller7314b382009-01-21 02:08:17 +00001136 debug_info = self.manager._debug_info()
Benjamin Petersondfd79492008-06-13 19:13:39 +00001137 if refs != EXPECTED_NUMBER:
Jesse Noller7fb96402008-07-17 21:01:05 +00001138 print self.manager._debug_info()
Jesse Noller7314b382009-01-21 02:08:17 +00001139 print debug_info
Benjamin Petersondfd79492008-06-13 19:13:39 +00001140
1141 self.assertEqual(refs, EXPECTED_NUMBER)
1142
1143#
1144# Test of creating a customized manager class
1145#
1146
1147from multiprocessing.managers import BaseManager, BaseProxy, RemoteError
1148
1149class FooBar(object):
1150 def f(self):
1151 return 'f()'
1152 def g(self):
1153 raise ValueError
1154 def _h(self):
1155 return '_h()'
1156
1157def baz():
1158 for i in xrange(10):
1159 yield i*i
1160
1161class IteratorProxy(BaseProxy):
1162 _exposed_ = ('next', '__next__')
1163 def __iter__(self):
1164 return self
1165 def next(self):
1166 return self._callmethod('next')
1167 def __next__(self):
1168 return self._callmethod('__next__')
1169
1170class MyManager(BaseManager):
1171 pass
1172
1173MyManager.register('Foo', callable=FooBar)
1174MyManager.register('Bar', callable=FooBar, exposed=('f', '_h'))
1175MyManager.register('baz', callable=baz, proxytype=IteratorProxy)
1176
1177
1178class _TestMyManager(BaseTestCase):
1179
1180 ALLOWED_TYPES = ('manager',)
1181
1182 def test_mymanager(self):
1183 manager = MyManager()
1184 manager.start()
1185
1186 foo = manager.Foo()
1187 bar = manager.Bar()
1188 baz = manager.baz()
1189
1190 foo_methods = [name for name in ('f', 'g', '_h') if hasattr(foo, name)]
1191 bar_methods = [name for name in ('f', 'g', '_h') if hasattr(bar, name)]
1192
1193 self.assertEqual(foo_methods, ['f', 'g'])
1194 self.assertEqual(bar_methods, ['f', '_h'])
1195
1196 self.assertEqual(foo.f(), 'f()')
1197 self.assertRaises(ValueError, foo.g)
1198 self.assertEqual(foo._callmethod('f'), 'f()')
1199 self.assertRaises(RemoteError, foo._callmethod, '_h')
1200
1201 self.assertEqual(bar.f(), 'f()')
1202 self.assertEqual(bar._h(), '_h()')
1203 self.assertEqual(bar._callmethod('f'), 'f()')
1204 self.assertEqual(bar._callmethod('_h'), '_h()')
1205
1206 self.assertEqual(list(baz), [i*i for i in range(10)])
1207
1208 manager.shutdown()
1209
1210#
1211# Test of connecting to a remote server and using xmlrpclib for serialization
1212#
1213
1214_queue = Queue.Queue()
1215def get_queue():
1216 return _queue
1217
1218class QueueManager(BaseManager):
1219 '''manager class used by server process'''
1220QueueManager.register('get_queue', callable=get_queue)
1221
1222class QueueManager2(BaseManager):
1223 '''manager class which specifies the same interface as QueueManager'''
1224QueueManager2.register('get_queue')
1225
1226
1227SERIALIZER = 'xmlrpclib'
1228
1229class _TestRemoteManager(BaseTestCase):
1230
1231 ALLOWED_TYPES = ('manager',)
1232
Antoine Pitrou4eb2b282010-11-02 23:51:30 +00001233 @classmethod
1234 def _putter(cls, address, authkey):
Benjamin Petersondfd79492008-06-13 19:13:39 +00001235 manager = QueueManager2(
1236 address=address, authkey=authkey, serializer=SERIALIZER
1237 )
1238 manager.connect()
1239 queue = manager.get_queue()
1240 queue.put(('hello world', None, True, 2.25))
1241
1242 def test_remote(self):
1243 authkey = os.urandom(32)
1244
1245 manager = QueueManager(
1246 address=('localhost', 0), authkey=authkey, serializer=SERIALIZER
1247 )
1248 manager.start()
1249
1250 p = self.Process(target=self._putter, args=(manager.address, authkey))
1251 p.start()
1252
1253 manager2 = QueueManager2(
1254 address=manager.address, authkey=authkey, serializer=SERIALIZER
1255 )
1256 manager2.connect()
1257 queue = manager2.get_queue()
1258
1259 # Note that xmlrpclib will deserialize object as a list not a tuple
1260 self.assertEqual(queue.get(), ['hello world', None, True, 2.25])
1261
1262 # Because we are using xmlrpclib for serialization instead of
1263 # pickle this will cause a serialization error.
1264 self.assertRaises(Exception, queue.put, time.sleep)
1265
1266 # Make queue finalizer run before the server is stopped
1267 del queue
1268 manager.shutdown()
1269
Jesse Noller459a6482009-03-30 15:50:42 +00001270class _TestManagerRestart(BaseTestCase):
1271
Antoine Pitrou4eb2b282010-11-02 23:51:30 +00001272 @classmethod
1273 def _putter(cls, address, authkey):
Jesse Noller459a6482009-03-30 15:50:42 +00001274 manager = QueueManager(
1275 address=address, authkey=authkey, serializer=SERIALIZER)
1276 manager.connect()
1277 queue = manager.get_queue()
1278 queue.put('hello world')
1279
1280 def test_rapid_restart(self):
1281 authkey = os.urandom(32)
1282 manager = QueueManager(
Antoine Pitrou54f9f832010-04-30 23:08:48 +00001283 address=('localhost', 0), authkey=authkey, serializer=SERIALIZER)
Brian Curtin87d86e02010-11-01 05:15:55 +00001284 srvr = manager.get_server()
1285 addr = srvr.address
1286 # Close the connection.Listener socket which gets opened as a part
1287 # of manager.get_server(). It's not needed for the test.
1288 srvr.listener.close()
Jesse Noller459a6482009-03-30 15:50:42 +00001289 manager.start()
1290
1291 p = self.Process(target=self._putter, args=(manager.address, authkey))
1292 p.start()
1293 queue = manager.get_queue()
1294 self.assertEqual(queue.get(), 'hello world')
Jesse Noller019ce772009-03-30 21:53:29 +00001295 del queue
Jesse Noller459a6482009-03-30 15:50:42 +00001296 manager.shutdown()
1297 manager = QueueManager(
Antoine Pitrou54f9f832010-04-30 23:08:48 +00001298 address=addr, authkey=authkey, serializer=SERIALIZER)
Jesse Noller459a6482009-03-30 15:50:42 +00001299 manager.start()
Jesse Noller019ce772009-03-30 21:53:29 +00001300 manager.shutdown()
Jesse Noller459a6482009-03-30 15:50:42 +00001301
Benjamin Petersondfd79492008-06-13 19:13:39 +00001302#
1303#
1304#
1305
1306SENTINEL = latin('')
1307
1308class _TestConnection(BaseTestCase):
1309
1310 ALLOWED_TYPES = ('processes', 'threads')
1311
Antoine Pitrou4eb2b282010-11-02 23:51:30 +00001312 @classmethod
1313 def _echo(cls, conn):
Benjamin Petersondfd79492008-06-13 19:13:39 +00001314 for msg in iter(conn.recv_bytes, SENTINEL):
1315 conn.send_bytes(msg)
1316 conn.close()
1317
1318 def test_connection(self):
1319 conn, child_conn = self.Pipe()
1320
1321 p = self.Process(target=self._echo, args=(child_conn,))
Jesse Noller5bc9f4c2008-08-19 19:06:19 +00001322 p.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +00001323 p.start()
1324
1325 seq = [1, 2.25, None]
1326 msg = latin('hello world')
1327 longmsg = msg * 10
1328 arr = array.array('i', range(4))
1329
1330 if self.TYPE == 'processes':
1331 self.assertEqual(type(conn.fileno()), int)
1332
1333 self.assertEqual(conn.send(seq), None)
1334 self.assertEqual(conn.recv(), seq)
1335
1336 self.assertEqual(conn.send_bytes(msg), None)
1337 self.assertEqual(conn.recv_bytes(), msg)
1338
1339 if self.TYPE == 'processes':
1340 buffer = array.array('i', [0]*10)
1341 expected = list(arr) + [0] * (10 - len(arr))
1342 self.assertEqual(conn.send_bytes(arr), None)
1343 self.assertEqual(conn.recv_bytes_into(buffer),
1344 len(arr) * buffer.itemsize)
1345 self.assertEqual(list(buffer), expected)
1346
1347 buffer = array.array('i', [0]*10)
1348 expected = [0] * 3 + list(arr) + [0] * (10 - 3 - len(arr))
1349 self.assertEqual(conn.send_bytes(arr), None)
1350 self.assertEqual(conn.recv_bytes_into(buffer, 3 * buffer.itemsize),
1351 len(arr) * buffer.itemsize)
1352 self.assertEqual(list(buffer), expected)
1353
1354 buffer = bytearray(latin(' ' * 40))
1355 self.assertEqual(conn.send_bytes(longmsg), None)
1356 try:
1357 res = conn.recv_bytes_into(buffer)
1358 except multiprocessing.BufferTooShort, e:
1359 self.assertEqual(e.args, (longmsg,))
1360 else:
1361 self.fail('expected BufferTooShort, got %s' % res)
1362
1363 poll = TimingWrapper(conn.poll)
1364
1365 self.assertEqual(poll(), False)
1366 self.assertTimingAlmostEqual(poll.elapsed, 0)
1367
1368 self.assertEqual(poll(TIMEOUT1), False)
1369 self.assertTimingAlmostEqual(poll.elapsed, TIMEOUT1)
1370
1371 conn.send(None)
1372
1373 self.assertEqual(poll(TIMEOUT1), True)
1374 self.assertTimingAlmostEqual(poll.elapsed, 0)
1375
1376 self.assertEqual(conn.recv(), None)
1377
1378 really_big_msg = latin('X') * (1024 * 1024 * 16) # 16Mb
1379 conn.send_bytes(really_big_msg)
1380 self.assertEqual(conn.recv_bytes(), really_big_msg)
1381
1382 conn.send_bytes(SENTINEL) # tell child to quit
1383 child_conn.close()
1384
1385 if self.TYPE == 'processes':
1386 self.assertEqual(conn.readable, True)
1387 self.assertEqual(conn.writable, True)
1388 self.assertRaises(EOFError, conn.recv)
1389 self.assertRaises(EOFError, conn.recv_bytes)
1390
1391 p.join()
1392
1393 def test_duplex_false(self):
1394 reader, writer = self.Pipe(duplex=False)
1395 self.assertEqual(writer.send(1), None)
1396 self.assertEqual(reader.recv(), 1)
1397 if self.TYPE == 'processes':
1398 self.assertEqual(reader.readable, True)
1399 self.assertEqual(reader.writable, False)
1400 self.assertEqual(writer.readable, False)
1401 self.assertEqual(writer.writable, True)
1402 self.assertRaises(IOError, reader.send, 2)
1403 self.assertRaises(IOError, writer.recv)
1404 self.assertRaises(IOError, writer.poll)
1405
1406 def test_spawn_close(self):
1407 # We test that a pipe connection can be closed by parent
1408 # process immediately after child is spawned. On Windows this
1409 # would have sometimes failed on old versions because
1410 # child_conn would be closed before the child got a chance to
1411 # duplicate it.
1412 conn, child_conn = self.Pipe()
1413
1414 p = self.Process(target=self._echo, args=(child_conn,))
1415 p.start()
1416 child_conn.close() # this might complete before child initializes
1417
1418 msg = latin('hello')
1419 conn.send_bytes(msg)
1420 self.assertEqual(conn.recv_bytes(), msg)
1421
1422 conn.send_bytes(SENTINEL)
1423 conn.close()
1424 p.join()
1425
1426 def test_sendbytes(self):
1427 if self.TYPE != 'processes':
1428 return
1429
1430 msg = latin('abcdefghijklmnopqrstuvwxyz')
1431 a, b = self.Pipe()
1432
1433 a.send_bytes(msg)
1434 self.assertEqual(b.recv_bytes(), msg)
1435
1436 a.send_bytes(msg, 5)
1437 self.assertEqual(b.recv_bytes(), msg[5:])
1438
1439 a.send_bytes(msg, 7, 8)
1440 self.assertEqual(b.recv_bytes(), msg[7:7+8])
1441
1442 a.send_bytes(msg, 26)
1443 self.assertEqual(b.recv_bytes(), latin(''))
1444
1445 a.send_bytes(msg, 26, 0)
1446 self.assertEqual(b.recv_bytes(), latin(''))
1447
1448 self.assertRaises(ValueError, a.send_bytes, msg, 27)
1449
1450 self.assertRaises(ValueError, a.send_bytes, msg, 22, 5)
1451
1452 self.assertRaises(ValueError, a.send_bytes, msg, 26, 1)
1453
1454 self.assertRaises(ValueError, a.send_bytes, msg, -1)
1455
1456 self.assertRaises(ValueError, a.send_bytes, msg, 4, -1)
1457
Benjamin Petersondfd79492008-06-13 19:13:39 +00001458class _TestListenerClient(BaseTestCase):
1459
1460 ALLOWED_TYPES = ('processes', 'threads')
1461
Antoine Pitrou4eb2b282010-11-02 23:51:30 +00001462 @classmethod
1463 def _test(cls, address):
1464 conn = cls.connection.Client(address)
Benjamin Petersondfd79492008-06-13 19:13:39 +00001465 conn.send('hello')
1466 conn.close()
1467
1468 def test_listener_client(self):
1469 for family in self.connection.families:
1470 l = self.connection.Listener(family=family)
1471 p = self.Process(target=self._test, args=(l.address,))
Jesse Noller5bc9f4c2008-08-19 19:06:19 +00001472 p.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +00001473 p.start()
1474 conn = l.accept()
1475 self.assertEqual(conn.recv(), 'hello')
1476 p.join()
1477 l.close()
Benjamin Petersondfd79492008-06-13 19:13:39 +00001478#
1479# Test of sending connection and socket objects between processes
1480#
Benjamin Petersonda3a1b12008-06-16 20:52:48 +00001481"""
Benjamin Petersondfd79492008-06-13 19:13:39 +00001482class _TestPicklingConnections(BaseTestCase):
1483
1484 ALLOWED_TYPES = ('processes',)
1485
1486 def _listener(self, conn, families):
1487 for fam in families:
1488 l = self.connection.Listener(family=fam)
1489 conn.send(l.address)
1490 new_conn = l.accept()
1491 conn.send(new_conn)
1492
1493 if self.TYPE == 'processes':
1494 l = socket.socket()
1495 l.bind(('localhost', 0))
1496 conn.send(l.getsockname())
1497 l.listen(1)
1498 new_conn, addr = l.accept()
1499 conn.send(new_conn)
1500
1501 conn.recv()
1502
1503 def _remote(self, conn):
1504 for (address, msg) in iter(conn.recv, None):
1505 client = self.connection.Client(address)
1506 client.send(msg.upper())
1507 client.close()
1508
1509 if self.TYPE == 'processes':
1510 address, msg = conn.recv()
1511 client = socket.socket()
1512 client.connect(address)
1513 client.sendall(msg.upper())
1514 client.close()
1515
1516 conn.close()
1517
1518 def test_pickling(self):
1519 try:
1520 multiprocessing.allow_connection_pickling()
1521 except ImportError:
1522 return
1523
1524 families = self.connection.families
1525
1526 lconn, lconn0 = self.Pipe()
1527 lp = self.Process(target=self._listener, args=(lconn0, families))
1528 lp.start()
1529 lconn0.close()
1530
1531 rconn, rconn0 = self.Pipe()
1532 rp = self.Process(target=self._remote, args=(rconn0,))
1533 rp.start()
1534 rconn0.close()
1535
1536 for fam in families:
1537 msg = ('This connection uses family %s' % fam).encode('ascii')
1538 address = lconn.recv()
1539 rconn.send((address, msg))
1540 new_conn = lconn.recv()
1541 self.assertEqual(new_conn.recv(), msg.upper())
1542
1543 rconn.send(None)
1544
1545 if self.TYPE == 'processes':
1546 msg = latin('This connection uses a normal socket')
1547 address = lconn.recv()
1548 rconn.send((address, msg))
1549 if hasattr(socket, 'fromfd'):
1550 new_conn = lconn.recv()
1551 self.assertEqual(new_conn.recv(100), msg.upper())
1552 else:
1553 # XXX On Windows with Py2.6 need to backport fromfd()
1554 discard = lconn.recv_bytes()
1555
1556 lconn.send(None)
1557
1558 rconn.close()
1559 lconn.close()
1560
1561 lp.join()
1562 rp.join()
Benjamin Petersonda3a1b12008-06-16 20:52:48 +00001563"""
Benjamin Petersondfd79492008-06-13 19:13:39 +00001564#
1565#
1566#
1567
1568class _TestHeap(BaseTestCase):
1569
1570 ALLOWED_TYPES = ('processes',)
1571
1572 def test_heap(self):
1573 iterations = 5000
1574 maxblocks = 50
1575 blocks = []
1576
1577 # create and destroy lots of blocks of different sizes
1578 for i in xrange(iterations):
1579 size = int(random.lognormvariate(0, 1) * 1000)
1580 b = multiprocessing.heap.BufferWrapper(size)
1581 blocks.append(b)
1582 if len(blocks) > maxblocks:
1583 i = random.randrange(maxblocks)
1584 del blocks[i]
1585
1586 # get the heap object
1587 heap = multiprocessing.heap.BufferWrapper._heap
1588
1589 # verify the state of the heap
1590 all = []
1591 occupied = 0
1592 for L in heap._len_to_seq.values():
1593 for arena, start, stop in L:
1594 all.append((heap._arenas.index(arena), start, stop,
1595 stop-start, 'free'))
1596 for arena, start, stop in heap._allocated_blocks:
1597 all.append((heap._arenas.index(arena), start, stop,
1598 stop-start, 'occupied'))
1599 occupied += (stop-start)
1600
1601 all.sort()
1602
1603 for i in range(len(all)-1):
1604 (arena, start, stop) = all[i][:3]
1605 (narena, nstart, nstop) = all[i+1][:3]
1606 self.assertTrue((arena != narena and nstart == 0) or
1607 (stop == nstart))
1608
1609#
1610#
1611#
1612
Benjamin Petersondfd79492008-06-13 19:13:39 +00001613class _Foo(Structure):
1614 _fields_ = [
1615 ('x', c_int),
1616 ('y', c_double)
1617 ]
1618
1619class _TestSharedCTypes(BaseTestCase):
1620
1621 ALLOWED_TYPES = ('processes',)
1622
Antoine Pitrou55d935a2010-11-22 16:35:57 +00001623 def setUp(self):
1624 if not HAS_SHAREDCTYPES:
1625 self.skipTest("requires multiprocessing.sharedctypes")
1626
Antoine Pitrou4eb2b282010-11-02 23:51:30 +00001627 @classmethod
1628 def _double(cls, x, y, foo, arr, string):
Benjamin Petersondfd79492008-06-13 19:13:39 +00001629 x.value *= 2
1630 y.value *= 2
1631 foo.x *= 2
1632 foo.y *= 2
1633 string.value *= 2
1634 for i in range(len(arr)):
1635 arr[i] *= 2
1636
1637 def test_sharedctypes(self, lock=False):
Benjamin Petersondfd79492008-06-13 19:13:39 +00001638 x = Value('i', 7, lock=lock)
Georg Brandlbd564c32010-02-06 23:33:33 +00001639 y = Value(c_double, 1.0/3.0, lock=lock)
Benjamin Petersondfd79492008-06-13 19:13:39 +00001640 foo = Value(_Foo, 3, 2, lock=lock)
Georg Brandlbd564c32010-02-06 23:33:33 +00001641 arr = self.Array('d', range(10), lock=lock)
1642 string = self.Array('c', 20, lock=lock)
Brian Curtina06e9b82010-10-07 02:27:41 +00001643 string.value = latin('hello')
Benjamin Petersondfd79492008-06-13 19:13:39 +00001644
1645 p = self.Process(target=self._double, args=(x, y, foo, arr, string))
1646 p.start()
1647 p.join()
1648
1649 self.assertEqual(x.value, 14)
1650 self.assertAlmostEqual(y.value, 2.0/3.0)
1651 self.assertEqual(foo.x, 6)
1652 self.assertAlmostEqual(foo.y, 4.0)
1653 for i in range(10):
1654 self.assertAlmostEqual(arr[i], i*2)
1655 self.assertEqual(string.value, latin('hellohello'))
1656
1657 def test_synchronize(self):
1658 self.test_sharedctypes(lock=True)
1659
1660 def test_copy(self):
Benjamin Petersondfd79492008-06-13 19:13:39 +00001661 foo = _Foo(2, 5.0)
Brian Curtina06e9b82010-10-07 02:27:41 +00001662 bar = copy(foo)
Benjamin Petersondfd79492008-06-13 19:13:39 +00001663 foo.x = 0
1664 foo.y = 0
1665 self.assertEqual(bar.x, 2)
1666 self.assertAlmostEqual(bar.y, 5.0)
1667
1668#
1669#
1670#
1671
1672class _TestFinalize(BaseTestCase):
1673
1674 ALLOWED_TYPES = ('processes',)
1675
Antoine Pitrou4eb2b282010-11-02 23:51:30 +00001676 @classmethod
1677 def _test_finalize(cls, conn):
Benjamin Petersondfd79492008-06-13 19:13:39 +00001678 class Foo(object):
1679 pass
1680
1681 a = Foo()
1682 util.Finalize(a, conn.send, args=('a',))
1683 del a # triggers callback for a
1684
1685 b = Foo()
1686 close_b = util.Finalize(b, conn.send, args=('b',))
1687 close_b() # triggers callback for b
1688 close_b() # does nothing because callback has already been called
1689 del b # does nothing because callback has already been called
1690
1691 c = Foo()
1692 util.Finalize(c, conn.send, args=('c',))
1693
1694 d10 = Foo()
1695 util.Finalize(d10, conn.send, args=('d10',), exitpriority=1)
1696
1697 d01 = Foo()
1698 util.Finalize(d01, conn.send, args=('d01',), exitpriority=0)
1699 d02 = Foo()
1700 util.Finalize(d02, conn.send, args=('d02',), exitpriority=0)
1701 d03 = Foo()
1702 util.Finalize(d03, conn.send, args=('d03',), exitpriority=0)
1703
1704 util.Finalize(None, conn.send, args=('e',), exitpriority=-10)
1705
1706 util.Finalize(None, conn.send, args=('STOP',), exitpriority=-100)
1707
1708 # call mutliprocessing's cleanup function then exit process without
1709 # garbage collecting locals
1710 util._exit_function()
1711 conn.close()
1712 os._exit(0)
1713
1714 def test_finalize(self):
1715 conn, child_conn = self.Pipe()
1716
1717 p = self.Process(target=self._test_finalize, args=(child_conn,))
1718 p.start()
1719 p.join()
1720
1721 result = [obj for obj in iter(conn.recv, 'STOP')]
1722 self.assertEqual(result, ['a', 'b', 'd10', 'd03', 'd02', 'd01', 'e'])
1723
1724#
1725# Test that from ... import * works for each module
1726#
1727
1728class _TestImportStar(BaseTestCase):
1729
1730 ALLOWED_TYPES = ('processes',)
1731
1732 def test_import(self):
Florent Xicluna36b9fbb2010-03-24 19:33:25 +00001733 modules = [
Benjamin Petersondfd79492008-06-13 19:13:39 +00001734 'multiprocessing', 'multiprocessing.connection',
1735 'multiprocessing.heap', 'multiprocessing.managers',
1736 'multiprocessing.pool', 'multiprocessing.process',
Florent Xicluna36b9fbb2010-03-24 19:33:25 +00001737 'multiprocessing.reduction',
Benjamin Petersondfd79492008-06-13 19:13:39 +00001738 'multiprocessing.synchronize', 'multiprocessing.util'
Florent Xicluna36b9fbb2010-03-24 19:33:25 +00001739 ]
1740
1741 if c_int is not None:
1742 # This module requires _ctypes
1743 modules.append('multiprocessing.sharedctypes')
Benjamin Petersondfd79492008-06-13 19:13:39 +00001744
1745 for name in modules:
1746 __import__(name)
1747 mod = sys.modules[name]
1748
1749 for attr in getattr(mod, '__all__', ()):
1750 self.assertTrue(
1751 hasattr(mod, attr),
1752 '%r does not have attribute %r' % (mod, attr)
1753 )
1754
1755#
1756# Quick test that logging works -- does not test logging output
1757#
1758
1759class _TestLogging(BaseTestCase):
1760
1761 ALLOWED_TYPES = ('processes',)
1762
1763 def test_enable_logging(self):
1764 logger = multiprocessing.get_logger()
1765 logger.setLevel(util.SUBWARNING)
1766 self.assertTrue(logger is not None)
1767 logger.debug('this will not be printed')
1768 logger.info('nor will this')
1769 logger.setLevel(LOG_LEVEL)
1770
Antoine Pitrou4eb2b282010-11-02 23:51:30 +00001771 @classmethod
1772 def _test_level(cls, conn):
Benjamin Petersondfd79492008-06-13 19:13:39 +00001773 logger = multiprocessing.get_logger()
1774 conn.send(logger.getEffectiveLevel())
1775
1776 def test_level(self):
1777 LEVEL1 = 32
1778 LEVEL2 = 37
1779
1780 logger = multiprocessing.get_logger()
1781 root_logger = logging.getLogger()
1782 root_level = root_logger.level
1783
1784 reader, writer = multiprocessing.Pipe(duplex=False)
1785
1786 logger.setLevel(LEVEL1)
1787 self.Process(target=self._test_level, args=(writer,)).start()
1788 self.assertEqual(LEVEL1, reader.recv())
1789
1790 logger.setLevel(logging.NOTSET)
1791 root_logger.setLevel(LEVEL2)
1792 self.Process(target=self._test_level, args=(writer,)).start()
1793 self.assertEqual(LEVEL2, reader.recv())
1794
1795 root_logger.setLevel(root_level)
1796 logger.setLevel(level=LOG_LEVEL)
1797
Jesse Noller814d02d2009-11-21 14:38:23 +00001798
Jesse Noller9a03f2f2009-11-24 14:17:29 +00001799# class _TestLoggingProcessName(BaseTestCase):
1800#
1801# def handle(self, record):
1802# assert record.processName == multiprocessing.current_process().name
1803# self.__handled = True
1804#
1805# def test_logging(self):
1806# handler = logging.Handler()
1807# handler.handle = self.handle
1808# self.__handled = False
1809# # Bypass getLogger() and side-effects
1810# logger = logging.getLoggerClass()(
1811# 'multiprocessing.test.TestLoggingProcessName')
1812# logger.addHandler(handler)
1813# logger.propagate = False
1814#
1815# logger.warn('foo')
1816# assert self.__handled
Jesse Noller814d02d2009-11-21 14:38:23 +00001817
Benjamin Petersondfd79492008-06-13 19:13:39 +00001818#
Jesse Noller9a5b2ad2009-01-19 15:12:22 +00001819# Test to verify handle verification, see issue 3321
1820#
1821
1822class TestInvalidHandle(unittest.TestCase):
1823
Florent Xicluna36b9fbb2010-03-24 19:33:25 +00001824 @unittest.skipIf(WIN32, "skipped on Windows")
Jesse Noller9a5b2ad2009-01-19 15:12:22 +00001825 def test_invalid_handles(self):
Jesse Noller9a5b2ad2009-01-19 15:12:22 +00001826 conn = _multiprocessing.Connection(44977608)
1827 self.assertRaises(IOError, conn.poll)
1828 self.assertRaises(IOError, _multiprocessing.Connection, -1)
Florent Xicluna36b9fbb2010-03-24 19:33:25 +00001829
Jesse Noller9a5b2ad2009-01-19 15:12:22 +00001830#
Benjamin Petersondfd79492008-06-13 19:13:39 +00001831# Functions used to create test cases from the base ones in this module
1832#
1833
1834def get_attributes(Source, names):
1835 d = {}
1836 for name in names:
1837 obj = getattr(Source, name)
1838 if type(obj) == type(get_attributes):
1839 obj = staticmethod(obj)
1840 d[name] = obj
1841 return d
1842
1843def create_test_cases(Mixin, type):
1844 result = {}
1845 glob = globals()
Florent Xicluna36b9fbb2010-03-24 19:33:25 +00001846 Type = type.capitalize()
Benjamin Petersondfd79492008-06-13 19:13:39 +00001847
1848 for name in glob.keys():
1849 if name.startswith('_Test'):
1850 base = glob[name]
1851 if type in base.ALLOWED_TYPES:
1852 newname = 'With' + Type + name[1:]
1853 class Temp(base, unittest.TestCase, Mixin):
1854 pass
1855 result[newname] = Temp
1856 Temp.__name__ = newname
1857 Temp.__module__ = Mixin.__module__
1858 return result
1859
1860#
1861# Create test cases
1862#
1863
1864class ProcessesMixin(object):
1865 TYPE = 'processes'
1866 Process = multiprocessing.Process
1867 locals().update(get_attributes(multiprocessing, (
1868 'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
1869 'Condition', 'Event', 'Value', 'Array', 'RawValue',
1870 'RawArray', 'current_process', 'active_children', 'Pipe',
1871 'connection', 'JoinableQueue'
1872 )))
1873
1874testcases_processes = create_test_cases(ProcessesMixin, type='processes')
1875globals().update(testcases_processes)
1876
1877
1878class ManagerMixin(object):
1879 TYPE = 'manager'
1880 Process = multiprocessing.Process
1881 manager = object.__new__(multiprocessing.managers.SyncManager)
1882 locals().update(get_attributes(manager, (
1883 'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
1884 'Condition', 'Event', 'Value', 'Array', 'list', 'dict',
1885 'Namespace', 'JoinableQueue'
1886 )))
1887
1888testcases_manager = create_test_cases(ManagerMixin, type='manager')
1889globals().update(testcases_manager)
1890
1891
1892class ThreadsMixin(object):
1893 TYPE = 'threads'
1894 Process = multiprocessing.dummy.Process
1895 locals().update(get_attributes(multiprocessing.dummy, (
1896 'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
1897 'Condition', 'Event', 'Value', 'Array', 'current_process',
1898 'active_children', 'Pipe', 'connection', 'dict', 'list',
1899 'Namespace', 'JoinableQueue'
1900 )))
1901
1902testcases_threads = create_test_cases(ThreadsMixin, type='threads')
1903globals().update(testcases_threads)
1904
Neal Norwitz0c519b32008-08-25 01:50:24 +00001905class OtherTest(unittest.TestCase):
1906 # TODO: add more tests for deliver/answer challenge.
1907 def test_deliver_challenge_auth_failure(self):
1908 class _FakeConnection(object):
1909 def recv_bytes(self, size):
Neal Norwitz2a7767a2008-08-25 03:03:25 +00001910 return b'something bogus'
Neal Norwitz0c519b32008-08-25 01:50:24 +00001911 def send_bytes(self, data):
1912 pass
1913 self.assertRaises(multiprocessing.AuthenticationError,
1914 multiprocessing.connection.deliver_challenge,
1915 _FakeConnection(), b'abc')
1916
1917 def test_answer_challenge_auth_failure(self):
1918 class _FakeConnection(object):
1919 def __init__(self):
1920 self.count = 0
1921 def recv_bytes(self, size):
1922 self.count += 1
1923 if self.count == 1:
1924 return multiprocessing.connection.CHALLENGE
1925 elif self.count == 2:
Neal Norwitz2a7767a2008-08-25 03:03:25 +00001926 return b'something bogus'
1927 return b''
Neal Norwitz0c519b32008-08-25 01:50:24 +00001928 def send_bytes(self, data):
1929 pass
1930 self.assertRaises(multiprocessing.AuthenticationError,
1931 multiprocessing.connection.answer_challenge,
1932 _FakeConnection(), b'abc')
1933
Jesse Noller7152f6d2009-04-02 05:17:26 +00001934#
1935# Test Manager.start()/Pool.__init__() initializer feature - see issue 5585
1936#
1937
1938def initializer(ns):
1939 ns.test += 1
1940
1941class TestInitializers(unittest.TestCase):
1942 def setUp(self):
1943 self.mgr = multiprocessing.Manager()
1944 self.ns = self.mgr.Namespace()
1945 self.ns.test = 0
1946
1947 def tearDown(self):
1948 self.mgr.shutdown()
1949
1950 def test_manager_initializer(self):
1951 m = multiprocessing.managers.SyncManager()
1952 self.assertRaises(TypeError, m.start, 1)
1953 m.start(initializer, (self.ns,))
1954 self.assertEqual(self.ns.test, 1)
1955 m.shutdown()
1956
1957 def test_pool_initializer(self):
1958 self.assertRaises(TypeError, multiprocessing.Pool, initializer=1)
1959 p = multiprocessing.Pool(1, initializer, (self.ns,))
1960 p.close()
1961 p.join()
1962 self.assertEqual(self.ns.test, 1)
1963
Jesse Noller1b90efb2009-06-30 17:11:52 +00001964#
1965# Issue 5155, 5313, 5331: Test process in processes
1966# Verifies os.close(sys.stdin.fileno) vs. sys.stdin.close() behavior
1967#
1968
1969def _ThisSubProcess(q):
1970 try:
1971 item = q.get(block=False)
1972 except Queue.Empty:
1973 pass
1974
1975def _TestProcess(q):
1976 queue = multiprocessing.Queue()
1977 subProc = multiprocessing.Process(target=_ThisSubProcess, args=(queue,))
1978 subProc.start()
1979 subProc.join()
1980
1981def _afunc(x):
1982 return x*x
1983
1984def pool_in_process():
1985 pool = multiprocessing.Pool(processes=4)
1986 x = pool.map(_afunc, [1, 2, 3, 4, 5, 6, 7])
1987
1988class _file_like(object):
1989 def __init__(self, delegate):
1990 self._delegate = delegate
1991 self._pid = None
1992
1993 @property
1994 def cache(self):
1995 pid = os.getpid()
1996 # There are no race conditions since fork keeps only the running thread
1997 if pid != self._pid:
1998 self._pid = pid
1999 self._cache = []
2000 return self._cache
2001
2002 def write(self, data):
2003 self.cache.append(data)
2004
2005 def flush(self):
2006 self._delegate.write(''.join(self.cache))
2007 self._cache = []
2008
2009class TestStdinBadfiledescriptor(unittest.TestCase):
2010
2011 def test_queue_in_process(self):
2012 queue = multiprocessing.Queue()
2013 proc = multiprocessing.Process(target=_TestProcess, args=(queue,))
2014 proc.start()
2015 proc.join()
2016
2017 def test_pool_in_process(self):
2018 p = multiprocessing.Process(target=pool_in_process)
2019 p.start()
2020 p.join()
2021
2022 def test_flushing(self):
2023 sio = StringIO()
2024 flike = _file_like(sio)
2025 flike.write('foo')
2026 proc = multiprocessing.Process(target=lambda: flike.flush())
2027 flike.flush()
2028 assert sio.getvalue() == 'foo'
2029
2030testcases_other = [OtherTest, TestInvalidHandle, TestInitializers,
2031 TestStdinBadfiledescriptor]
Neal Norwitz0c519b32008-08-25 01:50:24 +00002032
Benjamin Petersondfd79492008-06-13 19:13:39 +00002033#
2034#
2035#
2036
2037def test_main(run=None):
Jesse Noller18623822008-06-18 13:29:52 +00002038 if sys.platform.startswith("linux"):
2039 try:
2040 lock = multiprocessing.RLock()
2041 except OSError:
Benjamin Petersonbec087f2009-03-26 21:10:30 +00002042 raise unittest.SkipTest("OSError raises on RLock creation, see issue 3111!")
Benjamin Petersoned77f2e2008-06-17 22:40:44 +00002043
Benjamin Petersondfd79492008-06-13 19:13:39 +00002044 if run is None:
2045 from test.test_support import run_unittest as run
2046
2047 util.get_temp_dir() # creates temp directory for use by all processes
2048
2049 multiprocessing.get_logger().setLevel(LOG_LEVEL)
2050
Jesse Noller146b7ab2008-07-02 16:44:09 +00002051 ProcessesMixin.pool = multiprocessing.Pool(4)
2052 ThreadsMixin.pool = multiprocessing.dummy.Pool(4)
2053 ManagerMixin.manager.__init__()
2054 ManagerMixin.manager.start()
2055 ManagerMixin.pool = ManagerMixin.manager.Pool(4)
Benjamin Petersondfd79492008-06-13 19:13:39 +00002056
2057 testcases = (
Jesse Noller146b7ab2008-07-02 16:44:09 +00002058 sorted(testcases_processes.values(), key=lambda tc:tc.__name__) +
2059 sorted(testcases_threads.values(), key=lambda tc:tc.__name__) +
Neal Norwitz0c519b32008-08-25 01:50:24 +00002060 sorted(testcases_manager.values(), key=lambda tc:tc.__name__) +
2061 testcases_other
Benjamin Petersondfd79492008-06-13 19:13:39 +00002062 )
2063
2064 loadTestsFromTestCase = unittest.defaultTestLoader.loadTestsFromTestCase
2065 suite = unittest.TestSuite(loadTestsFromTestCase(tc) for tc in testcases)
Nick Coghlan13623662010-04-10 14:24:36 +00002066 # (ncoghlan): Whether or not sys.exc_clear is executed by the threading
2067 # module during these tests is at least platform dependent and possibly
Nick Coghlan14459d52010-04-10 15:01:54 +00002068 # non-deterministic on any given platform. So we don't mind if the listed
Nick Coghlan13623662010-04-10 14:24:36 +00002069 # warnings aren't actually raised.
Florent Xicluna07627882010-03-21 01:14:24 +00002070 with test_support.check_py3k_warnings(
Nick Coghlan13623662010-04-10 14:24:36 +00002071 (".+__(get|set)slice__ has been removed", DeprecationWarning),
2072 (r"sys.exc_clear\(\) not supported", DeprecationWarning),
2073 quiet=True):
Florent Xicluna07627882010-03-21 01:14:24 +00002074 run(suite)
Benjamin Petersondfd79492008-06-13 19:13:39 +00002075
Jesse Noller146b7ab2008-07-02 16:44:09 +00002076 ThreadsMixin.pool.terminate()
2077 ProcessesMixin.pool.terminate()
2078 ManagerMixin.pool.terminate()
2079 ManagerMixin.manager.shutdown()
Benjamin Petersondfd79492008-06-13 19:13:39 +00002080
Jesse Noller146b7ab2008-07-02 16:44:09 +00002081 del ProcessesMixin.pool, ThreadsMixin.pool, ManagerMixin.pool
Benjamin Petersondfd79492008-06-13 19:13:39 +00002082
2083def main():
2084 test_main(unittest.TextTestRunner(verbosity=2).run)
2085
2086if __name__ == '__main__':
2087 main()