blob: bdfc171253745d9400a95c85241e6a8d9f72cf7b [file] [log] [blame]
Jesse Noller76cf55f2008-07-02 16:56:51 +00001#!/usr/bin/env python
2
Benjamin Petersondfd79492008-06-13 19:13:39 +00003#
4# Unit tests for the multiprocessing package
5#
6
7import unittest
Benjamin Petersondfd79492008-06-13 19:13:39 +00008import Queue
9import time
10import sys
11import os
12import gc
13import signal
14import array
Benjamin Petersondfd79492008-06-13 19:13:39 +000015import socket
16import random
17import logging
Mark Dickinsonc4920e82009-11-20 19:30:22 +000018from test import test_support
Jesse Noller1b90efb2009-06-30 17:11:52 +000019from StringIO import StringIO
R. David Murray3db8a342009-03-30 23:05:48 +000020_multiprocessing = test_support.import_module('_multiprocessing')
Ezio Melottic2077b02011-03-16 12:34:31 +020021# import threading after _multiprocessing to raise a more relevant error
Victor Stinner613b4cf2010-04-27 21:56:26 +000022# message: "No module named _multiprocessing". _multiprocessing is not compiled
23# without thread support.
24import threading
R. David Murray3db8a342009-03-30 23:05:48 +000025
Jesse Noller37040cd2008-09-30 00:15:45 +000026# Work around broken sem_open implementations
R. David Murray3db8a342009-03-30 23:05:48 +000027test_support.import_module('multiprocessing.synchronize')
Jesse Noller37040cd2008-09-30 00:15:45 +000028
Benjamin Petersondfd79492008-06-13 19:13:39 +000029import multiprocessing.dummy
30import multiprocessing.connection
31import multiprocessing.managers
32import multiprocessing.heap
Benjamin Petersondfd79492008-06-13 19:13:39 +000033import multiprocessing.pool
Benjamin Petersondfd79492008-06-13 19:13:39 +000034
35from multiprocessing import util
36
Brian Curtina06e9b82010-10-07 02:27:41 +000037try:
38 from multiprocessing.sharedctypes import Value, copy
39 HAS_SHAREDCTYPES = True
40except ImportError:
41 HAS_SHAREDCTYPES = False
42
Benjamin Petersondfd79492008-06-13 19:13:39 +000043#
44#
45#
46
Benjamin Petersone79edf52008-07-13 18:34:58 +000047latin = str
Benjamin Petersondfd79492008-06-13 19:13:39 +000048
Benjamin Petersondfd79492008-06-13 19:13:39 +000049#
50# Constants
51#
52
53LOG_LEVEL = util.SUBWARNING
Jesse Noller654ade32010-01-27 03:05:57 +000054#LOG_LEVEL = logging.DEBUG
Benjamin Petersondfd79492008-06-13 19:13:39 +000055
56DELTA = 0.1
57CHECK_TIMINGS = False # making true makes tests take a lot longer
58 # and can sometimes cause some non-serious
59 # failures because some calls block a bit
60 # longer than expected
61if CHECK_TIMINGS:
62 TIMEOUT1, TIMEOUT2, TIMEOUT3 = 0.82, 0.35, 1.4
63else:
64 TIMEOUT1, TIMEOUT2, TIMEOUT3 = 0.1, 0.1, 0.1
65
66HAVE_GETVALUE = not getattr(_multiprocessing,
67 'HAVE_BROKEN_SEM_GETVALUE', False)
68
Jesse Noller9a5b2ad2009-01-19 15:12:22 +000069WIN32 = (sys.platform == "win32")
70
Benjamin Petersondfd79492008-06-13 19:13:39 +000071#
Florent Xicluna36b9fbb2010-03-24 19:33:25 +000072# Some tests require ctypes
73#
74
75try:
Nick Coghlan13623662010-04-10 14:24:36 +000076 from ctypes import Structure, c_int, c_double
Florent Xicluna36b9fbb2010-03-24 19:33:25 +000077except ImportError:
78 Structure = object
79 c_int = c_double = None
80
81#
Benjamin Petersondfd79492008-06-13 19:13:39 +000082# Creates a wrapper for a function which records the time it takes to finish
83#
84
85class TimingWrapper(object):
86
87 def __init__(self, func):
88 self.func = func
89 self.elapsed = None
90
91 def __call__(self, *args, **kwds):
92 t = time.time()
93 try:
94 return self.func(*args, **kwds)
95 finally:
96 self.elapsed = time.time() - t
97
98#
99# Base class for test cases
100#
101
102class BaseTestCase(object):
103
104 ALLOWED_TYPES = ('processes', 'manager', 'threads')
105
106 def assertTimingAlmostEqual(self, a, b):
107 if CHECK_TIMINGS:
108 self.assertAlmostEqual(a, b, 1)
109
110 def assertReturnsIfImplemented(self, value, func, *args):
111 try:
112 res = func(*args)
113 except NotImplementedError:
114 pass
115 else:
116 return self.assertEqual(value, res)
117
Antoine Pitrou4eb2b282010-11-02 23:51:30 +0000118 # For the sanity of Windows users, rather than crashing or freezing in
119 # multiple ways.
120 def __reduce__(self, *args):
121 raise NotImplementedError("shouldn't try to pickle a test case")
122
123 __reduce_ex__ = __reduce__
124
Benjamin Petersondfd79492008-06-13 19:13:39 +0000125#
126# Return the value of a semaphore
127#
128
129def get_value(self):
130 try:
131 return self.get_value()
132 except AttributeError:
133 try:
134 return self._Semaphore__value
135 except AttributeError:
136 try:
137 return self._value
138 except AttributeError:
139 raise NotImplementedError
140
141#
142# Testcases
143#
144
145class _TestProcess(BaseTestCase):
146
147 ALLOWED_TYPES = ('processes', 'threads')
148
149 def test_current(self):
150 if self.TYPE == 'threads':
151 return
152
153 current = self.current_process()
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000154 authkey = current.authkey
Benjamin Petersondfd79492008-06-13 19:13:39 +0000155
156 self.assertTrue(current.is_alive())
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000157 self.assertTrue(not current.daemon)
Ezio Melottib0f5adc2010-01-24 16:58:36 +0000158 self.assertIsInstance(authkey, bytes)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000159 self.assertTrue(len(authkey) > 0)
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000160 self.assertEqual(current.ident, os.getpid())
161 self.assertEqual(current.exitcode, None)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000162
Antoine Pitrou4eb2b282010-11-02 23:51:30 +0000163 @classmethod
164 def _test(cls, q, *args, **kwds):
165 current = cls.current_process()
Benjamin Petersondfd79492008-06-13 19:13:39 +0000166 q.put(args)
167 q.put(kwds)
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000168 q.put(current.name)
Antoine Pitrou4eb2b282010-11-02 23:51:30 +0000169 if cls.TYPE != 'threads':
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000170 q.put(bytes(current.authkey))
Benjamin Petersondfd79492008-06-13 19:13:39 +0000171 q.put(current.pid)
172
173 def test_process(self):
174 q = self.Queue(1)
175 e = self.Event()
176 args = (q, 1, 2)
177 kwargs = {'hello':23, 'bye':2.54}
178 name = 'SomeProcess'
179 p = self.Process(
180 target=self._test, args=args, kwargs=kwargs, name=name
181 )
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000182 p.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +0000183 current = self.current_process()
184
185 if self.TYPE != 'threads':
Ezio Melotti2623a372010-11-21 13:34:58 +0000186 self.assertEqual(p.authkey, current.authkey)
187 self.assertEqual(p.is_alive(), False)
188 self.assertEqual(p.daemon, True)
Ezio Melottiaa980582010-01-23 23:04:36 +0000189 self.assertNotIn(p, self.active_children())
Benjamin Petersondfd79492008-06-13 19:13:39 +0000190 self.assertTrue(type(self.active_children()) is list)
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000191 self.assertEqual(p.exitcode, None)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000192
193 p.start()
194
Ezio Melotti2623a372010-11-21 13:34:58 +0000195 self.assertEqual(p.exitcode, None)
196 self.assertEqual(p.is_alive(), True)
Ezio Melottiaa980582010-01-23 23:04:36 +0000197 self.assertIn(p, self.active_children())
Benjamin Petersondfd79492008-06-13 19:13:39 +0000198
Ezio Melotti2623a372010-11-21 13:34:58 +0000199 self.assertEqual(q.get(), args[1:])
200 self.assertEqual(q.get(), kwargs)
201 self.assertEqual(q.get(), p.name)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000202 if self.TYPE != 'threads':
Ezio Melotti2623a372010-11-21 13:34:58 +0000203 self.assertEqual(q.get(), current.authkey)
204 self.assertEqual(q.get(), p.pid)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000205
206 p.join()
207
Ezio Melotti2623a372010-11-21 13:34:58 +0000208 self.assertEqual(p.exitcode, 0)
209 self.assertEqual(p.is_alive(), False)
Ezio Melottiaa980582010-01-23 23:04:36 +0000210 self.assertNotIn(p, self.active_children())
Benjamin Petersondfd79492008-06-13 19:13:39 +0000211
Antoine Pitrou4eb2b282010-11-02 23:51:30 +0000212 @classmethod
213 def _test_terminate(cls):
Benjamin Petersondfd79492008-06-13 19:13:39 +0000214 time.sleep(1000)
215
216 def test_terminate(self):
217 if self.TYPE == 'threads':
218 return
219
220 p = self.Process(target=self._test_terminate)
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000221 p.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +0000222 p.start()
223
224 self.assertEqual(p.is_alive(), True)
Ezio Melottiaa980582010-01-23 23:04:36 +0000225 self.assertIn(p, self.active_children())
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000226 self.assertEqual(p.exitcode, None)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000227
228 p.terminate()
229
230 join = TimingWrapper(p.join)
231 self.assertEqual(join(), None)
232 self.assertTimingAlmostEqual(join.elapsed, 0.0)
233
234 self.assertEqual(p.is_alive(), False)
Ezio Melottiaa980582010-01-23 23:04:36 +0000235 self.assertNotIn(p, self.active_children())
Benjamin Petersondfd79492008-06-13 19:13:39 +0000236
237 p.join()
238
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000239 # XXX sometimes get p.exitcode == 0 on Windows ...
240 #self.assertEqual(p.exitcode, -signal.SIGTERM)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000241
242 def test_cpu_count(self):
243 try:
244 cpus = multiprocessing.cpu_count()
245 except NotImplementedError:
246 cpus = 1
247 self.assertTrue(type(cpus) is int)
248 self.assertTrue(cpus >= 1)
249
250 def test_active_children(self):
251 self.assertEqual(type(self.active_children()), list)
252
253 p = self.Process(target=time.sleep, args=(DELTA,))
Ezio Melottiaa980582010-01-23 23:04:36 +0000254 self.assertNotIn(p, self.active_children())
Benjamin Petersondfd79492008-06-13 19:13:39 +0000255
256 p.start()
Ezio Melottiaa980582010-01-23 23:04:36 +0000257 self.assertIn(p, self.active_children())
Benjamin Petersondfd79492008-06-13 19:13:39 +0000258
259 p.join()
Ezio Melottiaa980582010-01-23 23:04:36 +0000260 self.assertNotIn(p, self.active_children())
Benjamin Petersondfd79492008-06-13 19:13:39 +0000261
Antoine Pitrou4eb2b282010-11-02 23:51:30 +0000262 @classmethod
263 def _test_recursion(cls, wconn, id):
Benjamin Petersondfd79492008-06-13 19:13:39 +0000264 from multiprocessing import forking
265 wconn.send(id)
266 if len(id) < 2:
267 for i in range(2):
Antoine Pitrou4eb2b282010-11-02 23:51:30 +0000268 p = cls.Process(
269 target=cls._test_recursion, args=(wconn, id+[i])
Benjamin Petersondfd79492008-06-13 19:13:39 +0000270 )
271 p.start()
272 p.join()
273
274 def test_recursion(self):
275 rconn, wconn = self.Pipe(duplex=False)
276 self._test_recursion(wconn, [])
277
278 time.sleep(DELTA)
279 result = []
280 while rconn.poll():
281 result.append(rconn.recv())
282
283 expected = [
284 [],
285 [0],
286 [0, 0],
287 [0, 1],
288 [1],
289 [1, 0],
290 [1, 1]
291 ]
292 self.assertEqual(result, expected)
293
294#
295#
296#
297
298class _UpperCaser(multiprocessing.Process):
299
300 def __init__(self):
301 multiprocessing.Process.__init__(self)
302 self.child_conn, self.parent_conn = multiprocessing.Pipe()
303
304 def run(self):
305 self.parent_conn.close()
306 for s in iter(self.child_conn.recv, None):
307 self.child_conn.send(s.upper())
308 self.child_conn.close()
309
310 def submit(self, s):
311 assert type(s) is str
312 self.parent_conn.send(s)
313 return self.parent_conn.recv()
314
315 def stop(self):
316 self.parent_conn.send(None)
317 self.parent_conn.close()
318 self.child_conn.close()
319
320class _TestSubclassingProcess(BaseTestCase):
321
322 ALLOWED_TYPES = ('processes',)
323
324 def test_subclassing(self):
325 uppercaser = _UpperCaser()
326 uppercaser.start()
327 self.assertEqual(uppercaser.submit('hello'), 'HELLO')
328 self.assertEqual(uppercaser.submit('world'), 'WORLD')
329 uppercaser.stop()
330 uppercaser.join()
331
332#
333#
334#
335
336def queue_empty(q):
337 if hasattr(q, 'empty'):
338 return q.empty()
339 else:
340 return q.qsize() == 0
341
342def queue_full(q, maxsize):
343 if hasattr(q, 'full'):
344 return q.full()
345 else:
346 return q.qsize() == maxsize
347
348
349class _TestQueue(BaseTestCase):
350
351
Antoine Pitrou4eb2b282010-11-02 23:51:30 +0000352 @classmethod
353 def _test_put(cls, queue, child_can_start, parent_can_continue):
Benjamin Petersondfd79492008-06-13 19:13:39 +0000354 child_can_start.wait()
355 for i in range(6):
356 queue.get()
357 parent_can_continue.set()
358
359 def test_put(self):
360 MAXSIZE = 6
361 queue = self.Queue(maxsize=MAXSIZE)
362 child_can_start = self.Event()
363 parent_can_continue = self.Event()
364
365 proc = self.Process(
366 target=self._test_put,
367 args=(queue, child_can_start, parent_can_continue)
368 )
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000369 proc.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +0000370 proc.start()
371
372 self.assertEqual(queue_empty(queue), True)
373 self.assertEqual(queue_full(queue, MAXSIZE), False)
374
375 queue.put(1)
376 queue.put(2, True)
377 queue.put(3, True, None)
378 queue.put(4, False)
379 queue.put(5, False, None)
380 queue.put_nowait(6)
381
382 # the values may be in buffer but not yet in pipe so sleep a bit
383 time.sleep(DELTA)
384
385 self.assertEqual(queue_empty(queue), False)
386 self.assertEqual(queue_full(queue, MAXSIZE), True)
387
388 put = TimingWrapper(queue.put)
389 put_nowait = TimingWrapper(queue.put_nowait)
390
391 self.assertRaises(Queue.Full, put, 7, False)
392 self.assertTimingAlmostEqual(put.elapsed, 0)
393
394 self.assertRaises(Queue.Full, put, 7, False, None)
395 self.assertTimingAlmostEqual(put.elapsed, 0)
396
397 self.assertRaises(Queue.Full, put_nowait, 7)
398 self.assertTimingAlmostEqual(put_nowait.elapsed, 0)
399
400 self.assertRaises(Queue.Full, put, 7, True, TIMEOUT1)
401 self.assertTimingAlmostEqual(put.elapsed, TIMEOUT1)
402
403 self.assertRaises(Queue.Full, put, 7, False, TIMEOUT2)
404 self.assertTimingAlmostEqual(put.elapsed, 0)
405
406 self.assertRaises(Queue.Full, put, 7, True, timeout=TIMEOUT3)
407 self.assertTimingAlmostEqual(put.elapsed, TIMEOUT3)
408
409 child_can_start.set()
410 parent_can_continue.wait()
411
412 self.assertEqual(queue_empty(queue), True)
413 self.assertEqual(queue_full(queue, MAXSIZE), False)
414
415 proc.join()
416
Antoine Pitrou4eb2b282010-11-02 23:51:30 +0000417 @classmethod
418 def _test_get(cls, queue, child_can_start, parent_can_continue):
Benjamin Petersondfd79492008-06-13 19:13:39 +0000419 child_can_start.wait()
Benjamin Petersonda3a1b12008-06-16 20:52:48 +0000420 #queue.put(1)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000421 queue.put(2)
422 queue.put(3)
423 queue.put(4)
424 queue.put(5)
425 parent_can_continue.set()
426
427 def test_get(self):
428 queue = self.Queue()
429 child_can_start = self.Event()
430 parent_can_continue = self.Event()
431
432 proc = self.Process(
433 target=self._test_get,
434 args=(queue, child_can_start, parent_can_continue)
435 )
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000436 proc.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +0000437 proc.start()
438
439 self.assertEqual(queue_empty(queue), True)
440
441 child_can_start.set()
442 parent_can_continue.wait()
443
444 time.sleep(DELTA)
445 self.assertEqual(queue_empty(queue), False)
446
Benjamin Petersonda3a1b12008-06-16 20:52:48 +0000447 # Hangs unexpectedly, remove for now
448 #self.assertEqual(queue.get(), 1)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000449 self.assertEqual(queue.get(True, None), 2)
450 self.assertEqual(queue.get(True), 3)
451 self.assertEqual(queue.get(timeout=1), 4)
452 self.assertEqual(queue.get_nowait(), 5)
453
454 self.assertEqual(queue_empty(queue), True)
455
456 get = TimingWrapper(queue.get)
457 get_nowait = TimingWrapper(queue.get_nowait)
458
459 self.assertRaises(Queue.Empty, get, False)
460 self.assertTimingAlmostEqual(get.elapsed, 0)
461
462 self.assertRaises(Queue.Empty, get, False, None)
463 self.assertTimingAlmostEqual(get.elapsed, 0)
464
465 self.assertRaises(Queue.Empty, get_nowait)
466 self.assertTimingAlmostEqual(get_nowait.elapsed, 0)
467
468 self.assertRaises(Queue.Empty, get, True, TIMEOUT1)
469 self.assertTimingAlmostEqual(get.elapsed, TIMEOUT1)
470
471 self.assertRaises(Queue.Empty, get, False, TIMEOUT2)
472 self.assertTimingAlmostEqual(get.elapsed, 0)
473
474 self.assertRaises(Queue.Empty, get, timeout=TIMEOUT3)
475 self.assertTimingAlmostEqual(get.elapsed, TIMEOUT3)
476
477 proc.join()
478
Antoine Pitrou4eb2b282010-11-02 23:51:30 +0000479 @classmethod
480 def _test_fork(cls, queue):
Benjamin Petersondfd79492008-06-13 19:13:39 +0000481 for i in range(10, 20):
482 queue.put(i)
483 # note that at this point the items may only be buffered, so the
484 # process cannot shutdown until the feeder thread has finished
485 # pushing items onto the pipe.
486
487 def test_fork(self):
488 # Old versions of Queue would fail to create a new feeder
489 # thread for a forked process if the original process had its
490 # own feeder thread. This test checks that this no longer
491 # happens.
492
493 queue = self.Queue()
494
495 # put items on queue so that main process starts a feeder thread
496 for i in range(10):
497 queue.put(i)
498
499 # wait to make sure thread starts before we fork a new process
500 time.sleep(DELTA)
501
502 # fork process
503 p = self.Process(target=self._test_fork, args=(queue,))
504 p.start()
505
506 # check that all expected items are in the queue
507 for i in range(20):
508 self.assertEqual(queue.get(), i)
509 self.assertRaises(Queue.Empty, queue.get, False)
510
511 p.join()
512
513 def test_qsize(self):
514 q = self.Queue()
515 try:
516 self.assertEqual(q.qsize(), 0)
517 except NotImplementedError:
518 return
519 q.put(1)
520 self.assertEqual(q.qsize(), 1)
521 q.put(5)
522 self.assertEqual(q.qsize(), 2)
523 q.get()
524 self.assertEqual(q.qsize(), 1)
525 q.get()
526 self.assertEqual(q.qsize(), 0)
527
Antoine Pitrou4eb2b282010-11-02 23:51:30 +0000528 @classmethod
529 def _test_task_done(cls, q):
Benjamin Petersondfd79492008-06-13 19:13:39 +0000530 for obj in iter(q.get, None):
531 time.sleep(DELTA)
532 q.task_done()
533
534 def test_task_done(self):
535 queue = self.JoinableQueue()
536
537 if sys.version_info < (2, 5) and not hasattr(queue, 'task_done'):
Florent Xicluna36b9fbb2010-03-24 19:33:25 +0000538 self.skipTest("requires 'queue.task_done()' method")
Benjamin Petersondfd79492008-06-13 19:13:39 +0000539
540 workers = [self.Process(target=self._test_task_done, args=(queue,))
541 for i in xrange(4)]
542
543 for p in workers:
544 p.start()
545
546 for i in xrange(10):
547 queue.put(i)
548
549 queue.join()
550
551 for p in workers:
552 queue.put(None)
553
554 for p in workers:
555 p.join()
556
557#
558#
559#
560
561class _TestLock(BaseTestCase):
562
563 def test_lock(self):
564 lock = self.Lock()
565 self.assertEqual(lock.acquire(), True)
566 self.assertEqual(lock.acquire(False), False)
567 self.assertEqual(lock.release(), None)
568 self.assertRaises((ValueError, threading.ThreadError), lock.release)
569
570 def test_rlock(self):
571 lock = self.RLock()
572 self.assertEqual(lock.acquire(), True)
573 self.assertEqual(lock.acquire(), True)
574 self.assertEqual(lock.acquire(), True)
575 self.assertEqual(lock.release(), None)
576 self.assertEqual(lock.release(), None)
577 self.assertEqual(lock.release(), None)
578 self.assertRaises((AssertionError, RuntimeError), lock.release)
579
Jesse Noller82eb5902009-03-30 23:29:31 +0000580 def test_lock_context(self):
581 with self.Lock():
582 pass
583
Benjamin Petersondfd79492008-06-13 19:13:39 +0000584
585class _TestSemaphore(BaseTestCase):
586
587 def _test_semaphore(self, sem):
588 self.assertReturnsIfImplemented(2, get_value, sem)
589 self.assertEqual(sem.acquire(), True)
590 self.assertReturnsIfImplemented(1, get_value, sem)
591 self.assertEqual(sem.acquire(), True)
592 self.assertReturnsIfImplemented(0, get_value, sem)
593 self.assertEqual(sem.acquire(False), False)
594 self.assertReturnsIfImplemented(0, get_value, sem)
595 self.assertEqual(sem.release(), None)
596 self.assertReturnsIfImplemented(1, get_value, sem)
597 self.assertEqual(sem.release(), None)
598 self.assertReturnsIfImplemented(2, get_value, sem)
599
600 def test_semaphore(self):
601 sem = self.Semaphore(2)
602 self._test_semaphore(sem)
603 self.assertEqual(sem.release(), None)
604 self.assertReturnsIfImplemented(3, get_value, sem)
605 self.assertEqual(sem.release(), None)
606 self.assertReturnsIfImplemented(4, get_value, sem)
607
608 def test_bounded_semaphore(self):
609 sem = self.BoundedSemaphore(2)
610 self._test_semaphore(sem)
611 # Currently fails on OS/X
612 #if HAVE_GETVALUE:
613 # self.assertRaises(ValueError, sem.release)
614 # self.assertReturnsIfImplemented(2, get_value, sem)
615
616 def test_timeout(self):
617 if self.TYPE != 'processes':
618 return
619
620 sem = self.Semaphore(0)
621 acquire = TimingWrapper(sem.acquire)
622
623 self.assertEqual(acquire(False), False)
624 self.assertTimingAlmostEqual(acquire.elapsed, 0.0)
625
626 self.assertEqual(acquire(False, None), False)
627 self.assertTimingAlmostEqual(acquire.elapsed, 0.0)
628
629 self.assertEqual(acquire(False, TIMEOUT1), False)
630 self.assertTimingAlmostEqual(acquire.elapsed, 0)
631
632 self.assertEqual(acquire(True, TIMEOUT2), False)
633 self.assertTimingAlmostEqual(acquire.elapsed, TIMEOUT2)
634
635 self.assertEqual(acquire(timeout=TIMEOUT3), False)
636 self.assertTimingAlmostEqual(acquire.elapsed, TIMEOUT3)
637
638
639class _TestCondition(BaseTestCase):
640
Antoine Pitrou4eb2b282010-11-02 23:51:30 +0000641 @classmethod
642 def f(cls, cond, sleeping, woken, timeout=None):
Benjamin Petersondfd79492008-06-13 19:13:39 +0000643 cond.acquire()
644 sleeping.release()
645 cond.wait(timeout)
646 woken.release()
647 cond.release()
648
649 def check_invariant(self, cond):
650 # this is only supposed to succeed when there are no sleepers
651 if self.TYPE == 'processes':
652 try:
653 sleepers = (cond._sleeping_count.get_value() -
654 cond._woken_count.get_value())
655 self.assertEqual(sleepers, 0)
656 self.assertEqual(cond._wait_semaphore.get_value(), 0)
657 except NotImplementedError:
658 pass
659
660 def test_notify(self):
661 cond = self.Condition()
662 sleeping = self.Semaphore(0)
663 woken = self.Semaphore(0)
664
665 p = self.Process(target=self.f, args=(cond, sleeping, woken))
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000666 p.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +0000667 p.start()
668
669 p = threading.Thread(target=self.f, args=(cond, sleeping, woken))
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000670 p.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +0000671 p.start()
672
673 # wait for both children to start sleeping
674 sleeping.acquire()
675 sleeping.acquire()
676
677 # check no process/thread has woken up
678 time.sleep(DELTA)
679 self.assertReturnsIfImplemented(0, get_value, woken)
680
681 # wake up one process/thread
682 cond.acquire()
683 cond.notify()
684 cond.release()
685
686 # check one process/thread has woken up
687 time.sleep(DELTA)
688 self.assertReturnsIfImplemented(1, get_value, woken)
689
690 # wake up another
691 cond.acquire()
692 cond.notify()
693 cond.release()
694
695 # check other has woken up
696 time.sleep(DELTA)
697 self.assertReturnsIfImplemented(2, get_value, woken)
698
699 # check state is not mucked up
700 self.check_invariant(cond)
701 p.join()
702
703 def test_notify_all(self):
704 cond = self.Condition()
705 sleeping = self.Semaphore(0)
706 woken = self.Semaphore(0)
707
708 # start some threads/processes which will timeout
709 for i in range(3):
710 p = self.Process(target=self.f,
711 args=(cond, sleeping, woken, TIMEOUT1))
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000712 p.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +0000713 p.start()
714
715 t = threading.Thread(target=self.f,
716 args=(cond, sleeping, woken, TIMEOUT1))
Benjamin Petersona9b22222008-08-18 18:01:43 +0000717 t.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +0000718 t.start()
719
720 # wait for them all to sleep
721 for i in xrange(6):
722 sleeping.acquire()
723
724 # check they have all timed out
725 for i in xrange(6):
726 woken.acquire()
727 self.assertReturnsIfImplemented(0, get_value, woken)
728
729 # check state is not mucked up
730 self.check_invariant(cond)
731
732 # start some more threads/processes
733 for i in range(3):
734 p = self.Process(target=self.f, args=(cond, sleeping, woken))
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000735 p.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +0000736 p.start()
737
738 t = threading.Thread(target=self.f, args=(cond, sleeping, woken))
Benjamin Petersona9b22222008-08-18 18:01:43 +0000739 t.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +0000740 t.start()
741
742 # wait for them to all sleep
743 for i in xrange(6):
744 sleeping.acquire()
745
746 # check no process/thread has woken up
747 time.sleep(DELTA)
748 self.assertReturnsIfImplemented(0, get_value, woken)
749
750 # wake them all up
751 cond.acquire()
752 cond.notify_all()
753 cond.release()
754
755 # check they have all woken
756 time.sleep(DELTA)
757 self.assertReturnsIfImplemented(6, get_value, woken)
758
759 # check state is not mucked up
760 self.check_invariant(cond)
761
762 def test_timeout(self):
763 cond = self.Condition()
764 wait = TimingWrapper(cond.wait)
765 cond.acquire()
766 res = wait(TIMEOUT1)
767 cond.release()
768 self.assertEqual(res, None)
769 self.assertTimingAlmostEqual(wait.elapsed, TIMEOUT1)
770
771
772class _TestEvent(BaseTestCase):
773
Antoine Pitrou4eb2b282010-11-02 23:51:30 +0000774 @classmethod
775 def _test_event(cls, event):
Benjamin Petersondfd79492008-06-13 19:13:39 +0000776 time.sleep(TIMEOUT2)
777 event.set()
778
779 def test_event(self):
780 event = self.Event()
781 wait = TimingWrapper(event.wait)
782
Ezio Melottic2077b02011-03-16 12:34:31 +0200783 # Removed temporarily, due to API shear, this does not
Benjamin Petersondfd79492008-06-13 19:13:39 +0000784 # work with threading._Event objects. is_set == isSet
Jesse Noller02cb0eb2009-04-01 03:45:50 +0000785 self.assertEqual(event.is_set(), False)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000786
Jesse Noller02cb0eb2009-04-01 03:45:50 +0000787 # Removed, threading.Event.wait() will return the value of the __flag
788 # instead of None. API Shear with the semaphore backed mp.Event
789 self.assertEqual(wait(0.0), False)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000790 self.assertTimingAlmostEqual(wait.elapsed, 0.0)
Jesse Noller02cb0eb2009-04-01 03:45:50 +0000791 self.assertEqual(wait(TIMEOUT1), False)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000792 self.assertTimingAlmostEqual(wait.elapsed, TIMEOUT1)
793
794 event.set()
795
796 # See note above on the API differences
Jesse Noller02cb0eb2009-04-01 03:45:50 +0000797 self.assertEqual(event.is_set(), True)
798 self.assertEqual(wait(), True)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000799 self.assertTimingAlmostEqual(wait.elapsed, 0.0)
Jesse Noller02cb0eb2009-04-01 03:45:50 +0000800 self.assertEqual(wait(TIMEOUT1), True)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000801 self.assertTimingAlmostEqual(wait.elapsed, 0.0)
802 # self.assertEqual(event.is_set(), True)
803
804 event.clear()
805
806 #self.assertEqual(event.is_set(), False)
807
808 self.Process(target=self._test_event, args=(event,)).start()
Jesse Noller02cb0eb2009-04-01 03:45:50 +0000809 self.assertEqual(wait(), True)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000810
811#
812#
813#
814
815class _TestValue(BaseTestCase):
816
Florent Xicluna36b9fbb2010-03-24 19:33:25 +0000817 ALLOWED_TYPES = ('processes',)
818
Benjamin Petersondfd79492008-06-13 19:13:39 +0000819 codes_values = [
820 ('i', 4343, 24234),
821 ('d', 3.625, -4.25),
822 ('h', -232, 234),
823 ('c', latin('x'), latin('y'))
824 ]
825
Antoine Pitrou55d935a2010-11-22 16:35:57 +0000826 def setUp(self):
827 if not HAS_SHAREDCTYPES:
828 self.skipTest("requires multiprocessing.sharedctypes")
829
Antoine Pitrou4eb2b282010-11-02 23:51:30 +0000830 @classmethod
831 def _test(cls, values):
832 for sv, cv in zip(values, cls.codes_values):
Benjamin Petersondfd79492008-06-13 19:13:39 +0000833 sv.value = cv[2]
834
835
836 def test_value(self, raw=False):
Benjamin Petersondfd79492008-06-13 19:13:39 +0000837 if raw:
838 values = [self.RawValue(code, value)
839 for code, value, _ in self.codes_values]
840 else:
841 values = [self.Value(code, value)
842 for code, value, _ in self.codes_values]
843
844 for sv, cv in zip(values, self.codes_values):
845 self.assertEqual(sv.value, cv[1])
846
847 proc = self.Process(target=self._test, args=(values,))
848 proc.start()
849 proc.join()
850
851 for sv, cv in zip(values, self.codes_values):
852 self.assertEqual(sv.value, cv[2])
853
854 def test_rawvalue(self):
855 self.test_value(raw=True)
856
857 def test_getobj_getlock(self):
Benjamin Petersondfd79492008-06-13 19:13:39 +0000858 val1 = self.Value('i', 5)
859 lock1 = val1.get_lock()
860 obj1 = val1.get_obj()
861
862 val2 = self.Value('i', 5, lock=None)
863 lock2 = val2.get_lock()
864 obj2 = val2.get_obj()
865
866 lock = self.Lock()
867 val3 = self.Value('i', 5, lock=lock)
868 lock3 = val3.get_lock()
869 obj3 = val3.get_obj()
870 self.assertEqual(lock, lock3)
871
Jesse Noller6ab22152009-01-18 02:45:38 +0000872 arr4 = self.Value('i', 5, lock=False)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000873 self.assertFalse(hasattr(arr4, 'get_lock'))
874 self.assertFalse(hasattr(arr4, 'get_obj'))
875
Jesse Noller6ab22152009-01-18 02:45:38 +0000876 self.assertRaises(AttributeError, self.Value, 'i', 5, lock='navalue')
877
878 arr5 = self.RawValue('i', 5)
879 self.assertFalse(hasattr(arr5, 'get_lock'))
880 self.assertFalse(hasattr(arr5, 'get_obj'))
881
Benjamin Petersondfd79492008-06-13 19:13:39 +0000882
883class _TestArray(BaseTestCase):
884
Florent Xicluna36b9fbb2010-03-24 19:33:25 +0000885 ALLOWED_TYPES = ('processes',)
886
Antoine Pitrou4eb2b282010-11-02 23:51:30 +0000887 @classmethod
888 def f(cls, seq):
Benjamin Petersondfd79492008-06-13 19:13:39 +0000889 for i in range(1, len(seq)):
890 seq[i] += seq[i-1]
891
Florent Xicluna36b9fbb2010-03-24 19:33:25 +0000892 @unittest.skipIf(c_int is None, "requires _ctypes")
Benjamin Petersondfd79492008-06-13 19:13:39 +0000893 def test_array(self, raw=False):
Benjamin Petersondfd79492008-06-13 19:13:39 +0000894 seq = [680, 626, 934, 821, 150, 233, 548, 982, 714, 831]
895 if raw:
896 arr = self.RawArray('i', seq)
897 else:
898 arr = self.Array('i', seq)
899
900 self.assertEqual(len(arr), len(seq))
901 self.assertEqual(arr[3], seq[3])
902 self.assertEqual(list(arr[2:7]), list(seq[2:7]))
903
904 arr[4:8] = seq[4:8] = array.array('i', [1, 2, 3, 4])
905
906 self.assertEqual(list(arr[:]), seq)
907
908 self.f(seq)
909
910 p = self.Process(target=self.f, args=(arr,))
911 p.start()
912 p.join()
913
914 self.assertEqual(list(arr[:]), seq)
915
Florent Xicluna36b9fbb2010-03-24 19:33:25 +0000916 @unittest.skipIf(c_int is None, "requires _ctypes")
Benjamin Petersondfd79492008-06-13 19:13:39 +0000917 def test_rawarray(self):
918 self.test_array(raw=True)
919
Florent Xicluna36b9fbb2010-03-24 19:33:25 +0000920 @unittest.skipIf(c_int is None, "requires _ctypes")
Mark Dickinsonf9e9a6f2011-03-25 22:01:06 +0000921 def test_array_accepts_long(self):
922 arr = self.Array('i', 10L)
923 self.assertEqual(len(arr), 10)
924 raw_arr = self.RawArray('i', 10L)
925 self.assertEqual(len(raw_arr), 10)
926
927 @unittest.skipIf(c_int is None, "requires _ctypes")
Benjamin Petersondfd79492008-06-13 19:13:39 +0000928 def test_getobj_getlock_obj(self):
Benjamin Petersondfd79492008-06-13 19:13:39 +0000929 arr1 = self.Array('i', range(10))
930 lock1 = arr1.get_lock()
931 obj1 = arr1.get_obj()
932
933 arr2 = self.Array('i', range(10), lock=None)
934 lock2 = arr2.get_lock()
935 obj2 = arr2.get_obj()
936
937 lock = self.Lock()
938 arr3 = self.Array('i', range(10), lock=lock)
939 lock3 = arr3.get_lock()
940 obj3 = arr3.get_obj()
941 self.assertEqual(lock, lock3)
942
Jesse Noller6ab22152009-01-18 02:45:38 +0000943 arr4 = self.Array('i', range(10), lock=False)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000944 self.assertFalse(hasattr(arr4, 'get_lock'))
945 self.assertFalse(hasattr(arr4, 'get_obj'))
Jesse Noller6ab22152009-01-18 02:45:38 +0000946 self.assertRaises(AttributeError,
947 self.Array, 'i', range(10), lock='notalock')
948
949 arr5 = self.RawArray('i', range(10))
950 self.assertFalse(hasattr(arr5, 'get_lock'))
951 self.assertFalse(hasattr(arr5, 'get_obj'))
Benjamin Petersondfd79492008-06-13 19:13:39 +0000952
953#
954#
955#
956
957class _TestContainers(BaseTestCase):
958
959 ALLOWED_TYPES = ('manager',)
960
961 def test_list(self):
962 a = self.list(range(10))
963 self.assertEqual(a[:], range(10))
964
965 b = self.list()
966 self.assertEqual(b[:], [])
967
968 b.extend(range(5))
969 self.assertEqual(b[:], range(5))
970
971 self.assertEqual(b[2], 2)
972 self.assertEqual(b[2:10], [2,3,4])
973
974 b *= 2
975 self.assertEqual(b[:], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4])
976
977 self.assertEqual(b + [5, 6], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4, 5, 6])
978
979 self.assertEqual(a[:], range(10))
980
981 d = [a, b]
982 e = self.list(d)
983 self.assertEqual(
984 e[:],
985 [[0, 1, 2, 3, 4, 5, 6, 7, 8, 9], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4]]
986 )
987
988 f = self.list([a])
989 a.append('hello')
990 self.assertEqual(f[:], [[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 'hello']])
991
992 def test_dict(self):
993 d = self.dict()
994 indices = range(65, 70)
995 for i in indices:
996 d[i] = chr(i)
997 self.assertEqual(d.copy(), dict((i, chr(i)) for i in indices))
998 self.assertEqual(sorted(d.keys()), indices)
999 self.assertEqual(sorted(d.values()), [chr(i) for i in indices])
1000 self.assertEqual(sorted(d.items()), [(i, chr(i)) for i in indices])
1001
1002 def test_namespace(self):
1003 n = self.Namespace()
1004 n.name = 'Bob'
1005 n.job = 'Builder'
1006 n._hidden = 'hidden'
1007 self.assertEqual((n.name, n.job), ('Bob', 'Builder'))
1008 del n.job
1009 self.assertEqual(str(n), "Namespace(name='Bob')")
1010 self.assertTrue(hasattr(n, 'name'))
1011 self.assertTrue(not hasattr(n, 'job'))
1012
1013#
1014#
1015#
1016
1017def sqr(x, wait=0.0):
1018 time.sleep(wait)
1019 return x*x
Benjamin Petersondfd79492008-06-13 19:13:39 +00001020class _TestPool(BaseTestCase):
1021
1022 def test_apply(self):
1023 papply = self.pool.apply
1024 self.assertEqual(papply(sqr, (5,)), sqr(5))
1025 self.assertEqual(papply(sqr, (), {'x':3}), sqr(x=3))
1026
1027 def test_map(self):
1028 pmap = self.pool.map
1029 self.assertEqual(pmap(sqr, range(10)), map(sqr, range(10)))
1030 self.assertEqual(pmap(sqr, range(100), chunksize=20),
1031 map(sqr, range(100)))
1032
Jesse Noller7530e472009-07-16 14:23:04 +00001033 def test_map_chunksize(self):
1034 try:
1035 self.pool.map_async(sqr, [], chunksize=1).get(timeout=TIMEOUT1)
1036 except multiprocessing.TimeoutError:
1037 self.fail("pool.map_async with chunksize stalled on null list")
1038
Benjamin Petersondfd79492008-06-13 19:13:39 +00001039 def test_async(self):
1040 res = self.pool.apply_async(sqr, (7, TIMEOUT1,))
1041 get = TimingWrapper(res.get)
1042 self.assertEqual(get(), 49)
1043 self.assertTimingAlmostEqual(get.elapsed, TIMEOUT1)
1044
1045 def test_async_timeout(self):
1046 res = self.pool.apply_async(sqr, (6, TIMEOUT2 + 0.2))
1047 get = TimingWrapper(res.get)
1048 self.assertRaises(multiprocessing.TimeoutError, get, timeout=TIMEOUT2)
1049 self.assertTimingAlmostEqual(get.elapsed, TIMEOUT2)
1050
1051 def test_imap(self):
1052 it = self.pool.imap(sqr, range(10))
1053 self.assertEqual(list(it), map(sqr, range(10)))
1054
1055 it = self.pool.imap(sqr, range(10))
1056 for i in range(10):
1057 self.assertEqual(it.next(), i*i)
1058 self.assertRaises(StopIteration, it.next)
1059
1060 it = self.pool.imap(sqr, range(1000), chunksize=100)
1061 for i in range(1000):
1062 self.assertEqual(it.next(), i*i)
1063 self.assertRaises(StopIteration, it.next)
1064
1065 def test_imap_unordered(self):
1066 it = self.pool.imap_unordered(sqr, range(1000))
1067 self.assertEqual(sorted(it), map(sqr, range(1000)))
1068
1069 it = self.pool.imap_unordered(sqr, range(1000), chunksize=53)
1070 self.assertEqual(sorted(it), map(sqr, range(1000)))
1071
1072 def test_make_pool(self):
1073 p = multiprocessing.Pool(3)
1074 self.assertEqual(3, len(p._pool))
1075 p.close()
1076 p.join()
1077
1078 def test_terminate(self):
1079 if self.TYPE == 'manager':
1080 # On Unix a forked process increfs each shared object to
1081 # which its parent process held a reference. If the
1082 # forked process gets terminated then there is likely to
1083 # be a reference leak. So to prevent
1084 # _TestZZZNumberOfObjects from failing we skip this test
1085 # when using a manager.
1086 return
1087
1088 result = self.pool.map_async(
1089 time.sleep, [0.1 for i in range(10000)], chunksize=1
1090 )
1091 self.pool.terminate()
1092 join = TimingWrapper(self.pool.join)
1093 join()
1094 self.assertTrue(join.elapsed < 0.2)
Jesse Noller654ade32010-01-27 03:05:57 +00001095
1096class _TestPoolWorkerLifetime(BaseTestCase):
1097
1098 ALLOWED_TYPES = ('processes', )
1099 def test_pool_worker_lifetime(self):
1100 p = multiprocessing.Pool(3, maxtasksperchild=10)
1101 self.assertEqual(3, len(p._pool))
1102 origworkerpids = [w.pid for w in p._pool]
1103 # Run many tasks so each worker gets replaced (hopefully)
1104 results = []
1105 for i in range(100):
1106 results.append(p.apply_async(sqr, (i, )))
1107 # Fetch the results and verify we got the right answers,
1108 # also ensuring all the tasks have completed.
1109 for (j, res) in enumerate(results):
1110 self.assertEqual(res.get(), sqr(j))
1111 # Refill the pool
1112 p._repopulate_pool()
Florent Xicluna3bc5cb72010-03-04 15:58:54 +00001113 # Wait until all workers are alive
1114 countdown = 5
1115 while countdown and not all(w.is_alive() for w in p._pool):
1116 countdown -= 1
1117 time.sleep(DELTA)
Jesse Noller654ade32010-01-27 03:05:57 +00001118 finalworkerpids = [w.pid for w in p._pool]
Florent Xicluna3bc5cb72010-03-04 15:58:54 +00001119 # All pids should be assigned. See issue #7805.
1120 self.assertNotIn(None, origworkerpids)
1121 self.assertNotIn(None, finalworkerpids)
1122 # Finally, check that the worker pids have changed
Jesse Noller654ade32010-01-27 03:05:57 +00001123 self.assertNotEqual(sorted(origworkerpids), sorted(finalworkerpids))
1124 p.close()
1125 p.join()
1126
Benjamin Petersondfd79492008-06-13 19:13:39 +00001127#
1128# Test that manager has expected number of shared objects left
1129#
1130
1131class _TestZZZNumberOfObjects(BaseTestCase):
1132 # Because test cases are sorted alphabetically, this one will get
1133 # run after all the other tests for the manager. It tests that
1134 # there have been no "reference leaks" for the manager's shared
1135 # objects. Note the comment in _TestPool.test_terminate().
1136 ALLOWED_TYPES = ('manager',)
1137
1138 def test_number_of_objects(self):
1139 EXPECTED_NUMBER = 1 # the pool object is still alive
1140 multiprocessing.active_children() # discard dead process objs
1141 gc.collect() # do garbage collection
1142 refs = self.manager._number_of_objects()
Jesse Noller7314b382009-01-21 02:08:17 +00001143 debug_info = self.manager._debug_info()
Benjamin Petersondfd79492008-06-13 19:13:39 +00001144 if refs != EXPECTED_NUMBER:
Jesse Noller7fb96402008-07-17 21:01:05 +00001145 print self.manager._debug_info()
Jesse Noller7314b382009-01-21 02:08:17 +00001146 print debug_info
Benjamin Petersondfd79492008-06-13 19:13:39 +00001147
1148 self.assertEqual(refs, EXPECTED_NUMBER)
1149
1150#
1151# Test of creating a customized manager class
1152#
1153
1154from multiprocessing.managers import BaseManager, BaseProxy, RemoteError
1155
1156class FooBar(object):
1157 def f(self):
1158 return 'f()'
1159 def g(self):
1160 raise ValueError
1161 def _h(self):
1162 return '_h()'
1163
1164def baz():
1165 for i in xrange(10):
1166 yield i*i
1167
1168class IteratorProxy(BaseProxy):
1169 _exposed_ = ('next', '__next__')
1170 def __iter__(self):
1171 return self
1172 def next(self):
1173 return self._callmethod('next')
1174 def __next__(self):
1175 return self._callmethod('__next__')
1176
1177class MyManager(BaseManager):
1178 pass
1179
1180MyManager.register('Foo', callable=FooBar)
1181MyManager.register('Bar', callable=FooBar, exposed=('f', '_h'))
1182MyManager.register('baz', callable=baz, proxytype=IteratorProxy)
1183
1184
1185class _TestMyManager(BaseTestCase):
1186
1187 ALLOWED_TYPES = ('manager',)
1188
1189 def test_mymanager(self):
1190 manager = MyManager()
1191 manager.start()
1192
1193 foo = manager.Foo()
1194 bar = manager.Bar()
1195 baz = manager.baz()
1196
1197 foo_methods = [name for name in ('f', 'g', '_h') if hasattr(foo, name)]
1198 bar_methods = [name for name in ('f', 'g', '_h') if hasattr(bar, name)]
1199
1200 self.assertEqual(foo_methods, ['f', 'g'])
1201 self.assertEqual(bar_methods, ['f', '_h'])
1202
1203 self.assertEqual(foo.f(), 'f()')
1204 self.assertRaises(ValueError, foo.g)
1205 self.assertEqual(foo._callmethod('f'), 'f()')
1206 self.assertRaises(RemoteError, foo._callmethod, '_h')
1207
1208 self.assertEqual(bar.f(), 'f()')
1209 self.assertEqual(bar._h(), '_h()')
1210 self.assertEqual(bar._callmethod('f'), 'f()')
1211 self.assertEqual(bar._callmethod('_h'), '_h()')
1212
1213 self.assertEqual(list(baz), [i*i for i in range(10)])
1214
1215 manager.shutdown()
1216
1217#
1218# Test of connecting to a remote server and using xmlrpclib for serialization
1219#
1220
1221_queue = Queue.Queue()
1222def get_queue():
1223 return _queue
1224
1225class QueueManager(BaseManager):
1226 '''manager class used by server process'''
1227QueueManager.register('get_queue', callable=get_queue)
1228
1229class QueueManager2(BaseManager):
1230 '''manager class which specifies the same interface as QueueManager'''
1231QueueManager2.register('get_queue')
1232
1233
1234SERIALIZER = 'xmlrpclib'
1235
1236class _TestRemoteManager(BaseTestCase):
1237
1238 ALLOWED_TYPES = ('manager',)
1239
Antoine Pitrou4eb2b282010-11-02 23:51:30 +00001240 @classmethod
1241 def _putter(cls, address, authkey):
Benjamin Petersondfd79492008-06-13 19:13:39 +00001242 manager = QueueManager2(
1243 address=address, authkey=authkey, serializer=SERIALIZER
1244 )
1245 manager.connect()
1246 queue = manager.get_queue()
1247 queue.put(('hello world', None, True, 2.25))
1248
1249 def test_remote(self):
1250 authkey = os.urandom(32)
1251
1252 manager = QueueManager(
1253 address=('localhost', 0), authkey=authkey, serializer=SERIALIZER
1254 )
1255 manager.start()
1256
1257 p = self.Process(target=self._putter, args=(manager.address, authkey))
1258 p.start()
1259
1260 manager2 = QueueManager2(
1261 address=manager.address, authkey=authkey, serializer=SERIALIZER
1262 )
1263 manager2.connect()
1264 queue = manager2.get_queue()
1265
1266 # Note that xmlrpclib will deserialize object as a list not a tuple
1267 self.assertEqual(queue.get(), ['hello world', None, True, 2.25])
1268
1269 # Because we are using xmlrpclib for serialization instead of
1270 # pickle this will cause a serialization error.
1271 self.assertRaises(Exception, queue.put, time.sleep)
1272
1273 # Make queue finalizer run before the server is stopped
1274 del queue
1275 manager.shutdown()
1276
Jesse Noller459a6482009-03-30 15:50:42 +00001277class _TestManagerRestart(BaseTestCase):
1278
Antoine Pitrou4eb2b282010-11-02 23:51:30 +00001279 @classmethod
1280 def _putter(cls, address, authkey):
Jesse Noller459a6482009-03-30 15:50:42 +00001281 manager = QueueManager(
1282 address=address, authkey=authkey, serializer=SERIALIZER)
1283 manager.connect()
1284 queue = manager.get_queue()
1285 queue.put('hello world')
1286
1287 def test_rapid_restart(self):
1288 authkey = os.urandom(32)
1289 manager = QueueManager(
Antoine Pitrou54f9f832010-04-30 23:08:48 +00001290 address=('localhost', 0), authkey=authkey, serializer=SERIALIZER)
Brian Curtin87d86e02010-11-01 05:15:55 +00001291 srvr = manager.get_server()
1292 addr = srvr.address
1293 # Close the connection.Listener socket which gets opened as a part
1294 # of manager.get_server(). It's not needed for the test.
1295 srvr.listener.close()
Jesse Noller459a6482009-03-30 15:50:42 +00001296 manager.start()
1297
1298 p = self.Process(target=self._putter, args=(manager.address, authkey))
1299 p.start()
1300 queue = manager.get_queue()
1301 self.assertEqual(queue.get(), 'hello world')
Jesse Noller019ce772009-03-30 21:53:29 +00001302 del queue
Jesse Noller459a6482009-03-30 15:50:42 +00001303 manager.shutdown()
1304 manager = QueueManager(
Antoine Pitrou54f9f832010-04-30 23:08:48 +00001305 address=addr, authkey=authkey, serializer=SERIALIZER)
Jesse Noller459a6482009-03-30 15:50:42 +00001306 manager.start()
Jesse Noller019ce772009-03-30 21:53:29 +00001307 manager.shutdown()
Jesse Noller459a6482009-03-30 15:50:42 +00001308
Benjamin Petersondfd79492008-06-13 19:13:39 +00001309#
1310#
1311#
1312
1313SENTINEL = latin('')
1314
1315class _TestConnection(BaseTestCase):
1316
1317 ALLOWED_TYPES = ('processes', 'threads')
1318
Antoine Pitrou4eb2b282010-11-02 23:51:30 +00001319 @classmethod
1320 def _echo(cls, conn):
Benjamin Petersondfd79492008-06-13 19:13:39 +00001321 for msg in iter(conn.recv_bytes, SENTINEL):
1322 conn.send_bytes(msg)
1323 conn.close()
1324
1325 def test_connection(self):
1326 conn, child_conn = self.Pipe()
1327
1328 p = self.Process(target=self._echo, args=(child_conn,))
Jesse Noller5bc9f4c2008-08-19 19:06:19 +00001329 p.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +00001330 p.start()
1331
1332 seq = [1, 2.25, None]
1333 msg = latin('hello world')
1334 longmsg = msg * 10
1335 arr = array.array('i', range(4))
1336
1337 if self.TYPE == 'processes':
1338 self.assertEqual(type(conn.fileno()), int)
1339
1340 self.assertEqual(conn.send(seq), None)
1341 self.assertEqual(conn.recv(), seq)
1342
1343 self.assertEqual(conn.send_bytes(msg), None)
1344 self.assertEqual(conn.recv_bytes(), msg)
1345
1346 if self.TYPE == 'processes':
1347 buffer = array.array('i', [0]*10)
1348 expected = list(arr) + [0] * (10 - len(arr))
1349 self.assertEqual(conn.send_bytes(arr), None)
1350 self.assertEqual(conn.recv_bytes_into(buffer),
1351 len(arr) * buffer.itemsize)
1352 self.assertEqual(list(buffer), expected)
1353
1354 buffer = array.array('i', [0]*10)
1355 expected = [0] * 3 + list(arr) + [0] * (10 - 3 - len(arr))
1356 self.assertEqual(conn.send_bytes(arr), None)
1357 self.assertEqual(conn.recv_bytes_into(buffer, 3 * buffer.itemsize),
1358 len(arr) * buffer.itemsize)
1359 self.assertEqual(list(buffer), expected)
1360
1361 buffer = bytearray(latin(' ' * 40))
1362 self.assertEqual(conn.send_bytes(longmsg), None)
1363 try:
1364 res = conn.recv_bytes_into(buffer)
1365 except multiprocessing.BufferTooShort, e:
1366 self.assertEqual(e.args, (longmsg,))
1367 else:
1368 self.fail('expected BufferTooShort, got %s' % res)
1369
1370 poll = TimingWrapper(conn.poll)
1371
1372 self.assertEqual(poll(), False)
1373 self.assertTimingAlmostEqual(poll.elapsed, 0)
1374
1375 self.assertEqual(poll(TIMEOUT1), False)
1376 self.assertTimingAlmostEqual(poll.elapsed, TIMEOUT1)
1377
1378 conn.send(None)
1379
1380 self.assertEqual(poll(TIMEOUT1), True)
1381 self.assertTimingAlmostEqual(poll.elapsed, 0)
1382
1383 self.assertEqual(conn.recv(), None)
1384
1385 really_big_msg = latin('X') * (1024 * 1024 * 16) # 16Mb
1386 conn.send_bytes(really_big_msg)
1387 self.assertEqual(conn.recv_bytes(), really_big_msg)
1388
1389 conn.send_bytes(SENTINEL) # tell child to quit
1390 child_conn.close()
1391
1392 if self.TYPE == 'processes':
1393 self.assertEqual(conn.readable, True)
1394 self.assertEqual(conn.writable, True)
1395 self.assertRaises(EOFError, conn.recv)
1396 self.assertRaises(EOFError, conn.recv_bytes)
1397
1398 p.join()
1399
1400 def test_duplex_false(self):
1401 reader, writer = self.Pipe(duplex=False)
1402 self.assertEqual(writer.send(1), None)
1403 self.assertEqual(reader.recv(), 1)
1404 if self.TYPE == 'processes':
1405 self.assertEqual(reader.readable, True)
1406 self.assertEqual(reader.writable, False)
1407 self.assertEqual(writer.readable, False)
1408 self.assertEqual(writer.writable, True)
1409 self.assertRaises(IOError, reader.send, 2)
1410 self.assertRaises(IOError, writer.recv)
1411 self.assertRaises(IOError, writer.poll)
1412
1413 def test_spawn_close(self):
1414 # We test that a pipe connection can be closed by parent
1415 # process immediately after child is spawned. On Windows this
1416 # would have sometimes failed on old versions because
1417 # child_conn would be closed before the child got a chance to
1418 # duplicate it.
1419 conn, child_conn = self.Pipe()
1420
1421 p = self.Process(target=self._echo, args=(child_conn,))
1422 p.start()
1423 child_conn.close() # this might complete before child initializes
1424
1425 msg = latin('hello')
1426 conn.send_bytes(msg)
1427 self.assertEqual(conn.recv_bytes(), msg)
1428
1429 conn.send_bytes(SENTINEL)
1430 conn.close()
1431 p.join()
1432
1433 def test_sendbytes(self):
1434 if self.TYPE != 'processes':
1435 return
1436
1437 msg = latin('abcdefghijklmnopqrstuvwxyz')
1438 a, b = self.Pipe()
1439
1440 a.send_bytes(msg)
1441 self.assertEqual(b.recv_bytes(), msg)
1442
1443 a.send_bytes(msg, 5)
1444 self.assertEqual(b.recv_bytes(), msg[5:])
1445
1446 a.send_bytes(msg, 7, 8)
1447 self.assertEqual(b.recv_bytes(), msg[7:7+8])
1448
1449 a.send_bytes(msg, 26)
1450 self.assertEqual(b.recv_bytes(), latin(''))
1451
1452 a.send_bytes(msg, 26, 0)
1453 self.assertEqual(b.recv_bytes(), latin(''))
1454
1455 self.assertRaises(ValueError, a.send_bytes, msg, 27)
1456
1457 self.assertRaises(ValueError, a.send_bytes, msg, 22, 5)
1458
1459 self.assertRaises(ValueError, a.send_bytes, msg, 26, 1)
1460
1461 self.assertRaises(ValueError, a.send_bytes, msg, -1)
1462
1463 self.assertRaises(ValueError, a.send_bytes, msg, 4, -1)
1464
Benjamin Petersondfd79492008-06-13 19:13:39 +00001465class _TestListenerClient(BaseTestCase):
1466
1467 ALLOWED_TYPES = ('processes', 'threads')
1468
Antoine Pitrou4eb2b282010-11-02 23:51:30 +00001469 @classmethod
1470 def _test(cls, address):
1471 conn = cls.connection.Client(address)
Benjamin Petersondfd79492008-06-13 19:13:39 +00001472 conn.send('hello')
1473 conn.close()
1474
1475 def test_listener_client(self):
1476 for family in self.connection.families:
1477 l = self.connection.Listener(family=family)
1478 p = self.Process(target=self._test, args=(l.address,))
Jesse Noller5bc9f4c2008-08-19 19:06:19 +00001479 p.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +00001480 p.start()
1481 conn = l.accept()
1482 self.assertEqual(conn.recv(), 'hello')
1483 p.join()
1484 l.close()
Benjamin Petersondfd79492008-06-13 19:13:39 +00001485#
1486# Test of sending connection and socket objects between processes
1487#
Benjamin Petersonda3a1b12008-06-16 20:52:48 +00001488"""
Benjamin Petersondfd79492008-06-13 19:13:39 +00001489class _TestPicklingConnections(BaseTestCase):
1490
1491 ALLOWED_TYPES = ('processes',)
1492
1493 def _listener(self, conn, families):
1494 for fam in families:
1495 l = self.connection.Listener(family=fam)
1496 conn.send(l.address)
1497 new_conn = l.accept()
1498 conn.send(new_conn)
1499
1500 if self.TYPE == 'processes':
1501 l = socket.socket()
1502 l.bind(('localhost', 0))
1503 conn.send(l.getsockname())
1504 l.listen(1)
1505 new_conn, addr = l.accept()
1506 conn.send(new_conn)
1507
1508 conn.recv()
1509
1510 def _remote(self, conn):
1511 for (address, msg) in iter(conn.recv, None):
1512 client = self.connection.Client(address)
1513 client.send(msg.upper())
1514 client.close()
1515
1516 if self.TYPE == 'processes':
1517 address, msg = conn.recv()
1518 client = socket.socket()
1519 client.connect(address)
1520 client.sendall(msg.upper())
1521 client.close()
1522
1523 conn.close()
1524
1525 def test_pickling(self):
1526 try:
1527 multiprocessing.allow_connection_pickling()
1528 except ImportError:
1529 return
1530
1531 families = self.connection.families
1532
1533 lconn, lconn0 = self.Pipe()
1534 lp = self.Process(target=self._listener, args=(lconn0, families))
1535 lp.start()
1536 lconn0.close()
1537
1538 rconn, rconn0 = self.Pipe()
1539 rp = self.Process(target=self._remote, args=(rconn0,))
1540 rp.start()
1541 rconn0.close()
1542
1543 for fam in families:
1544 msg = ('This connection uses family %s' % fam).encode('ascii')
1545 address = lconn.recv()
1546 rconn.send((address, msg))
1547 new_conn = lconn.recv()
1548 self.assertEqual(new_conn.recv(), msg.upper())
1549
1550 rconn.send(None)
1551
1552 if self.TYPE == 'processes':
1553 msg = latin('This connection uses a normal socket')
1554 address = lconn.recv()
1555 rconn.send((address, msg))
1556 if hasattr(socket, 'fromfd'):
1557 new_conn = lconn.recv()
1558 self.assertEqual(new_conn.recv(100), msg.upper())
1559 else:
1560 # XXX On Windows with Py2.6 need to backport fromfd()
1561 discard = lconn.recv_bytes()
1562
1563 lconn.send(None)
1564
1565 rconn.close()
1566 lconn.close()
1567
1568 lp.join()
1569 rp.join()
Benjamin Petersonda3a1b12008-06-16 20:52:48 +00001570"""
Benjamin Petersondfd79492008-06-13 19:13:39 +00001571#
1572#
1573#
1574
1575class _TestHeap(BaseTestCase):
1576
1577 ALLOWED_TYPES = ('processes',)
1578
1579 def test_heap(self):
1580 iterations = 5000
1581 maxblocks = 50
1582 blocks = []
1583
1584 # create and destroy lots of blocks of different sizes
1585 for i in xrange(iterations):
1586 size = int(random.lognormvariate(0, 1) * 1000)
1587 b = multiprocessing.heap.BufferWrapper(size)
1588 blocks.append(b)
1589 if len(blocks) > maxblocks:
1590 i = random.randrange(maxblocks)
1591 del blocks[i]
1592
1593 # get the heap object
1594 heap = multiprocessing.heap.BufferWrapper._heap
1595
1596 # verify the state of the heap
1597 all = []
1598 occupied = 0
1599 for L in heap._len_to_seq.values():
1600 for arena, start, stop in L:
1601 all.append((heap._arenas.index(arena), start, stop,
1602 stop-start, 'free'))
1603 for arena, start, stop in heap._allocated_blocks:
1604 all.append((heap._arenas.index(arena), start, stop,
1605 stop-start, 'occupied'))
1606 occupied += (stop-start)
1607
1608 all.sort()
1609
1610 for i in range(len(all)-1):
1611 (arena, start, stop) = all[i][:3]
1612 (narena, nstart, nstop) = all[i+1][:3]
1613 self.assertTrue((arena != narena and nstart == 0) or
1614 (stop == nstart))
1615
1616#
1617#
1618#
1619
Benjamin Petersondfd79492008-06-13 19:13:39 +00001620class _Foo(Structure):
1621 _fields_ = [
1622 ('x', c_int),
1623 ('y', c_double)
1624 ]
1625
1626class _TestSharedCTypes(BaseTestCase):
1627
1628 ALLOWED_TYPES = ('processes',)
1629
Antoine Pitrou55d935a2010-11-22 16:35:57 +00001630 def setUp(self):
1631 if not HAS_SHAREDCTYPES:
1632 self.skipTest("requires multiprocessing.sharedctypes")
1633
Antoine Pitrou4eb2b282010-11-02 23:51:30 +00001634 @classmethod
1635 def _double(cls, x, y, foo, arr, string):
Benjamin Petersondfd79492008-06-13 19:13:39 +00001636 x.value *= 2
1637 y.value *= 2
1638 foo.x *= 2
1639 foo.y *= 2
1640 string.value *= 2
1641 for i in range(len(arr)):
1642 arr[i] *= 2
1643
1644 def test_sharedctypes(self, lock=False):
Benjamin Petersondfd79492008-06-13 19:13:39 +00001645 x = Value('i', 7, lock=lock)
Georg Brandlbd564c32010-02-06 23:33:33 +00001646 y = Value(c_double, 1.0/3.0, lock=lock)
Benjamin Petersondfd79492008-06-13 19:13:39 +00001647 foo = Value(_Foo, 3, 2, lock=lock)
Georg Brandlbd564c32010-02-06 23:33:33 +00001648 arr = self.Array('d', range(10), lock=lock)
1649 string = self.Array('c', 20, lock=lock)
Brian Curtina06e9b82010-10-07 02:27:41 +00001650 string.value = latin('hello')
Benjamin Petersondfd79492008-06-13 19:13:39 +00001651
1652 p = self.Process(target=self._double, args=(x, y, foo, arr, string))
1653 p.start()
1654 p.join()
1655
1656 self.assertEqual(x.value, 14)
1657 self.assertAlmostEqual(y.value, 2.0/3.0)
1658 self.assertEqual(foo.x, 6)
1659 self.assertAlmostEqual(foo.y, 4.0)
1660 for i in range(10):
1661 self.assertAlmostEqual(arr[i], i*2)
1662 self.assertEqual(string.value, latin('hellohello'))
1663
1664 def test_synchronize(self):
1665 self.test_sharedctypes(lock=True)
1666
1667 def test_copy(self):
Benjamin Petersondfd79492008-06-13 19:13:39 +00001668 foo = _Foo(2, 5.0)
Brian Curtina06e9b82010-10-07 02:27:41 +00001669 bar = copy(foo)
Benjamin Petersondfd79492008-06-13 19:13:39 +00001670 foo.x = 0
1671 foo.y = 0
1672 self.assertEqual(bar.x, 2)
1673 self.assertAlmostEqual(bar.y, 5.0)
1674
1675#
1676#
1677#
1678
1679class _TestFinalize(BaseTestCase):
1680
1681 ALLOWED_TYPES = ('processes',)
1682
Antoine Pitrou4eb2b282010-11-02 23:51:30 +00001683 @classmethod
1684 def _test_finalize(cls, conn):
Benjamin Petersondfd79492008-06-13 19:13:39 +00001685 class Foo(object):
1686 pass
1687
1688 a = Foo()
1689 util.Finalize(a, conn.send, args=('a',))
1690 del a # triggers callback for a
1691
1692 b = Foo()
1693 close_b = util.Finalize(b, conn.send, args=('b',))
1694 close_b() # triggers callback for b
1695 close_b() # does nothing because callback has already been called
1696 del b # does nothing because callback has already been called
1697
1698 c = Foo()
1699 util.Finalize(c, conn.send, args=('c',))
1700
1701 d10 = Foo()
1702 util.Finalize(d10, conn.send, args=('d10',), exitpriority=1)
1703
1704 d01 = Foo()
1705 util.Finalize(d01, conn.send, args=('d01',), exitpriority=0)
1706 d02 = Foo()
1707 util.Finalize(d02, conn.send, args=('d02',), exitpriority=0)
1708 d03 = Foo()
1709 util.Finalize(d03, conn.send, args=('d03',), exitpriority=0)
1710
1711 util.Finalize(None, conn.send, args=('e',), exitpriority=-10)
1712
1713 util.Finalize(None, conn.send, args=('STOP',), exitpriority=-100)
1714
Ezio Melottic2077b02011-03-16 12:34:31 +02001715 # call multiprocessing's cleanup function then exit process without
Benjamin Petersondfd79492008-06-13 19:13:39 +00001716 # garbage collecting locals
1717 util._exit_function()
1718 conn.close()
1719 os._exit(0)
1720
1721 def test_finalize(self):
1722 conn, child_conn = self.Pipe()
1723
1724 p = self.Process(target=self._test_finalize, args=(child_conn,))
1725 p.start()
1726 p.join()
1727
1728 result = [obj for obj in iter(conn.recv, 'STOP')]
1729 self.assertEqual(result, ['a', 'b', 'd10', 'd03', 'd02', 'd01', 'e'])
1730
1731#
1732# Test that from ... import * works for each module
1733#
1734
1735class _TestImportStar(BaseTestCase):
1736
1737 ALLOWED_TYPES = ('processes',)
1738
1739 def test_import(self):
Florent Xicluna36b9fbb2010-03-24 19:33:25 +00001740 modules = [
Benjamin Petersondfd79492008-06-13 19:13:39 +00001741 'multiprocessing', 'multiprocessing.connection',
1742 'multiprocessing.heap', 'multiprocessing.managers',
1743 'multiprocessing.pool', 'multiprocessing.process',
Florent Xicluna36b9fbb2010-03-24 19:33:25 +00001744 'multiprocessing.reduction',
Benjamin Petersondfd79492008-06-13 19:13:39 +00001745 'multiprocessing.synchronize', 'multiprocessing.util'
Florent Xicluna36b9fbb2010-03-24 19:33:25 +00001746 ]
1747
1748 if c_int is not None:
1749 # This module requires _ctypes
1750 modules.append('multiprocessing.sharedctypes')
Benjamin Petersondfd79492008-06-13 19:13:39 +00001751
1752 for name in modules:
1753 __import__(name)
1754 mod = sys.modules[name]
1755
1756 for attr in getattr(mod, '__all__', ()):
1757 self.assertTrue(
1758 hasattr(mod, attr),
1759 '%r does not have attribute %r' % (mod, attr)
1760 )
1761
1762#
1763# Quick test that logging works -- does not test logging output
1764#
1765
1766class _TestLogging(BaseTestCase):
1767
1768 ALLOWED_TYPES = ('processes',)
1769
1770 def test_enable_logging(self):
1771 logger = multiprocessing.get_logger()
1772 logger.setLevel(util.SUBWARNING)
1773 self.assertTrue(logger is not None)
1774 logger.debug('this will not be printed')
1775 logger.info('nor will this')
1776 logger.setLevel(LOG_LEVEL)
1777
Antoine Pitrou4eb2b282010-11-02 23:51:30 +00001778 @classmethod
1779 def _test_level(cls, conn):
Benjamin Petersondfd79492008-06-13 19:13:39 +00001780 logger = multiprocessing.get_logger()
1781 conn.send(logger.getEffectiveLevel())
1782
1783 def test_level(self):
1784 LEVEL1 = 32
1785 LEVEL2 = 37
1786
1787 logger = multiprocessing.get_logger()
1788 root_logger = logging.getLogger()
1789 root_level = root_logger.level
1790
1791 reader, writer = multiprocessing.Pipe(duplex=False)
1792
1793 logger.setLevel(LEVEL1)
1794 self.Process(target=self._test_level, args=(writer,)).start()
1795 self.assertEqual(LEVEL1, reader.recv())
1796
1797 logger.setLevel(logging.NOTSET)
1798 root_logger.setLevel(LEVEL2)
1799 self.Process(target=self._test_level, args=(writer,)).start()
1800 self.assertEqual(LEVEL2, reader.recv())
1801
1802 root_logger.setLevel(root_level)
1803 logger.setLevel(level=LOG_LEVEL)
1804
Jesse Noller814d02d2009-11-21 14:38:23 +00001805
Jesse Noller9a03f2f2009-11-24 14:17:29 +00001806# class _TestLoggingProcessName(BaseTestCase):
1807#
1808# def handle(self, record):
1809# assert record.processName == multiprocessing.current_process().name
1810# self.__handled = True
1811#
1812# def test_logging(self):
1813# handler = logging.Handler()
1814# handler.handle = self.handle
1815# self.__handled = False
1816# # Bypass getLogger() and side-effects
1817# logger = logging.getLoggerClass()(
1818# 'multiprocessing.test.TestLoggingProcessName')
1819# logger.addHandler(handler)
1820# logger.propagate = False
1821#
1822# logger.warn('foo')
1823# assert self.__handled
Jesse Noller814d02d2009-11-21 14:38:23 +00001824
Benjamin Petersondfd79492008-06-13 19:13:39 +00001825#
Jesse Noller9a5b2ad2009-01-19 15:12:22 +00001826# Test to verify handle verification, see issue 3321
1827#
1828
1829class TestInvalidHandle(unittest.TestCase):
1830
Florent Xicluna36b9fbb2010-03-24 19:33:25 +00001831 @unittest.skipIf(WIN32, "skipped on Windows")
Jesse Noller9a5b2ad2009-01-19 15:12:22 +00001832 def test_invalid_handles(self):
Jesse Noller9a5b2ad2009-01-19 15:12:22 +00001833 conn = _multiprocessing.Connection(44977608)
1834 self.assertRaises(IOError, conn.poll)
1835 self.assertRaises(IOError, _multiprocessing.Connection, -1)
Florent Xicluna36b9fbb2010-03-24 19:33:25 +00001836
Jesse Noller9a5b2ad2009-01-19 15:12:22 +00001837#
Benjamin Petersondfd79492008-06-13 19:13:39 +00001838# Functions used to create test cases from the base ones in this module
1839#
1840
1841def get_attributes(Source, names):
1842 d = {}
1843 for name in names:
1844 obj = getattr(Source, name)
1845 if type(obj) == type(get_attributes):
1846 obj = staticmethod(obj)
1847 d[name] = obj
1848 return d
1849
1850def create_test_cases(Mixin, type):
1851 result = {}
1852 glob = globals()
Florent Xicluna36b9fbb2010-03-24 19:33:25 +00001853 Type = type.capitalize()
Benjamin Petersondfd79492008-06-13 19:13:39 +00001854
1855 for name in glob.keys():
1856 if name.startswith('_Test'):
1857 base = glob[name]
1858 if type in base.ALLOWED_TYPES:
1859 newname = 'With' + Type + name[1:]
1860 class Temp(base, unittest.TestCase, Mixin):
1861 pass
1862 result[newname] = Temp
1863 Temp.__name__ = newname
1864 Temp.__module__ = Mixin.__module__
1865 return result
1866
1867#
1868# Create test cases
1869#
1870
1871class ProcessesMixin(object):
1872 TYPE = 'processes'
1873 Process = multiprocessing.Process
1874 locals().update(get_attributes(multiprocessing, (
1875 'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
1876 'Condition', 'Event', 'Value', 'Array', 'RawValue',
1877 'RawArray', 'current_process', 'active_children', 'Pipe',
1878 'connection', 'JoinableQueue'
1879 )))
1880
1881testcases_processes = create_test_cases(ProcessesMixin, type='processes')
1882globals().update(testcases_processes)
1883
1884
1885class ManagerMixin(object):
1886 TYPE = 'manager'
1887 Process = multiprocessing.Process
1888 manager = object.__new__(multiprocessing.managers.SyncManager)
1889 locals().update(get_attributes(manager, (
1890 'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
1891 'Condition', 'Event', 'Value', 'Array', 'list', 'dict',
1892 'Namespace', 'JoinableQueue'
1893 )))
1894
1895testcases_manager = create_test_cases(ManagerMixin, type='manager')
1896globals().update(testcases_manager)
1897
1898
1899class ThreadsMixin(object):
1900 TYPE = 'threads'
1901 Process = multiprocessing.dummy.Process
1902 locals().update(get_attributes(multiprocessing.dummy, (
1903 'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
1904 'Condition', 'Event', 'Value', 'Array', 'current_process',
1905 'active_children', 'Pipe', 'connection', 'dict', 'list',
1906 'Namespace', 'JoinableQueue'
1907 )))
1908
1909testcases_threads = create_test_cases(ThreadsMixin, type='threads')
1910globals().update(testcases_threads)
1911
Neal Norwitz0c519b32008-08-25 01:50:24 +00001912class OtherTest(unittest.TestCase):
1913 # TODO: add more tests for deliver/answer challenge.
1914 def test_deliver_challenge_auth_failure(self):
1915 class _FakeConnection(object):
1916 def recv_bytes(self, size):
Neal Norwitz2a7767a2008-08-25 03:03:25 +00001917 return b'something bogus'
Neal Norwitz0c519b32008-08-25 01:50:24 +00001918 def send_bytes(self, data):
1919 pass
1920 self.assertRaises(multiprocessing.AuthenticationError,
1921 multiprocessing.connection.deliver_challenge,
1922 _FakeConnection(), b'abc')
1923
1924 def test_answer_challenge_auth_failure(self):
1925 class _FakeConnection(object):
1926 def __init__(self):
1927 self.count = 0
1928 def recv_bytes(self, size):
1929 self.count += 1
1930 if self.count == 1:
1931 return multiprocessing.connection.CHALLENGE
1932 elif self.count == 2:
Neal Norwitz2a7767a2008-08-25 03:03:25 +00001933 return b'something bogus'
1934 return b''
Neal Norwitz0c519b32008-08-25 01:50:24 +00001935 def send_bytes(self, data):
1936 pass
1937 self.assertRaises(multiprocessing.AuthenticationError,
1938 multiprocessing.connection.answer_challenge,
1939 _FakeConnection(), b'abc')
1940
Jesse Noller7152f6d2009-04-02 05:17:26 +00001941#
1942# Test Manager.start()/Pool.__init__() initializer feature - see issue 5585
1943#
1944
1945def initializer(ns):
1946 ns.test += 1
1947
1948class TestInitializers(unittest.TestCase):
1949 def setUp(self):
1950 self.mgr = multiprocessing.Manager()
1951 self.ns = self.mgr.Namespace()
1952 self.ns.test = 0
1953
1954 def tearDown(self):
1955 self.mgr.shutdown()
1956
1957 def test_manager_initializer(self):
1958 m = multiprocessing.managers.SyncManager()
1959 self.assertRaises(TypeError, m.start, 1)
1960 m.start(initializer, (self.ns,))
1961 self.assertEqual(self.ns.test, 1)
1962 m.shutdown()
1963
1964 def test_pool_initializer(self):
1965 self.assertRaises(TypeError, multiprocessing.Pool, initializer=1)
1966 p = multiprocessing.Pool(1, initializer, (self.ns,))
1967 p.close()
1968 p.join()
1969 self.assertEqual(self.ns.test, 1)
1970
Jesse Noller1b90efb2009-06-30 17:11:52 +00001971#
1972# Issue 5155, 5313, 5331: Test process in processes
1973# Verifies os.close(sys.stdin.fileno) vs. sys.stdin.close() behavior
1974#
1975
1976def _ThisSubProcess(q):
1977 try:
1978 item = q.get(block=False)
1979 except Queue.Empty:
1980 pass
1981
1982def _TestProcess(q):
1983 queue = multiprocessing.Queue()
1984 subProc = multiprocessing.Process(target=_ThisSubProcess, args=(queue,))
1985 subProc.start()
1986 subProc.join()
1987
1988def _afunc(x):
1989 return x*x
1990
1991def pool_in_process():
1992 pool = multiprocessing.Pool(processes=4)
1993 x = pool.map(_afunc, [1, 2, 3, 4, 5, 6, 7])
1994
1995class _file_like(object):
1996 def __init__(self, delegate):
1997 self._delegate = delegate
1998 self._pid = None
1999
2000 @property
2001 def cache(self):
2002 pid = os.getpid()
2003 # There are no race conditions since fork keeps only the running thread
2004 if pid != self._pid:
2005 self._pid = pid
2006 self._cache = []
2007 return self._cache
2008
2009 def write(self, data):
2010 self.cache.append(data)
2011
2012 def flush(self):
2013 self._delegate.write(''.join(self.cache))
2014 self._cache = []
2015
2016class TestStdinBadfiledescriptor(unittest.TestCase):
2017
2018 def test_queue_in_process(self):
2019 queue = multiprocessing.Queue()
2020 proc = multiprocessing.Process(target=_TestProcess, args=(queue,))
2021 proc.start()
2022 proc.join()
2023
2024 def test_pool_in_process(self):
2025 p = multiprocessing.Process(target=pool_in_process)
2026 p.start()
2027 p.join()
2028
2029 def test_flushing(self):
2030 sio = StringIO()
2031 flike = _file_like(sio)
2032 flike.write('foo')
2033 proc = multiprocessing.Process(target=lambda: flike.flush())
2034 flike.flush()
2035 assert sio.getvalue() == 'foo'
2036
2037testcases_other = [OtherTest, TestInvalidHandle, TestInitializers,
2038 TestStdinBadfiledescriptor]
Neal Norwitz0c519b32008-08-25 01:50:24 +00002039
Benjamin Petersondfd79492008-06-13 19:13:39 +00002040#
2041#
2042#
2043
2044def test_main(run=None):
Jesse Noller18623822008-06-18 13:29:52 +00002045 if sys.platform.startswith("linux"):
2046 try:
2047 lock = multiprocessing.RLock()
2048 except OSError:
Benjamin Petersonbec087f2009-03-26 21:10:30 +00002049 raise unittest.SkipTest("OSError raises on RLock creation, see issue 3111!")
Benjamin Petersoned77f2e2008-06-17 22:40:44 +00002050
Benjamin Petersondfd79492008-06-13 19:13:39 +00002051 if run is None:
2052 from test.test_support import run_unittest as run
2053
2054 util.get_temp_dir() # creates temp directory for use by all processes
2055
2056 multiprocessing.get_logger().setLevel(LOG_LEVEL)
2057
Jesse Noller146b7ab2008-07-02 16:44:09 +00002058 ProcessesMixin.pool = multiprocessing.Pool(4)
2059 ThreadsMixin.pool = multiprocessing.dummy.Pool(4)
2060 ManagerMixin.manager.__init__()
2061 ManagerMixin.manager.start()
2062 ManagerMixin.pool = ManagerMixin.manager.Pool(4)
Benjamin Petersondfd79492008-06-13 19:13:39 +00002063
2064 testcases = (
Jesse Noller146b7ab2008-07-02 16:44:09 +00002065 sorted(testcases_processes.values(), key=lambda tc:tc.__name__) +
2066 sorted(testcases_threads.values(), key=lambda tc:tc.__name__) +
Neal Norwitz0c519b32008-08-25 01:50:24 +00002067 sorted(testcases_manager.values(), key=lambda tc:tc.__name__) +
2068 testcases_other
Benjamin Petersondfd79492008-06-13 19:13:39 +00002069 )
2070
2071 loadTestsFromTestCase = unittest.defaultTestLoader.loadTestsFromTestCase
2072 suite = unittest.TestSuite(loadTestsFromTestCase(tc) for tc in testcases)
Nick Coghlan13623662010-04-10 14:24:36 +00002073 # (ncoghlan): Whether or not sys.exc_clear is executed by the threading
2074 # module during these tests is at least platform dependent and possibly
Nick Coghlan14459d52010-04-10 15:01:54 +00002075 # non-deterministic on any given platform. So we don't mind if the listed
Nick Coghlan13623662010-04-10 14:24:36 +00002076 # warnings aren't actually raised.
Florent Xicluna07627882010-03-21 01:14:24 +00002077 with test_support.check_py3k_warnings(
Nick Coghlan13623662010-04-10 14:24:36 +00002078 (".+__(get|set)slice__ has been removed", DeprecationWarning),
2079 (r"sys.exc_clear\(\) not supported", DeprecationWarning),
2080 quiet=True):
Florent Xicluna07627882010-03-21 01:14:24 +00002081 run(suite)
Benjamin Petersondfd79492008-06-13 19:13:39 +00002082
Jesse Noller146b7ab2008-07-02 16:44:09 +00002083 ThreadsMixin.pool.terminate()
2084 ProcessesMixin.pool.terminate()
2085 ManagerMixin.pool.terminate()
2086 ManagerMixin.manager.shutdown()
Benjamin Petersondfd79492008-06-13 19:13:39 +00002087
Jesse Noller146b7ab2008-07-02 16:44:09 +00002088 del ProcessesMixin.pool, ThreadsMixin.pool, ManagerMixin.pool
Benjamin Petersondfd79492008-06-13 19:13:39 +00002089
2090def main():
2091 test_main(unittest.TextTestRunner(verbosity=2).run)
2092
2093if __name__ == '__main__':
2094 main()