blob: 6521c030ca033c1e3381b92e1c65c4ac845e3695 [file] [log] [blame]
Jesse Noller76cf55f2008-07-02 16:56:51 +00001#!/usr/bin/env python
2
Benjamin Petersondfd79492008-06-13 19:13:39 +00003#
4# Unit tests for the multiprocessing package
5#
6
7import unittest
Benjamin Petersondfd79492008-06-13 19:13:39 +00008import Queue
9import time
10import sys
11import os
12import gc
13import signal
14import array
Benjamin Petersondfd79492008-06-13 19:13:39 +000015import socket
16import random
17import logging
Mark Dickinsonc4920e82009-11-20 19:30:22 +000018from test import test_support
Jesse Noller1b90efb2009-06-30 17:11:52 +000019from StringIO import StringIO
R. David Murray3db8a342009-03-30 23:05:48 +000020_multiprocessing = test_support.import_module('_multiprocessing')
# import threading after _multiprocessing to raise a more relevant error
22# message: "No module named _multiprocessing". _multiprocessing is not compiled
23# without thread support.
24import threading
R. David Murray3db8a342009-03-30 23:05:48 +000025
Jesse Noller37040cd2008-09-30 00:15:45 +000026# Work around broken sem_open implementations
R. David Murray3db8a342009-03-30 23:05:48 +000027test_support.import_module('multiprocessing.synchronize')
Jesse Noller37040cd2008-09-30 00:15:45 +000028
Benjamin Petersondfd79492008-06-13 19:13:39 +000029import multiprocessing.dummy
30import multiprocessing.connection
31import multiprocessing.managers
32import multiprocessing.heap
Benjamin Petersondfd79492008-06-13 19:13:39 +000033import multiprocessing.pool
Benjamin Petersondfd79492008-06-13 19:13:39 +000034
35from multiprocessing import util
36
Brian Curtina06e9b82010-10-07 02:27:41 +000037try:
38 from multiprocessing.sharedctypes import Value, copy
39 HAS_SHAREDCTYPES = True
40except ImportError:
41 HAS_SHAREDCTYPES = False
42
Benjamin Petersondfd79492008-06-13 19:13:39 +000043#
44#
45#
46
Benjamin Petersone79edf52008-07-13 18:34:58 +000047latin = str
Benjamin Petersondfd79492008-06-13 19:13:39 +000048
Benjamin Petersondfd79492008-06-13 19:13:39 +000049#
50# Constants
51#
52
# Logging level constant for the test run (a multiprocessing.util level).
LOG_LEVEL = util.SUBWARNING
#LOG_LEVEL = logging.DEBUG

# Short pause, in seconds, passed to time.sleep() by the tests to give other
# processes/threads a chance to make progress.
DELTA = 0.1
CHECK_TIMINGS = False     # making true makes tests take a lot longer
                          # and can sometimes cause some non-serious
                          # failures because some calls block a bit
                          # longer than expected
# Timeouts (seconds) handed to blocking calls; distinct values are only used
# when elapsed times are actually being checked.
if CHECK_TIMINGS:
    TIMEOUT1, TIMEOUT2, TIMEOUT3 = 0.82, 0.35, 1.4
else:
    TIMEOUT1, TIMEOUT2, TIMEOUT3 = 0.1, 0.1, 0.1

# False when the _multiprocessing extension reports HAVE_BROKEN_SEM_GETVALUE.
HAVE_GETVALUE = not getattr(_multiprocessing,
                            'HAVE_BROKEN_SEM_GETVALUE', False)

# True when running on Windows.
WIN32 = (sys.platform == "win32")
70
Benjamin Petersondfd79492008-06-13 19:13:39 +000071#
Florent Xicluna36b9fbb2010-03-24 19:33:25 +000072# Some tests require ctypes
73#
74
75try:
Nick Coghlan13623662010-04-10 14:24:36 +000076 from ctypes import Structure, c_int, c_double
Florent Xicluna36b9fbb2010-03-24 19:33:25 +000077except ImportError:
78 Structure = object
79 c_int = c_double = None
80
81#
Benjamin Petersondfd79492008-06-13 19:13:39 +000082# Creates a wrapper for a function which records the time it takes to finish
83#
84
class TimingWrapper(object):
    """Callable proxy that remembers how long its last call took.

    ``elapsed`` is ``None`` until the wrapper has been called once; after
    that it holds the wall-clock duration, in seconds, of the most recent
    call (whether it returned or raised).
    """

    def __init__(self, func):
        self.func = func
        self.elapsed = None

    def __call__(self, *args, **kwds):
        started = time.time()
        try:
            result = self.func(*args, **kwds)
        finally:
            # Record the duration even if the wrapped call raised.
            self.elapsed = time.time() - started
        return result
97
98#
99# Base class for test cases
100#
101
class BaseTestCase(object):
    """Mixin providing helper assertions for the test case classes.

    It relies on ``assertEqual``/``assertAlmostEqual`` being supplied by
    whatever class it is combined with.
    """

    # Values of TYPE this test case may be run with; subclasses narrow it.
    ALLOWED_TYPES = ('processes', 'manager', 'threads')

    def assertTimingAlmostEqual(self, a, b):
        # Elapsed times are only compared when timing checks are enabled.
        if not CHECK_TIMINGS:
            return
        self.assertAlmostEqual(a, b, 1)

    def assertReturnsIfImplemented(self, value, func, *args):
        # Compare func(*args) with `value`, unless func reports that it is
        # not implemented, in which case the check is quietly dropped.
        try:
            res = func(*args)
        except NotImplementedError:
            return None
        return self.assertEqual(value, res)

    # For the sanity of Windows users, rather than crashing or freezing in
    # multiple ways.
    def __reduce__(self, *args):
        raise NotImplementedError("shouldn't try to pickle a test case")

    __reduce_ex__ = __reduce__
124
Benjamin Petersondfd79492008-06-13 19:13:39 +0000125#
126# Return the value of a semaphore
127#
128
def get_value(self):
    """Return the current counter of the semaphore-like object `self`.

    The object's own ``get_value()`` method is preferred; failing that the
    ``_Semaphore__value`` and then ``_value`` attributes are tried.  Raises
    NotImplementedError when none of them is available.
    """
    try:
        return self.get_value()
    except AttributeError:
        pass
    for attr in ('_Semaphore__value', '_value'):
        try:
            return getattr(self, attr)
        except AttributeError:
            continue
    raise NotImplementedError
140
141#
142# Testcases
143#
144
class _TestProcess(BaseTestCase):
    """Tests for Process objects: attributes, start/join/terminate and
    active_children().  Not run for the 'manager' type."""

    ALLOWED_TYPES = ('processes', 'threads')

    def test_current(self):
        # Check the attributes of the object returned by current_process().
        if self.TYPE == 'threads':
            # Report a skip rather than a silent pass.
            self.skipTest('test not appropriate for %s' % self.TYPE)

        current = self.current_process()
        authkey = current.authkey

        self.assertTrue(current.is_alive())
        self.assertTrue(not current.daemon)
        self.assertIsInstance(authkey, bytes)
        self.assertTrue(len(authkey) > 0)
        self.assertEqual(current.ident, os.getpid())
        self.assertEqual(current.exitcode, None)

    @classmethod
    def _test(cls, q, *args, **kwds):
        """Child body for test_process: report what the child sees via `q`.

        Puts, in order: the extra positional args, the keyword args, the
        child's name and (except for threads) its authkey and pid.
        """
        current = cls.current_process()
        q.put(args)
        q.put(kwds)
        q.put(current.name)
        if cls.TYPE != 'threads':
            q.put(bytes(current.authkey))
            q.put(current.pid)

    def test_process(self):
        # Follow a process through its life cycle: created, started, joined.
        q = self.Queue(1)
        args = (q, 1, 2)
        kwargs = {'hello':23, 'bye':2.54}
        name = 'SomeProcess'
        p = self.Process(
            target=self._test, args=args, kwargs=kwargs, name=name
            )
        p.daemon = True
        current = self.current_process()

        # Before start(): not alive, not listed, no exit code.
        if self.TYPE != 'threads':
            self.assertEqual(p.authkey, current.authkey)
        self.assertEqual(p.is_alive(), False)
        self.assertEqual(p.daemon, True)
        self.assertNotIn(p, self.active_children())
        self.assertTrue(type(self.active_children()) is list)
        self.assertEqual(p.exitcode, None)

        p.start()

        self.assertEqual(p.exitcode, None)
        self.assertEqual(p.is_alive(), True)
        self.assertIn(p, self.active_children())

        # The child echoes back what it was given (see _test); args[0] is
        # the queue itself, which the child does not send back.
        self.assertEqual(q.get(), args[1:])
        self.assertEqual(q.get(), kwargs)
        self.assertEqual(q.get(), p.name)
        if self.TYPE != 'threads':
            self.assertEqual(q.get(), current.authkey)
            self.assertEqual(q.get(), p.pid)

        p.join()

        self.assertEqual(p.exitcode, 0)
        self.assertEqual(p.is_alive(), False)
        self.assertNotIn(p, self.active_children())

    @classmethod
    def _test_terminate(cls):
        """Child body for test_terminate: sleep until terminated."""
        time.sleep(1000)

    def test_terminate(self):
        # terminate() must stop a sleeping child so that join() returns.
        if self.TYPE == 'threads':
            # Report a skip rather than a silent pass.
            self.skipTest('test not appropriate for %s' % self.TYPE)

        p = self.Process(target=self._test_terminate)
        p.daemon = True
        p.start()

        self.assertEqual(p.is_alive(), True)
        self.assertIn(p, self.active_children())
        self.assertEqual(p.exitcode, None)

        p.terminate()

        join = TimingWrapper(p.join)
        self.assertEqual(join(), None)
        self.assertTimingAlmostEqual(join.elapsed, 0.0)

        self.assertEqual(p.is_alive(), False)
        self.assertNotIn(p, self.active_children())

        p.join()

        # XXX sometimes get p.exitcode == 0 on Windows ...
        #self.assertEqual(p.exitcode, -signal.SIGTERM)

    def test_cpu_count(self):
        # cpu_count() either raises NotImplementedError or returns an
        # int >= 1.
        try:
            cpus = multiprocessing.cpu_count()
        except NotImplementedError:
            cpus = 1
        self.assertTrue(type(cpus) is int)
        self.assertTrue(cpus >= 1)

    def test_active_children(self):
        # A process is listed by active_children() only while it is running.
        self.assertEqual(type(self.active_children()), list)

        p = self.Process(target=time.sleep, args=(DELTA,))
        self.assertNotIn(p, self.active_children())

        p.start()
        self.assertIn(p, self.active_children())

        p.join()
        self.assertNotIn(p, self.active_children())

    @classmethod
    def _test_recursion(cls, wconn, id):
        """Send `id` on `wconn`, then run two children one after the other.

        Children are only spawned while `id` has fewer than two elements;
        each child receives `id` extended with its index.
        """
        # NOTE(review): `forking` is not used below; presumably imported for
        # its side effects -- confirm before removing.
        from multiprocessing import forking
        wconn.send(id)
        if len(id) < 2:
            for i in range(2):
                p = cls.Process(
                    target=cls._test_recursion, args=(wconn, id+[i])
                    )
                p.start()
                # Joining before starting the next child keeps the ids in
                # a deterministic (pre-order) sequence.
                p.join()

    def test_recursion(self):
        # Processes started from within child processes must work.
        rconn, wconn = self.Pipe(duplex=False)
        self._test_recursion(wconn, [])

        time.sleep(DELTA)
        result = []
        while rconn.poll():
            result.append(rconn.recv())

        expected = [
            [],
              [0],
                [0, 0],
                [0, 1],
              [1],
                [1, 0],
                [1, 1]
            ]
        self.assertEqual(result, expected)
293
294#
295#
296#
297
298class _UpperCaser(multiprocessing.Process):
299
300 def __init__(self):
301 multiprocessing.Process.__init__(self)
302 self.child_conn, self.parent_conn = multiprocessing.Pipe()
303
304 def run(self):
305 self.parent_conn.close()
306 for s in iter(self.child_conn.recv, None):
307 self.child_conn.send(s.upper())
308 self.child_conn.close()
309
310 def submit(self, s):
311 assert type(s) is str
312 self.parent_conn.send(s)
313 return self.parent_conn.recv()
314
315 def stop(self):
316 self.parent_conn.send(None)
317 self.parent_conn.close()
318 self.child_conn.close()
319
class _TestSubclassingProcess(BaseTestCase):
    """Check that a Process subclass overriding run() works."""

    ALLOWED_TYPES = ('processes',)

    def test_subclassing(self):
        uppercaser = _UpperCaser()
        uppercaser.start()
        try:
            self.assertEqual(uppercaser.submit('hello'), 'HELLO')
            self.assertEqual(uppercaser.submit('world'), 'WORLD')
        finally:
            # Always shut the (non-daemon) child down, even when an
            # assertion fails, so it cannot outlive the test.
            uppercaser.stop()
            uppercaser.join()
331
332#
333#
334#
335
def queue_empty(q):
    """Return whether `q` is empty.

    Uses ``q.empty()`` when the queue has that method, otherwise compares
    ``q.qsize()`` with zero.
    """
    return q.empty() if hasattr(q, 'empty') else q.qsize() == 0
341
def queue_full(q, maxsize):
    """Return whether `q` is full.

    Uses ``q.full()`` when the queue has that method, otherwise compares
    ``q.qsize()`` with `maxsize`.
    """
    return q.full() if hasattr(q, 'full') else q.qsize() == maxsize
347
348
class _TestQueue(BaseTestCase):
    """Tests for the queue types: put/get semantics, timeouts, use across a
    fork, qsize() and task_done()/join()."""

    @classmethod
    def _test_put(cls, queue, child_can_start, parent_can_continue):
        """Child body for test_put: once allowed to start, take six items
        off `queue` and signal the parent."""
        child_can_start.wait()
        for i in range(6):
            queue.get()
        parent_can_continue.set()

    def test_put(self):
        # Exercise every calling convention of put() on a bounded queue.
        MAXSIZE = 6
        queue = self.Queue(maxsize=MAXSIZE)
        child_can_start = self.Event()
        parent_can_continue = self.Event()

        proc = self.Process(
            target=self._test_put,
            args=(queue, child_can_start, parent_can_continue)
            )
        proc.daemon = True
        proc.start()

        self.assertEqual(queue_empty(queue), True)
        self.assertEqual(queue_full(queue, MAXSIZE), False)

        # Fill the queue to capacity.
        queue.put(1)
        queue.put(2, True)
        queue.put(3, True, None)
        queue.put(4, False)
        queue.put(5, False, None)
        queue.put_nowait(6)

        # the values may be in buffer but not yet in pipe so sleep a bit
        time.sleep(DELTA)

        self.assertEqual(queue_empty(queue), False)
        self.assertEqual(queue_full(queue, MAXSIZE), True)

        put = TimingWrapper(queue.put)
        put_nowait = TimingWrapper(queue.put_nowait)

        # Non-blocking puts on a full queue fail at once; blocking puts
        # fail after the timeout.
        self.assertRaises(Queue.Full, put, 7, False)
        self.assertTimingAlmostEqual(put.elapsed, 0)

        self.assertRaises(Queue.Full, put, 7, False, None)
        self.assertTimingAlmostEqual(put.elapsed, 0)

        self.assertRaises(Queue.Full, put_nowait, 7)
        self.assertTimingAlmostEqual(put_nowait.elapsed, 0)

        self.assertRaises(Queue.Full, put, 7, True, TIMEOUT1)
        self.assertTimingAlmostEqual(put.elapsed, TIMEOUT1)

        self.assertRaises(Queue.Full, put, 7, False, TIMEOUT2)
        self.assertTimingAlmostEqual(put.elapsed, 0)

        self.assertRaises(Queue.Full, put, 7, True, timeout=TIMEOUT3)
        self.assertTimingAlmostEqual(put.elapsed, TIMEOUT3)

        # Let the child drain the queue and wait until it has done so.
        child_can_start.set()
        parent_can_continue.wait()

        self.assertEqual(queue_empty(queue), True)
        self.assertEqual(queue_full(queue, MAXSIZE), False)

        proc.join()

    @classmethod
    def _test_get(cls, queue, child_can_start, parent_can_continue):
        """Child body for test_get: once allowed to start, put 2..5 on
        `queue` and signal the parent."""
        child_can_start.wait()
        #queue.put(1)
        queue.put(2)
        queue.put(3)
        queue.put(4)
        queue.put(5)
        parent_can_continue.set()

    def test_get(self):
        # Exercise every calling convention of get().
        queue = self.Queue()
        child_can_start = self.Event()
        parent_can_continue = self.Event()

        proc = self.Process(
            target=self._test_get,
            args=(queue, child_can_start, parent_can_continue)
            )
        proc.daemon = True
        proc.start()

        self.assertEqual(queue_empty(queue), True)

        child_can_start.set()
        parent_can_continue.wait()

        # Give the items time to become visible on this side.
        time.sleep(DELTA)
        self.assertEqual(queue_empty(queue), False)

        # Hangs unexpectedly, remove for now
        #self.assertEqual(queue.get(), 1)
        self.assertEqual(queue.get(True, None), 2)
        self.assertEqual(queue.get(True), 3)
        self.assertEqual(queue.get(timeout=1), 4)
        self.assertEqual(queue.get_nowait(), 5)

        self.assertEqual(queue_empty(queue), True)

        get = TimingWrapper(queue.get)
        get_nowait = TimingWrapper(queue.get_nowait)

        # Non-blocking gets on an empty queue fail at once; blocking gets
        # fail after the timeout.
        self.assertRaises(Queue.Empty, get, False)
        self.assertTimingAlmostEqual(get.elapsed, 0)

        self.assertRaises(Queue.Empty, get, False, None)
        self.assertTimingAlmostEqual(get.elapsed, 0)

        self.assertRaises(Queue.Empty, get_nowait)
        self.assertTimingAlmostEqual(get_nowait.elapsed, 0)

        self.assertRaises(Queue.Empty, get, True, TIMEOUT1)
        self.assertTimingAlmostEqual(get.elapsed, TIMEOUT1)

        self.assertRaises(Queue.Empty, get, False, TIMEOUT2)
        self.assertTimingAlmostEqual(get.elapsed, 0)

        self.assertRaises(Queue.Empty, get, timeout=TIMEOUT3)
        self.assertTimingAlmostEqual(get.elapsed, TIMEOUT3)

        proc.join()

    @classmethod
    def _test_fork(cls, queue):
        """Child body for test_fork: put 10..19 on `queue`."""
        for i in range(10, 20):
            queue.put(i)
        # note that at this point the items may only be buffered, so the
        # process cannot shutdown until the feeder thread has finished
        # pushing items onto the pipe.

    def test_fork(self):
        # Old versions of Queue would fail to create a new feeder
        # thread for a forked process if the original process had its
        # own feeder thread.  This test checks that this no longer
        # happens.

        queue = self.Queue()

        # put items on queue so that main process starts a feeder thread
        for i in range(10):
            queue.put(i)

        # wait to make sure thread starts before we fork a new process
        time.sleep(DELTA)

        # fork process
        p = self.Process(target=self._test_fork, args=(queue,))
        p.start()

        # check that all expected items are in the queue
        for i in range(20):
            self.assertEqual(queue.get(), i)
        self.assertRaises(Queue.Empty, queue.get, False)

        p.join()

    def test_qsize(self):
        # qsize() must track puts and gets, where it is implemented.
        q = self.Queue()
        try:
            self.assertEqual(q.qsize(), 0)
        except NotImplementedError:
            # Report a skip rather than a silent pass.
            self.skipTest('qsize method not implemented')
        q.put(1)
        self.assertEqual(q.qsize(), 1)
        q.put(5)
        self.assertEqual(q.qsize(), 2)
        q.get()
        self.assertEqual(q.qsize(), 1)
        q.get()
        self.assertEqual(q.qsize(), 0)

    @classmethod
    def _test_task_done(cls, q):
        """Worker body for test_task_done: acknowledge every item taken
        from `q` until a None sentinel arrives."""
        for obj in iter(q.get, None):
            time.sleep(DELTA)
            q.task_done()

    def test_task_done(self):
        # join() on a JoinableQueue returns once every item has been
        # acknowledged with task_done().
        queue = self.JoinableQueue()

        if sys.version_info < (2, 5) and not hasattr(queue, 'task_done'):
            self.skipTest("requires 'queue.task_done()' method")

        workers = [self.Process(target=self._test_task_done, args=(queue,))
                   for i in xrange(4)]

        for p in workers:
            p.start()

        for i in xrange(10):
            queue.put(i)

        queue.join()

        # One sentinel per worker makes each of them exit its loop.
        for p in workers:
            queue.put(None)

        for p in workers:
            p.join()
556
557#
558#
559#
560
class _TestLock(BaseTestCase):
    """Tests for the acquire/release behaviour of Lock and RLock."""

    def test_lock(self):
        lock = self.Lock()
        # The first acquire succeeds; a non-blocking second attempt fails.
        self.assertEqual(lock.acquire(), True)
        self.assertEqual(lock.acquire(False), False)
        self.assertEqual(lock.release(), None)
        # Releasing a lock that is not held is an error.
        unlocked_errors = (ValueError, threading.ThreadError)
        self.assertRaises(unlocked_errors, lock.release)

    def test_rlock(self):
        rlock = self.RLock()
        depth = 3
        # The owner may take a recursive lock repeatedly ...
        for _ in range(depth):
            self.assertEqual(rlock.acquire(), True)
        # ... and has to release it the same number of times.
        for _ in range(depth):
            self.assertEqual(rlock.release(), None)
        self.assertRaises((AssertionError, RuntimeError), rlock.release)

    def test_lock_context(self):
        # A lock can be used as a context manager.
        lock = self.Lock()
        with lock:
            pass
583
Benjamin Petersondfd79492008-06-13 19:13:39 +0000584
class _TestSemaphore(BaseTestCase):
    """Tests for Semaphore and BoundedSemaphore counters and timeouts."""

    def _test_semaphore(self, sem):
        # Shared scenario for a semaphore whose initial value is 2: take it
        # down to 0, fail a non-blocking acquire, then release back to 2.
        # The counter is only checked where it can be read.
        self.assertReturnsIfImplemented(2, get_value, sem)
        self.assertEqual(sem.acquire(), True)
        self.assertReturnsIfImplemented(1, get_value, sem)
        self.assertEqual(sem.acquire(), True)
        self.assertReturnsIfImplemented(0, get_value, sem)
        self.assertEqual(sem.acquire(False), False)
        self.assertReturnsIfImplemented(0, get_value, sem)
        self.assertEqual(sem.release(), None)
        self.assertReturnsIfImplemented(1, get_value, sem)
        self.assertEqual(sem.release(), None)
        self.assertReturnsIfImplemented(2, get_value, sem)

    def test_semaphore(self):
        sem = self.Semaphore(2)
        self._test_semaphore(sem)
        # An unbounded semaphore may be released past its initial value.
        self.assertEqual(sem.release(), None)
        self.assertReturnsIfImplemented(3, get_value, sem)
        self.assertEqual(sem.release(), None)
        self.assertReturnsIfImplemented(4, get_value, sem)

    def test_bounded_semaphore(self):
        sem = self.BoundedSemaphore(2)
        self._test_semaphore(sem)
        # Currently fails on OS/X
        #if HAVE_GETVALUE:
        #    self.assertRaises(ValueError, sem.release)
        #    self.assertReturnsIfImplemented(2, get_value, sem)

    def test_timeout(self):
        # acquire() on an exhausted semaphore: non-blocking calls fail at
        # once, blocking calls fail after the timeout.
        if self.TYPE != 'processes':
            # Report a skip rather than a silent pass.
            self.skipTest('test not appropriate for %s' % self.TYPE)

        sem = self.Semaphore(0)
        acquire = TimingWrapper(sem.acquire)

        self.assertEqual(acquire(False), False)
        self.assertTimingAlmostEqual(acquire.elapsed, 0.0)

        self.assertEqual(acquire(False, None), False)
        self.assertTimingAlmostEqual(acquire.elapsed, 0.0)

        self.assertEqual(acquire(False, TIMEOUT1), False)
        self.assertTimingAlmostEqual(acquire.elapsed, 0)

        self.assertEqual(acquire(True, TIMEOUT2), False)
        self.assertTimingAlmostEqual(acquire.elapsed, TIMEOUT2)

        self.assertEqual(acquire(timeout=TIMEOUT3), False)
        self.assertTimingAlmostEqual(acquire.elapsed, TIMEOUT3)
637
638
class _TestCondition(BaseTestCase):
    """Tests for Condition: notify(), notify_all() and wait() timeouts."""

    @classmethod
    def f(cls, cond, sleeping, woken, timeout=None):
        """Worker body: wait on `cond`, signalling progress via semaphores.

        `sleeping` is released just before waiting and `woken` right after
        the wait returns (through notification or `timeout`).
        """
        cond.acquire()
        sleeping.release()
        cond.wait(timeout)
        woken.release()
        cond.release()

    def check_invariant(self, cond):
        # this is only supposed to succeed when there are no sleepers
        if self.TYPE == 'processes':
            try:
                sleepers = (cond._sleeping_count.get_value() -
                            cond._woken_count.get_value())
                self.assertEqual(sleepers, 0)
                self.assertEqual(cond._wait_semaphore.get_value(), 0)
            except NotImplementedError:
                pass

    def test_notify(self):
        cond = self.Condition()
        sleeping = self.Semaphore(0)
        woken = self.Semaphore(0)

        # One waiter created through self.Process and one plain thread;
        # both handles are kept so that both can be joined at the end.
        proc = self.Process(target=self.f, args=(cond, sleeping, woken))
        proc.daemon = True
        proc.start()

        thread = threading.Thread(target=self.f,
                                  args=(cond, sleeping, woken))
        thread.daemon = True
        thread.start()

        # wait for both children to start sleeping
        sleeping.acquire()
        sleeping.acquire()

        # check no process/thread has woken up
        time.sleep(DELTA)
        self.assertReturnsIfImplemented(0, get_value, woken)

        # wake up one process/thread
        cond.acquire()
        cond.notify()
        cond.release()

        # check one process/thread has woken up
        time.sleep(DELTA)
        self.assertReturnsIfImplemented(1, get_value, woken)

        # wake up another
        cond.acquire()
        cond.notify()
        cond.release()

        # check other has woken up
        time.sleep(DELTA)
        self.assertReturnsIfImplemented(2, get_value, woken)

        # check state is not mucked up
        self.check_invariant(cond)

        # reap both waiters so that neither outlives the test
        proc.join()
        thread.join()

    def test_notify_all(self):
        cond = self.Condition()
        sleeping = self.Semaphore(0)
        woken = self.Semaphore(0)

        # start some threads/processes which will timeout
        for i in range(3):
            p = self.Process(target=self.f,
                             args=(cond, sleeping, woken, TIMEOUT1))
            p.daemon = True
            p.start()

            t = threading.Thread(target=self.f,
                                 args=(cond, sleeping, woken, TIMEOUT1))
            t.daemon = True
            t.start()

        # wait for them all to sleep
        for i in xrange(6):
            sleeping.acquire()

        # check they have all timed out
        for i in xrange(6):
            woken.acquire()
        self.assertReturnsIfImplemented(0, get_value, woken)

        # check state is not mucked up
        self.check_invariant(cond)

        # start some more threads/processes
        for i in range(3):
            p = self.Process(target=self.f, args=(cond, sleeping, woken))
            p.daemon = True
            p.start()

            t = threading.Thread(target=self.f, args=(cond, sleeping, woken))
            t.daemon = True
            t.start()

        # wait for them to all sleep
        for i in xrange(6):
            sleeping.acquire()

        # check no process/thread has woken up
        time.sleep(DELTA)
        self.assertReturnsIfImplemented(0, get_value, woken)

        # wake them all up
        cond.acquire()
        cond.notify_all()
        cond.release()

        # check they have all woken
        time.sleep(DELTA)
        self.assertReturnsIfImplemented(6, get_value, woken)

        # check state is not mucked up
        self.check_invariant(cond)

    def test_timeout(self):
        # wait() with a timeout and no notifier returns None after roughly
        # that long.
        cond = self.Condition()
        wait = TimingWrapper(cond.wait)
        cond.acquire()
        res = wait(TIMEOUT1)
        cond.release()
        self.assertEqual(res, None)
        self.assertTimingAlmostEqual(wait.elapsed, TIMEOUT1)
770
771
class _TestEvent(BaseTestCase):
    """Tests for Event: is_set(), set(), clear() and wait()."""

    @classmethod
    def _test_event(cls, event):
        """Child body for test_event: set `event` after TIMEOUT2 seconds."""
        time.sleep(TIMEOUT2)
        event.set()

    def test_event(self):
        event = self.Event()
        wait = TimingWrapper(event.wait)

        # The is_set() and wait() return value checks below were once
        # disabled because of API differences between threading's Event
        # (isSet, wait() returning the flag) and the semaphore-backed
        # multiprocessing Event; they are active now.
        self.assertEqual(event.is_set(), False)

        # On an unset event wait() times out and returns False.
        self.assertEqual(wait(0.0), False)
        self.assertTimingAlmostEqual(wait.elapsed, 0.0)
        self.assertEqual(wait(TIMEOUT1), False)
        self.assertTimingAlmostEqual(wait.elapsed, TIMEOUT1)

        event.set()

        # Once set, wait() returns True immediately.
        self.assertEqual(event.is_set(), True)
        self.assertEqual(wait(), True)
        self.assertTimingAlmostEqual(wait.elapsed, 0.0)
        self.assertEqual(wait(TIMEOUT1), True)
        self.assertTimingAlmostEqual(wait.elapsed, 0.0)
        # self.assertEqual(event.is_set(), True)

        event.clear()

        #self.assertEqual(event.is_set(), False)

        # A blocking wait() must be released when a child sets the event.
        # Keep the handle so the child can be joined instead of being left
        # running after the test.
        p = self.Process(target=self._test_event, args=(event,))
        p.daemon = True
        p.start()
        self.assertEqual(wait(), True)
        p.join()
Benjamin Petersondfd79492008-06-13 19:13:39 +0000810
811#
812#
813#
814
@unittest.skipUnless(HAS_SHAREDCTYPES,
                     "requires multiprocessing.sharedctypes")
class _TestValue(BaseTestCase):
    """Tests for shared Value / RawValue objects (processes only)."""

    ALLOWED_TYPES = ('processes',)

    # (typecode, initial value, value written by the child)
    codes_values = [
        ('i', 4343, 24234),
        ('d', 3.625, -4.25),
        ('h', -232, 234),
        ('c', latin('x'), latin('y'))
        ]

    @classmethod
    def _test(cls, values):
        """Child body: store the third entry of each codes_values row in
        the matching shared value."""
        for shared, row in zip(values, cls.codes_values):
            shared.value = row[2]

    def test_value(self, raw=False):
        # Values written by a child process must be visible in the parent.
        factory = self.RawValue if raw else self.Value
        values = [factory(code, initial)
                  for code, initial, _ in self.codes_values]

        for shared, row in zip(values, self.codes_values):
            self.assertEqual(shared.value, row[1])

        child = self.Process(target=self._test, args=(values,))
        child.start()
        child.join()

        for shared, row in zip(values, self.codes_values):
            self.assertEqual(shared.value, row[2])

    def test_rawvalue(self):
        self.test_value(raw=True)

    def test_getobj_getlock(self):
        # Default lock, and an explicit lock=None: both accessors must be
        # callable on the resulting wrapper.
        default_val = self.Value('i', 5)
        lock1 = default_val.get_lock()
        obj1 = default_val.get_obj()

        none_val = self.Value('i', 5, lock=None)
        lock2 = none_val.get_lock()
        obj2 = none_val.get_obj()

        # A lock passed in by the caller is the one handed back.
        lock = self.Lock()
        locked_val = self.Value('i', 5, lock=lock)
        lock3 = locked_val.get_lock()
        obj3 = locked_val.get_obj()
        self.assertEqual(lock, lock3)

        # lock=False yields an object without the accessors.
        unlocked_val = self.Value('i', 5, lock=False)
        self.assertFalse(hasattr(unlocked_val, 'get_lock'))
        self.assertFalse(hasattr(unlocked_val, 'get_obj'))

        # Anything else that is not a lock is rejected.
        self.assertRaises(AttributeError, self.Value, 'i', 5, lock='navalue')

        # RawValue objects have no accessors either.
        raw_val = self.RawValue('i', 5)
        self.assertFalse(hasattr(raw_val, 'get_lock'))
        self.assertFalse(hasattr(raw_val, 'get_obj'))
879
Benjamin Petersondfd79492008-06-13 19:13:39 +0000880
class _TestArray(BaseTestCase):
    """Tests for shared Array / RawArray objects (processes only)."""

    ALLOWED_TYPES = ('processes',)

    @classmethod
    def f(cls, seq):
        """Replace `seq`, in place, by its running sums."""
        for idx in range(1, len(seq)):
            seq[idx] += seq[idx - 1]

    @unittest.skipIf(c_int is None, "requires _ctypes")
    def test_array(self, raw=False):
        seq = [680, 626, 934, 821, 150, 233, 548, 982, 714, 831]
        factory = self.RawArray if raw else self.Array
        arr = factory('i', seq)

        # Length, indexing and slicing mirror the source list.
        self.assertEqual(len(arr), len(seq))
        self.assertEqual(arr[3], seq[3])
        self.assertEqual(list(arr[2:7]), list(seq[2:7]))

        # Slice assignment from an array.array, applied to both sequences.
        patch = array.array('i', [1, 2, 3, 4])
        arr[4:8] = patch
        seq[4:8] = patch

        self.assertEqual(list(arr[:]), seq)

        # Transform the list here and the shared array in a child process;
        # the results have to agree.
        self.f(seq)

        child = self.Process(target=self.f, args=(arr,))
        child.start()
        child.join()

        self.assertEqual(list(arr[:]), seq)

    @unittest.skipIf(c_int is None, "requires _ctypes")
    def test_rawarray(self):
        self.test_array(raw=True)

    @unittest.skipIf(c_int is None, "requires _ctypes")
    def test_getobj_getlock_obj(self):
        # Default lock, and an explicit lock=None: both accessors must be
        # callable on the resulting wrapper.
        default_arr = self.Array('i', range(10))
        lock1 = default_arr.get_lock()
        obj1 = default_arr.get_obj()

        none_arr = self.Array('i', range(10), lock=None)
        lock2 = none_arr.get_lock()
        obj2 = none_arr.get_obj()

        # A lock passed in by the caller is the one handed back.
        lock = self.Lock()
        locked_arr = self.Array('i', range(10), lock=lock)
        lock3 = locked_arr.get_lock()
        obj3 = locked_arr.get_obj()
        self.assertEqual(lock, lock3)

        # lock=False yields an object without the accessors.
        unlocked_arr = self.Array('i', range(10), lock=False)
        self.assertFalse(hasattr(unlocked_arr, 'get_lock'))
        self.assertFalse(hasattr(unlocked_arr, 'get_obj'))
        # Anything else that is not a lock is rejected.
        self.assertRaises(AttributeError,
                          self.Array, 'i', range(10), lock='notalock')

        # RawArray objects have no accessors either.
        raw_arr = self.RawArray('i', range(10))
        self.assertFalse(hasattr(raw_arr, 'get_lock'))
        self.assertFalse(hasattr(raw_arr, 'get_obj'))
Benjamin Petersondfd79492008-06-13 19:13:39 +0000943
944#
945#
946#
947
class _TestContainers(BaseTestCase):
    # Tests for the list, dict and Namespace containers supplied by the
    # mixin; only generated for the 'manager' type.

    ALLOWED_TYPES = ('manager',)

    def test_list(self):
        ten = list(range(10))
        first = self.list(range(10))
        self.assertEqual(first[:], ten)

        second = self.list()
        self.assertEqual(second[:], [])

        second.extend(range(5))
        self.assertEqual(second[:], list(range(5)))

        self.assertEqual(second[2], 2)
        self.assertEqual(second[2:10], [2, 3, 4])

        # In-place repetition changes the list itself ...
        second *= 2
        doubled = [0, 1, 2, 3, 4, 0, 1, 2, 3, 4]
        self.assertEqual(second[:], doubled)

        # ... and concatenation yields a value equal to a plain list.
        self.assertEqual(second + [5, 6], doubled + [5, 6])

        # The first list has not been touched by any of the above.
        self.assertEqual(first[:], ten)

        # A list built from other container objects compares equal to a
        # list of their contents.
        nested = self.list([first, second])
        self.assertEqual(nested[:], [ten, doubled])

        # A later change to `first` shows up through the list holding it.
        holder = self.list([first])
        first.append('hello')
        self.assertEqual(holder[:], [ten + ['hello']])

    def test_dict(self):
        mapping = self.dict()
        codes = list(range(65, 70))
        for code in codes:
            mapping[code] = chr(code)
        pairs = [(code, chr(code)) for code in codes]
        self.assertEqual(mapping.copy(), dict(pairs))
        self.assertEqual(sorted(mapping.keys()), codes)
        self.assertEqual(sorted(mapping.values()),
                         [char for _, char in pairs])
        self.assertEqual(sorted(mapping.items()), pairs)

    def test_namespace(self):
        ns = self.Namespace()
        ns.name = 'Bob'
        ns.job = 'Builder'
        ns._hidden = 'hidden'
        self.assertEqual((ns.name, ns.job), ('Bob', 'Builder'))
        del ns.job
        # Only `name` is expected in the string form: `job` was deleted
        # and `_hidden` is not listed.
        self.assertEqual(str(ns), "Namespace(name='Bob')")
        self.assertTrue(hasattr(ns, 'name'))
        self.assertFalse(hasattr(ns, 'job'))
1003
1004#
1005#
1006#
1007
def sqr(x, wait=0.0):
    """Sleep for `wait` seconds, then return `x` multiplied by itself."""
    time.sleep(wait)
    squared = x * x
    return squared
class _TestPool(BaseTestCase):
    # Tests for the worker pool available as self.pool.

    def test_apply(self):
        apply_sync = self.pool.apply
        self.assertEqual(apply_sync(sqr, (5,)), sqr(5))
        self.assertEqual(apply_sync(sqr, (), {'x': 3}), sqr(x=3))

    def test_map(self):
        pool_map = self.pool.map
        self.assertEqual(pool_map(sqr, range(10)),
                         [sqr(i) for i in range(10)])
        self.assertEqual(pool_map(sqr, range(100), chunksize=20),
                         [sqr(i) for i in range(100)])

    def test_map_chunksize(self):
        # map_async() over an empty list with an explicit chunksize must
        # deliver its result instead of timing out.
        pending = self.pool.map_async(sqr, [], chunksize=1)
        try:
            pending.get(timeout=TIMEOUT1)
        except multiprocessing.TimeoutError:
            self.fail("pool.map_async with chunksize stalled on null list")

    def test_async(self):
        # The task sleeps for TIMEOUT1, so get() should take about that
        # long.
        pending = self.pool.apply_async(sqr, (7, TIMEOUT1,))
        timed_get = TimingWrapper(pending.get)
        self.assertEqual(timed_get(), 49)
        self.assertTimingAlmostEqual(timed_get.elapsed, TIMEOUT1)

    def test_async_timeout(self):
        # The task sleeps longer than the timeout given to get(), so
        # get() must raise after about TIMEOUT2.
        pending = self.pool.apply_async(sqr, (6, TIMEOUT2 + 0.2))
        timed_get = TimingWrapper(pending.get)
        self.assertRaises(multiprocessing.TimeoutError,
                          timed_get, timeout=TIMEOUT2)
        self.assertTimingAlmostEqual(timed_get.elapsed, TIMEOUT2)

    def test_imap(self):
        # Consume the whole iterator at once.
        results = self.pool.imap(sqr, range(10))
        self.assertEqual(list(results), [sqr(i) for i in range(10)])

        # Consume it item by item, then check that it is exhausted.
        results = self.pool.imap(sqr, range(10))
        for i in range(10):
            self.assertEqual(next(results), i*i)
        self.assertRaises(StopIteration, next, results)

        # Same again with an explicit chunksize.
        results = self.pool.imap(sqr, range(1000), chunksize=100)
        for i in range(1000):
            self.assertEqual(next(results), i*i)
        self.assertRaises(StopIteration, next, results)

    def test_imap_unordered(self):
        # Results may arrive in any order, so compare them sorted.
        expected = [sqr(i) for i in range(1000)]

        results = self.pool.imap_unordered(sqr, range(1000))
        self.assertEqual(sorted(results), expected)

        results = self.pool.imap_unordered(sqr, range(1000), chunksize=53)
        self.assertEqual(sorted(results), expected)

    def test_make_pool(self):
        pool = multiprocessing.Pool(3)
        self.assertEqual(3, len(pool._pool))
        pool.close()
        pool.join()

    def test_terminate(self):
        if self.TYPE == 'manager':
            # On Unix a forked process increfs each shared object to
            # which its parent process held a reference.  If the
            # forked process gets terminated then there is likely to
            # be a reference leak.  So to prevent
            # _TestZZZNumberOfObjects from failing we skip this test
            # when using a manager.
            return

        # Queue far more sleeping tasks than could finish in time, then
        # check that join() returns quickly after terminate().
        result = self.pool.map_async(time.sleep, [0.1] * 10000, chunksize=1)
        self.pool.terminate()
        timed_join = TimingWrapper(self.pool.join)
        timed_join()
        self.assertLess(timed_join.elapsed, 0.2)
Jesse Noller654ade32010-01-27 03:05:57 +00001086
class _TestPoolWorkerLifetime(BaseTestCase):
    # Test of the maxtasksperchild argument of multiprocessing.Pool.

    ALLOWED_TYPES = ('processes', )

    def test_pool_worker_lifetime(self):
        pool = multiprocessing.Pool(3, maxtasksperchild=10)
        self.assertEqual(3, len(pool._pool))
        pids_before = [worker.pid for worker in pool._pool]

        # Run many tasks so each worker gets replaced (hopefully)
        pending = [pool.apply_async(sqr, (i, )) for i in range(100)]

        # Fetch the results and verify we got the right answers,
        # also ensuring all the tasks have completed.
        for i, result in enumerate(pending):
            self.assertEqual(result.get(), sqr(i))

        # Refill the pool
        pool._repopulate_pool()

        # Wait until all workers are alive, checking at most five times
        # with a pause of DELTA after each unsuccessful check.
        for _ in range(5):
            if all(worker.is_alive() for worker in pool._pool):
                break
            time.sleep(DELTA)
        pids_after = [worker.pid for worker in pool._pool]

        # All pids should be assigned.  See issue #7805.
        self.assertNotIn(None, pids_before)
        self.assertNotIn(None, pids_after)

        # Finally, check that the worker pids have changed
        self.assertNotEqual(sorted(pids_before), sorted(pids_after))
        pool.close()
        pool.join()
1117
Benjamin Petersondfd79492008-06-13 19:13:39 +00001118#
1119# Test that manager has expected number of shared objects left
1120#
1121
class _TestZZZNumberOfObjects(BaseTestCase):
    # Because test cases are sorted alphabetically, this one will get
    # run after all the other tests for the manager.  It tests that
    # there have been no "reference leaks" for the manager's shared
    # objects.  Note the comment in _TestPool.test_terminate().
    ALLOWED_TYPES = ('manager',)

    def test_number_of_objects(self):
        EXPECTED_NUMBER = 1                 # the pool object is still alive
        multiprocessing.active_children()   # discard dead process objs
        gc.collect()                        # do garbage collection
        refs = self.manager._number_of_objects()
        # Fetch the debug info once, right after the count, so that what
        # is printed on a mismatch was taken next to the failing count.
        debug_info = self.manager._debug_info()
        if refs != EXPECTED_NUMBER:
            # Parenthesised single argument: prints the same text whether
            # `print` is a statement or a function.
            print(debug_info)

        self.assertEqual(refs, EXPECTED_NUMBER)
1140
1141#
1142# Test of creating a customized manager class
1143#
1144
1145from multiprocessing.managers import BaseManager, BaseProxy, RemoteError
1146
class FooBar(object):
    # Plain class with one method that returns, one that raises and one
    # whose name starts with an underscore.

    def f(self):
        return 'f()'

    def g(self):
        raise ValueError()

    def _h(self):
        return '_h()'
1154
def baz():
    """Generate the squares of 0 through 9, in order."""
    # range() instead of xrange(): the values yielded are identical and
    # a ten-item sequence costs nothing, but the name exists on every
    # Python version.
    for i in range(10):
        yield i*i
1158
class IteratorProxy(BaseProxy):
    # Proxy that can be iterated: each step is forwarded to the referent
    # through _callmethod(), under either spelling of the "next" method.
    _exposed_ = ('next', '__next__')

    def __iter__(self):
        return self

    def __next__(self):
        return self._callmethod('__next__')

    def next(self):
        return self._callmethod('next')
1167
class MyManager(BaseManager):
    '''manager class with the custom typeids registered below'''

# 'Foo' is registered without an explicit `exposed` tuple.
MyManager.register('Foo', callable=FooBar)
# 'Bar' wraps the same class but names its exposed methods explicitly.
MyManager.register('Bar', callable=FooBar, exposed=('f', '_h'))
# 'baz' wraps a generator function and is proxied by IteratorProxy.
MyManager.register('baz', callable=baz, proxytype=IteratorProxy)
1174
1175
class _TestMyManager(BaseTestCase):

    ALLOWED_TYPES = ('manager',)

    def test_mymanager(self):
        # Start a MyManager of our own and exercise the three typeids
        # registered on it.
        manager = MyManager()
        manager.start()

        foo = manager.Foo()
        bar = manager.Bar()
        squares = manager.baz()

        # Which of the three FooBar methods does each proxy offer?
        candidates = ('f', 'g', '_h')
        foo_methods = [name for name in candidates if hasattr(foo, name)]
        bar_methods = [name for name in candidates if hasattr(bar, name)]

        self.assertEqual(foo_methods, ['f', 'g'])
        self.assertEqual(bar_methods, ['f', '_h'])

        # Foo: f works directly and through _callmethod, g raises the
        # original exception type, _h is refused.
        self.assertEqual(foo.f(), 'f()')
        self.assertRaises(ValueError, foo.g)
        self.assertEqual(foo._callmethod('f'), 'f()')
        self.assertRaises(RemoteError, foo._callmethod, '_h')

        # Bar: both exposed methods work, directly and via _callmethod.
        self.assertEqual(bar.f(), 'f()')
        self.assertEqual(bar._h(), '_h()')
        self.assertEqual(bar._callmethod('f'), 'f()')
        self.assertEqual(bar._callmethod('_h'), '_h()')

        self.assertEqual(list(squares), [i*i for i in range(10)])

        manager.shutdown()
1207
1208#
1209# Test of connecting to a remote server and using xmlrpclib for serialization
1210#
1211
1212_queue = Queue.Queue()
1213def get_queue():
1214 return _queue
1215
class QueueManager(BaseManager):
    '''manager class used by server process'''

# Registered together with the callable that produces the queue.
QueueManager.register('get_queue', callable=get_queue)
1219
class QueueManager2(BaseManager):
    '''manager class which specifies the same interface as QueueManager'''

# Registered by name only: no callable is supplied here.
QueueManager2.register('get_queue')
1223
1224
# Serializer name passed as serializer= to every QueueManager and
# QueueManager2 instance created by the remote-manager tests.
SERIALIZER = 'xmlrpclib'
1226
class _TestRemoteManager(BaseTestCase):
    # Test of connecting to a manager server from two different clients
    # (this process and a child) using the SERIALIZER serializer.

    ALLOWED_TYPES = ('manager',)

    @classmethod
    def _putter(cls, address, authkey):
        # Child side: connect to the server at `address` and put one
        # tuple on its queue.
        manager = QueueManager2(
            address=address, authkey=authkey, serializer=SERIALIZER
            )
        manager.connect()
        queue = manager.get_queue()
        queue.put(('hello world', None, True, 2.25))

    def test_remote(self):
        authkey = os.urandom(32)

        manager = QueueManager(
            address=('localhost', 0), authkey=authkey, serializer=SERIALIZER
            )
        manager.start()

        # From here on the server must be stopped however the test ends,
        # otherwise a failing assertion leaves it running.
        try:
            p = self.Process(target=self._putter,
                             args=(manager.address, authkey))
            p.start()

            manager2 = QueueManager2(
                address=manager.address, authkey=authkey,
                serializer=SERIALIZER
                )
            manager2.connect()
            queue = manager2.get_queue()

            # Note that xmlrpclib will deserialize object as a list not a
            # tuple
            self.assertEqual(queue.get(), ['hello world', None, True, 2.25])

            # Because we are using xmlrpclib for serialization instead of
            # pickle this will cause a serialization error.
            self.assertRaises(Exception, queue.put, time.sleep)

            # The item has been received, so the child has done its work;
            # wait for it to exit while the server is still running so
            # that it is not left behind for later tests.
            p.join()

            # Make queue finalizer run before the server is stopped
            del queue
        finally:
            manager.shutdown()
1267
class _TestManagerRestart(BaseTestCase):
    # Test that a manager server can be started again on an address
    # right after a previous server on that address was shut down.

    @classmethod
    def _putter(cls, address, authkey):
        # Child side: connect to the server at `address` and put one
        # string on its queue.
        manager = QueueManager(
            address=address, authkey=authkey, serializer=SERIALIZER)
        manager.connect()
        queue = manager.get_queue()
        queue.put('hello world')

    def test_rapid_restart(self):
        authkey = os.urandom(32)
        manager = QueueManager(
            address=('localhost', 0), authkey=authkey, serializer=SERIALIZER)
        srvr = manager.get_server()
        addr = srvr.address
        # Close the connection.Listener socket which gets opened as a part
        # of manager.get_server(). It's not needed for the test.
        srvr.listener.close()
        manager.start()

        p = self.Process(target=self._putter, args=(manager.address, authkey))
        p.start()
        # Wait for the child to finish before going on, so that no client
        # process is still attached when the server is shut down and
        # restarted, and no child is left behind for later tests.
        p.join()
        queue = manager.get_queue()
        self.assertEqual(queue.get(), 'hello world')
        del queue
        manager.shutdown()

        # Second server, started on the address recorded above.
        manager = QueueManager(
            address=addr, authkey=authkey, serializer=SERIALIZER)
        manager.start()
        manager.shutdown()
Jesse Noller459a6482009-03-30 15:50:42 +00001299
Benjamin Petersondfd79492008-06-13 19:13:39 +00001300#
1301#
1302#
1303
# Empty message: _TestConnection._echo stops echoing when it receives it.
SENTINEL = latin('')
1305
class _TestConnection(BaseTestCase):
    # Tests for the connection objects returned by Pipe().

    ALLOWED_TYPES = ('processes', 'threads')

    @classmethod
    def _echo(cls, conn):
        # Child side: send every received byte message straight back
        # until one equal to SENTINEL arrives, then close the connection.
        while True:
            data = conn.recv_bytes()
            if data == SENTINEL:
                break
            conn.send_bytes(data)
        conn.close()

    def _check_recv_bytes_into(self, conn, arr, longmsg):
        # recv_bytes_into() checks used by test_connection; `conn` must
        # be connected to a running _echo child.
        item_count = 10

        # Receive into the start of a zero-filled array.
        target = array.array('i', [0] * item_count)
        expected = list(arr) + [0] * (item_count - len(arr))
        self.assertEqual(conn.send_bytes(arr), None)
        self.assertEqual(conn.recv_bytes_into(target),
                         len(arr) * target.itemsize)
        self.assertEqual(list(target), expected)

        # Receive at an offset of three items.
        skipped = 3
        target = array.array('i', [0] * item_count)
        expected = ([0] * skipped + list(arr) +
                    [0] * (item_count - skipped - len(arr)))
        self.assertEqual(conn.send_bytes(arr), None)
        self.assertEqual(
            conn.recv_bytes_into(target, skipped * target.itemsize),
            len(arr) * target.itemsize)
        self.assertEqual(list(target), expected)

        # A 40-byte buffer cannot hold the long message; the exception
        # must carry the complete message.
        target = bytearray(latin(' ' * 40))
        self.assertEqual(conn.send_bytes(longmsg), None)
        try:
            res = conn.recv_bytes_into(target)
        except multiprocessing.BufferTooShort as exc:
            self.assertEqual(exc.args, (longmsg,))
        else:
            self.fail('expected BufferTooShort, got %s' % res)

    def test_connection(self):
        conn, child_conn = self.Pipe()

        child = self.Process(target=self._echo, args=(child_conn,))
        child.daemon = True
        child.start()

        seq = [1, 2.25, None]
        msg = latin('hello world')
        longmsg = msg * 10
        arr = array.array('i', range(4))
        is_process = self.TYPE == 'processes'

        if is_process:
            self.assertEqual(type(conn.fileno()), int)

        # Object round trip.
        self.assertEqual(conn.send(seq), None)
        self.assertEqual(conn.recv(), seq)

        # Byte-string round trip.
        self.assertEqual(conn.send_bytes(msg), None)
        self.assertEqual(conn.recv_bytes(), msg)

        if is_process:
            self._check_recv_bytes_into(conn, arr, longmsg)

        poll = TimingWrapper(conn.poll)

        # Nothing is pending: poll() answers False at once, and after
        # the full timeout when one is given.
        self.assertEqual(poll(), False)
        self.assertTimingAlmostEqual(poll.elapsed, 0)

        self.assertEqual(poll(TIMEOUT1), False)
        self.assertTimingAlmostEqual(poll.elapsed, TIMEOUT1)

        conn.send(None)

        # The echo is on its way: poll() answers True without waiting.
        self.assertEqual(poll(TIMEOUT1), True)
        self.assertTimingAlmostEqual(poll.elapsed, 0)

        self.assertEqual(conn.recv(), None)

        really_big_msg = latin('X') * (1024 * 1024 * 16)   # 16Mb
        conn.send_bytes(really_big_msg)
        self.assertEqual(conn.recv_bytes(), really_big_msg)

        conn.send_bytes(SENTINEL)                          # tell child to quit
        child_conn.close()

        if is_process:
            self.assertEqual(conn.readable, True)
            self.assertEqual(conn.writable, True)
            self.assertRaises(EOFError, conn.recv)
            self.assertRaises(EOFError, conn.recv_bytes)

        child.join()

    def test_duplex_false(self):
        reader, writer = self.Pipe(duplex=False)
        self.assertEqual(writer.send(1), None)
        self.assertEqual(reader.recv(), 1)
        if self.TYPE != 'processes':
            return
        # Each end reports a single direction and refuses the other.
        self.assertEqual(reader.readable, True)
        self.assertEqual(reader.writable, False)
        self.assertEqual(writer.readable, False)
        self.assertEqual(writer.writable, True)
        self.assertRaises(IOError, reader.send, 2)
        self.assertRaises(IOError, writer.recv)
        self.assertRaises(IOError, writer.poll)

    def test_spawn_close(self):
        # We test that a pipe connection can be closed by parent
        # process immediately after child is spawned.  On Windows this
        # would have sometimes failed on old versions because
        # child_conn would be closed before the child got a chance to
        # duplicate it.
        conn, child_conn = self.Pipe()

        child = self.Process(target=self._echo, args=(child_conn,))
        child.start()
        child_conn.close()    # this might complete before child initializes

        greeting = latin('hello')
        conn.send_bytes(greeting)
        self.assertEqual(conn.recv_bytes(), greeting)

        conn.send_bytes(SENTINEL)
        conn.close()
        child.join()

    def test_sendbytes(self):
        if self.TYPE != 'processes':
            return

        msg = latin('abcdefghijklmnopqrstuvwxyz')
        a, b = self.Pipe()

        # (offset/size arguments, bytes expected at the other end)
        accepted = [
            ((), msg),
            ((5,), msg[5:]),
            ((7, 8), msg[7:7+8]),
            ((26,), latin('')),
            ((26, 0), latin('')),
        ]
        for extra, expected in accepted:
            a.send_bytes(msg, *extra)
            self.assertEqual(b.recv_bytes(), expected)

        # Offset/size arguments that must be rejected with ValueError.
        rejected = [(27,), (22, 5), (26, 1), (-1,), (4, -1)]
        for extra in rejected:
            self.assertRaises(ValueError, a.send_bytes, msg, *extra)
1455
class _TestListenerClient(BaseTestCase):
    # Test of a Listener accepting one Client for every address family.

    ALLOWED_TYPES = ('processes', 'threads')

    @classmethod
    def _test(cls, address):
        # Child side: connect to `address`, send a greeting, disconnect.
        client = cls.connection.Client(address)
        client.send('hello')
        client.close()

    def test_listener_client(self):
        for family in self.connection.families:
            listener = self.connection.Listener(family=family)
            child = self.Process(target=self._test,
                                 args=(listener.address,))
            child.daemon = True
            child.start()
            accepted = listener.accept()
            self.assertEqual(accepted.recv(), 'hello')
            child.join()
            listener.close()
Benjamin Petersondfd79492008-06-13 19:13:39 +00001476#
1477# Test of sending connection and socket objects between processes
1478#
Benjamin Petersonda3a1b12008-06-16 20:52:48 +00001479"""
Benjamin Petersondfd79492008-06-13 19:13:39 +00001480class _TestPicklingConnections(BaseTestCase):
1481
1482 ALLOWED_TYPES = ('processes',)
1483
1484 def _listener(self, conn, families):
1485 for fam in families:
1486 l = self.connection.Listener(family=fam)
1487 conn.send(l.address)
1488 new_conn = l.accept()
1489 conn.send(new_conn)
1490
1491 if self.TYPE == 'processes':
1492 l = socket.socket()
1493 l.bind(('localhost', 0))
1494 conn.send(l.getsockname())
1495 l.listen(1)
1496 new_conn, addr = l.accept()
1497 conn.send(new_conn)
1498
1499 conn.recv()
1500
1501 def _remote(self, conn):
1502 for (address, msg) in iter(conn.recv, None):
1503 client = self.connection.Client(address)
1504 client.send(msg.upper())
1505 client.close()
1506
1507 if self.TYPE == 'processes':
1508 address, msg = conn.recv()
1509 client = socket.socket()
1510 client.connect(address)
1511 client.sendall(msg.upper())
1512 client.close()
1513
1514 conn.close()
1515
1516 def test_pickling(self):
1517 try:
1518 multiprocessing.allow_connection_pickling()
1519 except ImportError:
1520 return
1521
1522 families = self.connection.families
1523
1524 lconn, lconn0 = self.Pipe()
1525 lp = self.Process(target=self._listener, args=(lconn0, families))
1526 lp.start()
1527 lconn0.close()
1528
1529 rconn, rconn0 = self.Pipe()
1530 rp = self.Process(target=self._remote, args=(rconn0,))
1531 rp.start()
1532 rconn0.close()
1533
1534 for fam in families:
1535 msg = ('This connection uses family %s' % fam).encode('ascii')
1536 address = lconn.recv()
1537 rconn.send((address, msg))
1538 new_conn = lconn.recv()
1539 self.assertEqual(new_conn.recv(), msg.upper())
1540
1541 rconn.send(None)
1542
1543 if self.TYPE == 'processes':
1544 msg = latin('This connection uses a normal socket')
1545 address = lconn.recv()
1546 rconn.send((address, msg))
1547 if hasattr(socket, 'fromfd'):
1548 new_conn = lconn.recv()
1549 self.assertEqual(new_conn.recv(100), msg.upper())
1550 else:
1551 # XXX On Windows with Py2.6 need to backport fromfd()
1552 discard = lconn.recv_bytes()
1553
1554 lconn.send(None)
1555
1556 rconn.close()
1557 lconn.close()
1558
1559 lp.join()
1560 rp.join()
Benjamin Petersonda3a1b12008-06-16 20:52:48 +00001561"""
Benjamin Petersondfd79492008-06-13 19:13:39 +00001562#
1563#
1564#
1565
class _TestHeap(BaseTestCase):
    # Consistency check of the heap behind multiprocessing.heap's
    # BufferWrapper after many allocations and releases.

    ALLOWED_TYPES = ('processes',)

    def test_heap(self):
        iterations = 5000
        maxblocks = 50
        blocks = []

        # create and destroy lots of blocks of different sizes
        for _ in range(iterations):
            size = int(random.lognormvariate(0, 1) * 1000)
            blocks.append(multiprocessing.heap.BufferWrapper(size))
            if len(blocks) > maxblocks:
                # Drop one randomly chosen block.
                del blocks[random.randrange(maxblocks)]

        # get the heap object
        heap = multiprocessing.heap.BufferWrapper._heap

        # verify the state of the heap: collect every free and every
        # occupied segment as (arena index, start, stop, length, state)
        segments = []
        for free_list in heap._len_to_seq.values():
            for arena, start, stop in free_list:
                segments.append((heap._arenas.index(arena), start, stop,
                                 stop-start, 'free'))
        for arena, start, stop in heap._allocated_blocks:
            segments.append((heap._arenas.index(arena), start, stop,
                             stop-start, 'occupied'))

        segments.sort()

        # Each segment must either end exactly where the next one starts
        # or be followed by the first segment of a different arena.
        for current, following in zip(segments, segments[1:]):
            arena, start, stop = current[:3]
            narena, nstart, nstop = following[:3]
            self.assertTrue((arena != narena and nstart == 0) or
                            (stop == nstart))
1606
1607#
1608#
1609#
1610
class _Foo(Structure):
    # Two-field structure used by the shared ctypes tests.
    _fields_ = [('x', c_int), ('y', c_double)]
1616
@unittest.skipUnless(HAS_SHAREDCTYPES,
                     "requires multiprocessing.sharedctypes")
class _TestSharedCTypes(BaseTestCase):
    # Tests of shared ctypes values, structures and arrays.

    ALLOWED_TYPES = ('processes',)

    @classmethod
    def _double(cls, x, y, foo, arr, string):
        # Child side: double every value it was handed, in place.
        x.value = x.value * 2
        y.value = y.value * 2
        foo.x = foo.x * 2
        foo.y = foo.y * 2
        string.value = string.value * 2
        for index in range(len(arr)):
            arr[index] = arr[index] * 2

    def test_sharedctypes(self, lock=False):
        # Create one shared object of each kind, let a child process
        # double them all, and check that the changes are visible here.
        x = Value('i', 7, lock=lock)
        y = Value(c_double, 1.0/3.0, lock=lock)
        foo = Value(_Foo, 3, 2, lock=lock)
        arr = self.Array('d', range(10), lock=lock)
        string = self.Array('c', 20, lock=lock)
        string.value = latin('hello')

        child = self.Process(target=self._double,
                             args=(x, y, foo, arr, string))
        child.start()
        child.join()

        self.assertEqual(x.value, 14)
        self.assertAlmostEqual(y.value, 2.0/3.0)
        self.assertEqual(foo.x, 6)
        self.assertAlmostEqual(foo.y, 4.0)
        for index in range(10):
            self.assertAlmostEqual(arr[index], index*2)
        self.assertEqual(string.value, latin('hellohello'))

    def test_synchronize(self):
        # Same checks with lock=True.
        self.test_sharedctypes(lock=True)

    def test_copy(self):
        # Changes made to the original after copy() must not show up in
        # the copy.
        original = _Foo(2, 5.0)
        duplicate = copy(original)
        original.x = 0
        original.y = 0
        self.assertEqual(duplicate.x, 2)
        self.assertAlmostEqual(duplicate.y, 5.0)
1663
1664#
1665#
1666#
1667
class _TestFinalize(BaseTestCase):
    # Test of the order in which util.Finalize callbacks run.

    ALLOWED_TYPES = ('processes',)

    @classmethod
    def _test_finalize(cls, conn):
        # Child side: register finalizers that each send a tag over
        # `conn`.  Every object is bound to a local name of its own, so
        # none of them goes away before the process exits unless it is
        # deleted explicitly below.
        class Token(object):
            pass

        deleted = Token()
        util.Finalize(deleted, conn.send, args=('a',))
        del deleted           # triggers callback for a

        called = Token()
        finalize_called = util.Finalize(called, conn.send, args=('b',))
        finalize_called()     # triggers callback for b
        finalize_called()     # does nothing because callback has already
                              # been called
        del called            # does nothing because callback has already
                              # been called

        # Registered without an exitpriority.
        no_priority = Token()
        util.Finalize(no_priority, conn.send, args=('c',))

        priority_one = Token()
        util.Finalize(priority_one, conn.send, args=('d10',), exitpriority=1)

        # Three finalizers sharing exitpriority 0, registered in the
        # order d01, d02, d03.
        zero_first = Token()
        util.Finalize(zero_first, conn.send, args=('d01',), exitpriority=0)
        zero_second = Token()
        util.Finalize(zero_second, conn.send, args=('d02',), exitpriority=0)
        zero_third = Token()
        util.Finalize(zero_third, conn.send, args=('d03',), exitpriority=0)

        # Finalizers that are not attached to any object.
        util.Finalize(None, conn.send, args=('e',), exitpriority=-10)

        util.Finalize(None, conn.send, args=('STOP',), exitpriority=-100)

        # call multiprocessing's cleanup function then exit process
        # without garbage collecting locals
        util._exit_function()
        conn.close()
        os._exit(0)

    def test_finalize(self):
        conn, child_conn = self.Pipe()

        child = self.Process(target=self._test_finalize, args=(child_conn,))
        child.start()
        child.join()

        # Read tags until 'STOP'.  'c' is not among the expected tags.
        received = list(iter(conn.recv, 'STOP'))
        self.assertEqual(received,
                         ['a', 'b', 'd10', 'd03', 'd02', 'd01', 'e'])
1719
1720#
1721# Test that from ... import * works for each module
1722#
1723
class _TestImportStar(BaseTestCase):
    # Every name a module lists in __all__ must exist on that module.

    ALLOWED_TYPES = ('processes',)

    def test_import(self):
        module_names = [
            'multiprocessing', 'multiprocessing.connection',
            'multiprocessing.heap', 'multiprocessing.managers',
            'multiprocessing.pool', 'multiprocessing.process',
            'multiprocessing.reduction',
            'multiprocessing.synchronize', 'multiprocessing.util',
            ]

        if c_int is not None:
            # This module requires _ctypes
            module_names.append('multiprocessing.sharedctypes')

        for module_name in module_names:
            __import__(module_name)
            module = sys.modules[module_name]

            # A module without __all__ has nothing to check.
            for exported in getattr(module, '__all__', ()):
                message = '%r does not have attribute %r' % (module, exported)
                self.assertTrue(hasattr(module, exported), message)
1750
1751#
1752# Quick test that logging works -- does not test logging output
1753#
1754
class _TestLogging(BaseTestCase):
    # Quick test that logging works -- does not test logging output.

    ALLOWED_TYPES = ('processes',)

    def test_enable_logging(self):
        logger = multiprocessing.get_logger()
        logger.setLevel(util.SUBWARNING)
        self.assertIsNotNone(logger)
        logger.debug('this will not be printed')
        logger.info('nor will this')
        logger.setLevel(LOG_LEVEL)

    @classmethod
    def _test_level(cls, conn):
        # Child side: report the effective level of the multiprocessing
        # logger as seen in the child.
        logger = multiprocessing.get_logger()
        conn.send(logger.getEffectiveLevel())

    def _child_level(self, reader, writer):
        # Run _test_level in a child process, wait for the child to exit
        # and return the level it reported through the pipe.
        child = self.Process(target=self._test_level, args=(writer,))
        child.start()
        level = reader.recv()
        child.join()
        return level

    def test_level(self):
        LEVEL1 = 32
        LEVEL2 = 37

        logger = multiprocessing.get_logger()
        root_logger = logging.getLogger()
        root_level = root_logger.level

        reader, writer = multiprocessing.Pipe(duplex=False)
        try:
            # A level set on the multiprocessing logger is seen by the
            # child.
            logger.setLevel(LEVEL1)
            self.assertEqual(LEVEL1, self._child_level(reader, writer))

            # With NOTSET on the multiprocessing logger the child sees
            # the root logger's level instead.
            logger.setLevel(logging.NOTSET)
            root_logger.setLevel(LEVEL2)
            self.assertEqual(LEVEL2, self._child_level(reader, writer))
        finally:
            # Put both loggers back and release the pipe even when an
            # assertion failed, so later tests are not affected.
            root_logger.setLevel(root_level)
            logger.setLevel(LOG_LEVEL)
            reader.close()
            writer.close()
1793
Jesse Noller814d02d2009-11-21 14:38:23 +00001794
Jesse Noller9a03f2f2009-11-24 14:17:29 +00001795# class _TestLoggingProcessName(BaseTestCase):
1796#
1797# def handle(self, record):
1798# assert record.processName == multiprocessing.current_process().name
1799# self.__handled = True
1800#
1801# def test_logging(self):
1802# handler = logging.Handler()
1803# handler.handle = self.handle
1804# self.__handled = False
1805# # Bypass getLogger() and side-effects
1806# logger = logging.getLoggerClass()(
1807# 'multiprocessing.test.TestLoggingProcessName')
1808# logger.addHandler(handler)
1809# logger.propagate = False
1810#
1811# logger.warn('foo')
1812# assert self.__handled
Jesse Noller814d02d2009-11-21 14:38:23 +00001813
Benjamin Petersondfd79492008-06-13 19:13:39 +00001814#
Jesse Noller9a5b2ad2009-01-19 15:12:22 +00001815# Test to verify handle verification, see issue 3321
1816#
1817
class TestInvalidHandle(unittest.TestCase):
    # Test to verify handle verification, see issue 3321.

    @unittest.skipIf(WIN32, "skipped on Windows")
    def test_invalid_handles(self):
        # A large handle number is accepted by the constructor but fails
        # with IOError as soon as the connection is polled.
        bogus_handle = 44977608
        conn = _multiprocessing.Connection(bogus_handle)
        self.assertRaises(IOError, conn.poll)
        # A negative handle is refused by the constructor itself.
        self.assertRaises(IOError, _multiprocessing.Connection, -1)
Florent Xicluna36b9fbb2010-03-24 19:33:25 +00001825
Jesse Noller9a5b2ad2009-01-19 15:12:22 +00001826#
Benjamin Petersondfd79492008-06-13 19:13:39 +00001827# Functions used to create test cases from the base ones in this module
1828#
1829
def get_attributes(Source, names):
    """Return a dict mapping each of `names` to that attribute of `Source`.

    Attributes that are plain Python functions are wrapped in
    staticmethod(); every other attribute is stored unchanged.
    """
    function_type = type(get_attributes)
    attributes = {}
    for name in names:
        value = getattr(Source, name)
        if isinstance(value, function_type):
            value = staticmethod(value)
        attributes[name] = value
    return attributes
1838
def create_test_cases(Mixin, type):
    """Build concrete test classes from this module's `_Test*` classes.

    For every module global whose name starts with '_Test' and whose
    ALLOWED_TYPES contains `type`, a subclass combining that class,
    unittest.TestCase and `Mixin` is created.  It is named 'With' + the
    capitalized `type` + the original name without its leading
    underscore, and its __module__ is set to that of `Mixin`.

    Returns a dict mapping the new names to the new classes.
    """
    prefix = 'With' + type.capitalize()
    created = {}

    # Iterate over a snapshot of the module globals.
    for name, base in list(globals().items()):
        if not name.startswith('_Test'):
            continue
        if type not in base.ALLOWED_TYPES:
            continue

        class Temp(base, unittest.TestCase, Mixin):
            pass

        newname = prefix + name[1:]
        Temp.__name__ = newname
        Temp.__module__ = Mixin.__module__
        created[newname] = Temp
    return created
1855
1856#
1857# Create test cases
1858#
1859
class ProcessesMixin(object):
    # Mixin for the 'processes' flavour of the generated test cases.
    TYPE = 'processes'
    Process = multiprocessing.Process
    # Add the named attributes of the multiprocessing module to the
    # class namespace.
    locals().update(get_attributes(
        multiprocessing,
        ('Queue Lock RLock Semaphore BoundedSemaphore '
         'Condition Event Value Array RawValue '
         'RawArray current_process active_children Pipe '
         'connection JoinableQueue').split()
        ))

# Generate the 'processes' test classes and publish them as module
# globals.
testcases_processes = create_test_cases(ProcessesMixin, type='processes')
globals().update(testcases_processes)
1872
1873
class ManagerMixin(object):
    # Mixin for the 'manager' flavour of the generated test cases.
    TYPE = 'manager'
    Process = multiprocessing.Process
    # A SyncManager instance created without running __init__.
    manager = object.__new__(multiprocessing.managers.SyncManager)
    # Add the named attributes of that manager instance to the class
    # namespace.
    locals().update(get_attributes(
        manager,
        ('Queue Lock RLock Semaphore BoundedSemaphore '
         'Condition Event Value Array list dict '
         'Namespace JoinableQueue').split()
        ))

# Generate the 'manager' test classes and publish them as module globals.
testcases_manager = create_test_cases(ManagerMixin, type='manager')
globals().update(testcases_manager)
1886
1887
class ThreadsMixin(object):
    # Mixin for the 'threads' flavour of the generated test cases.
    TYPE = 'threads'
    Process = multiprocessing.dummy.Process
    # Add the named attributes of multiprocessing.dummy to the class
    # namespace.
    locals().update(get_attributes(
        multiprocessing.dummy,
        ('Queue Lock RLock Semaphore BoundedSemaphore '
         'Condition Event Value Array current_process '
         'active_children Pipe connection dict list '
         'Namespace JoinableQueue').split()
        ))

# Generate the 'threads' test classes and publish them as module globals.
testcases_threads = create_test_cases(ThreadsMixin, type='threads')
globals().update(testcases_threads)
1900
class OtherTest(unittest.TestCase):
    # TODO: add more tests for deliver/answer challenge.

    def test_deliver_challenge_auth_failure(self):
        # The peer answers the challenge with bytes that cannot be the
        # right digest, so authentication must fail.
        class _FakeConnection(object):
            def send_bytes(self, data):
                pass
            def recv_bytes(self, size):
                return b'something bogus'

        fake = _FakeConnection()
        self.assertRaises(multiprocessing.AuthenticationError,
                          multiprocessing.connection.deliver_challenge,
                          fake, b'abc')

    def test_answer_challenge_auth_failure(self):
        # The peer sends the CHALLENGE constant first and something other
        # than a welcome afterwards, so authentication must fail.
        class _FakeConnection(object):
            def __init__(self):
                # Replies handed out in order; b'' once they run out.
                self._replies = iter([
                    multiprocessing.connection.CHALLENGE,
                    b'something bogus',
                    ])
            def send_bytes(self, data):
                pass
            def recv_bytes(self, size):
                return next(self._replies, b'')

        fake = _FakeConnection()
        self.assertRaises(multiprocessing.AuthenticationError,
                          multiprocessing.connection.answer_challenge,
                          fake, b'abc')
1929
Jesse Noller7152f6d2009-04-02 05:17:26 +00001930#
1931# Test Manager.start()/Pool.__init__() initializer feature - see issue 5585
1932#
1933
def initializer(ns):
    """Bump the ``test`` counter stored on *ns* by one."""
    bumped = ns.test + 1
    ns.test = bumped
1936
class TestInitializers(unittest.TestCase):
    """Check the ``initializer`` hook of Manager.start() and Pool().

    Each test shares a manager-backed namespace whose ``test`` counter starts
    at 0; the module-level initializer() increments it, so a value of 1
    afterwards shows the hook ran exactly once.
    """

    def setUp(self):
        self.mgr = multiprocessing.Manager()
        self.ns = self.mgr.Namespace()
        self.ns.test = 0

    def tearDown(self):
        self.mgr.shutdown()

    def test_manager_initializer(self):
        m = multiprocessing.managers.SyncManager()
        # A non-callable initializer must be rejected.
        self.assertRaises(TypeError, m.start, 1)
        m.start(initializer, (self.ns,))
        try:
            self.assertEqual(self.ns.test, 1)
        finally:
            # Always stop the second manager, even when the assertion
            # fails, so a failing test does not leave its server running.
            m.shutdown()

    def test_pool_initializer(self):
        # A non-callable initializer must be rejected.
        self.assertRaises(TypeError, multiprocessing.Pool, initializer=1)
        p = multiprocessing.Pool(1, initializer, (self.ns,))
        p.close()
        p.join()
        self.assertEqual(self.ns.test, 1)
1959
Jesse Noller1b90efb2009-06-30 17:11:52 +00001960#
1961# Issue 5155, 5313, 5331: Test process in processes
1962# Verifies os.close(sys.stdin.fileno) vs. sys.stdin.close() behavior
1963#
1964
1965def _ThisSubProcess(q):
1966 try:
1967 item = q.get(block=False)
1968 except Queue.Empty:
1969 pass
1970
def _TestProcess(q):
    """Start a child process running _ThisSubProcess and wait for it.

    The child is handed a freshly created multiprocessing.Queue; the *q*
    argument is accepted but not used.
    """
    child_queue = multiprocessing.Queue()
    child = multiprocessing.Process(
        target=_ThisSubProcess,
        args=(child_queue,),
    )
    child.start()
    child.join()
1976
1977def _afunc(x):
1978 return x*x
1979
def pool_in_process():
    """Create a 4-worker Pool, map _afunc over a small list, then clean up.

    The mapped result is discarded; the function exists only to exercise
    Pool creation and use from inside another process.
    """
    pool = multiprocessing.Pool(processes=4)
    try:
        pool.map(_afunc, [1, 2, 3, 4, 5, 6, 7])
    finally:
        # Release the workers explicitly instead of relying on interpreter
        # exit to reap them.
        pool.close()
        pool.join()
1983
1984class _file_like(object):
1985 def __init__(self, delegate):
1986 self._delegate = delegate
1987 self._pid = None
1988
1989 @property
1990 def cache(self):
1991 pid = os.getpid()
1992 # There are no race conditions since fork keeps only the running thread
1993 if pid != self._pid:
1994 self._pid = pid
1995 self._cache = []
1996 return self._cache
1997
1998 def write(self, data):
1999 self.cache.append(data)
2000
2001 def flush(self):
2002 self._delegate.write(''.join(self.cache))
2003 self._cache = []
2004
class TestStdinBadfiledescriptor(unittest.TestCase):
    """Run queues and pools from inside a child process (issues 5155,
    5313, 5331) and check _file_like flushing.
    """

    def test_queue_in_process(self):
        queue = multiprocessing.Queue()
        proc = multiprocessing.Process(target=_TestProcess, args=(queue,))
        proc.start()
        # NOTE(review): the child's exit code is not inspected, so a
        # failure inside the child does not fail this test -- confirm
        # whether that is intended.
        proc.join()

    def test_pool_in_process(self):
        p = multiprocessing.Process(target=pool_in_process)
        p.start()
        # NOTE(review): exit code not inspected here either.
        p.join()

    def test_flushing(self):
        sio = StringIO()
        flike = _file_like(sio)
        flike.write('foo')
        # The Process object is only constructed, never started.
        proc = multiprocessing.Process(target=lambda: flike.flush())
        flike.flush()
        # assertEqual instead of a bare assert: it still runs under
        # ``python -O`` and reports both values on failure.
        self.assertEqual(sio.getvalue(), 'foo')
2025
# Stand-alone TestCase classes that are not generated from a mixin;
# test_main() appends them after the generated test cases.
testcases_other = [OtherTest, TestInvalidHandle, TestInitializers,
                   TestStdinBadfiledescriptor]
Neal Norwitz0c519b32008-08-25 01:50:24 +00002028
Benjamin Petersondfd79492008-06-13 19:13:39 +00002029#
2030#
2031#
2032
def test_main(run=None):
    """Build the full multiprocessing test suite and run it.

    *run* is a callable that accepts a unittest.TestSuite; when omitted,
    test_support.run_unittest is used.  Shared pools and the shared manager
    are created on the mixin classes before the run and are torn down
    afterwards, including when the run raises.

    Raises unittest.SkipTest on Linux when an RLock cannot be created.
    """
    if sys.platform.startswith("linux"):
        try:
            # Only probing whether creation works; the lock is not used.
            multiprocessing.RLock()
        except OSError:
            raise unittest.SkipTest("OSError raises on RLock creation, see issue 3111!")

    if run is None:
        run = test_support.run_unittest

    util.get_temp_dir()     # creates temp directory for use by all processes

    multiprocessing.get_logger().setLevel(LOG_LEVEL)

    ProcessesMixin.pool = multiprocessing.Pool(4)
    ThreadsMixin.pool = multiprocessing.dummy.Pool(4)
    # ManagerMixin.manager was allocated without __init__; initialise and
    # start it now.
    ManagerMixin.manager.__init__()
    ManagerMixin.manager.start()
    ManagerMixin.pool = ManagerMixin.manager.Pool(4)

    try:
        testcases = (
            sorted(testcases_processes.values(), key=lambda tc:tc.__name__) +
            sorted(testcases_threads.values(), key=lambda tc:tc.__name__) +
            sorted(testcases_manager.values(), key=lambda tc:tc.__name__) +
            testcases_other
            )

        loadTestsFromTestCase = unittest.defaultTestLoader.loadTestsFromTestCase
        suite = unittest.TestSuite(loadTestsFromTestCase(tc) for tc in testcases)
        # (ncoghlan): Whether or not sys.exc_clear is executed by the threading
        # module during these tests is at least platform dependent and possibly
        # non-deterministic on any given platform. So we don't mind if the listed
        # warnings aren't actually raised.
        with test_support.check_py3k_warnings(
                (".+__(get|set)slice__ has been removed", DeprecationWarning),
                (r"sys.exc_clear\(\) not supported", DeprecationWarning),
                quiet=True):
            run(suite)
    finally:
        # Tear the shared resources down even if the run raised, so worker
        # and manager processes are not left behind.
        ThreadsMixin.pool.terminate()
        ProcessesMixin.pool.terminate()
        ManagerMixin.pool.terminate()
        ManagerMixin.manager.shutdown()

        del ProcessesMixin.pool, ThreadsMixin.pool, ManagerMixin.pool
Benjamin Petersondfd79492008-06-13 19:13:39 +00002078
def main():
    """Run test_main() through a verbose (verbosity=2) text test runner."""
    runner = unittest.TextTestRunner(verbosity=2)
    test_main(runner.run)
2081
# When executed as a script, run the whole suite via main().
if __name__ == '__main__':
    main()