blob: fcaba91d3e8f063e42404dd7c6d1052ca95c622b [file] [log] [blame]
Jesse Noller76cf55f2008-07-02 16:56:51 +00001#!/usr/bin/env python
2
Benjamin Petersondfd79492008-06-13 19:13:39 +00003#
4# Unit tests for the multiprocessing package
5#
6
7import unittest
8import threading
9import Queue
10import time
11import sys
12import os
13import gc
14import signal
15import array
16import copy
17import socket
18import random
19import logging
Mark Dickinsonc4920e82009-11-20 19:30:22 +000020from test import test_support
Jesse Noller1b90efb2009-06-30 17:11:52 +000021from StringIO import StringIO
Benjamin Petersondfd79492008-06-13 19:13:39 +000022
Jesse Noller37040cd2008-09-30 00:15:45 +000023
R. David Murray3db8a342009-03-30 23:05:48 +000024_multiprocessing = test_support.import_module('_multiprocessing')
25
Jesse Noller37040cd2008-09-30 00:15:45 +000026# Work around broken sem_open implementations
R. David Murray3db8a342009-03-30 23:05:48 +000027test_support.import_module('multiprocessing.synchronize')
Jesse Noller37040cd2008-09-30 00:15:45 +000028
Benjamin Petersondfd79492008-06-13 19:13:39 +000029import multiprocessing.dummy
30import multiprocessing.connection
31import multiprocessing.managers
32import multiprocessing.heap
Benjamin Petersondfd79492008-06-13 19:13:39 +000033import multiprocessing.pool
Benjamin Petersondfd79492008-06-13 19:13:39 +000034
35from multiprocessing import util
36
37#
38#
39#
40
Benjamin Petersone79edf52008-07-13 18:34:58 +000041latin = str
Benjamin Petersondfd79492008-06-13 19:13:39 +000042
Benjamin Petersondfd79492008-06-13 19:13:39 +000043#
44# Constants
45#
46
47LOG_LEVEL = util.SUBWARNING
Jesse Noller654ade32010-01-27 03:05:57 +000048#LOG_LEVEL = logging.DEBUG
Benjamin Petersondfd79492008-06-13 19:13:39 +000049
50DELTA = 0.1
51CHECK_TIMINGS = False # making true makes tests take a lot longer
52 # and can sometimes cause some non-serious
53 # failures because some calls block a bit
54 # longer than expected
55if CHECK_TIMINGS:
56 TIMEOUT1, TIMEOUT2, TIMEOUT3 = 0.82, 0.35, 1.4
57else:
58 TIMEOUT1, TIMEOUT2, TIMEOUT3 = 0.1, 0.1, 0.1
59
60HAVE_GETVALUE = not getattr(_multiprocessing,
61 'HAVE_BROKEN_SEM_GETVALUE', False)
62
Jesse Noller9a5b2ad2009-01-19 15:12:22 +000063WIN32 = (sys.platform == "win32")
64
Benjamin Petersondfd79492008-06-13 19:13:39 +000065#
Florent Xicluna36b9fbb2010-03-24 19:33:25 +000066# Some tests require ctypes
67#
68
try:
    # Structure and the C scalar types come from ctypes itself.
    from ctypes import Structure, c_int, c_double
    # Value and copy are not part of ctypes; they are provided by
    # multiprocessing.sharedctypes.  Requesting them from ctypes always
    # raised ImportError, which left c_int as None and silently skipped
    # every test guarded by "c_int is None".
    # NOTE(review): as in the original statement, binding `copy` here
    # shadows the stdlib `copy` module imported at the top of the file.
    from multiprocessing.sharedctypes import Value, copy
except ImportError:
    # ctypes (or sharedctypes) is unavailable: fall back to placeholders so
    # the module still imports and the ctypes-dependent tests are skipped.
    Structure = object
    c_int = c_double = None
74
75#
Benjamin Petersondfd79492008-06-13 19:13:39 +000076# Creates a wrapper for a function which records the time it takes to finish
77#
78
class TimingWrapper(object):
    """Callable proxy for `func` that records how long the last call took.

    `elapsed` is None until the wrapper has been called; afterwards it holds
    the wall-clock duration (in seconds) of the most recent call, whether
    that call returned or raised.
    """

    def __init__(self, func):
        self.elapsed = None
        self.func = func

    def __call__(self, *args, **kwds):
        started = time.time()
        try:
            result = self.func(*args, **kwds)
        finally:
            # Runs on both the normal and the exceptional path.
            self.elapsed = time.time() - started
        return result
91
92#
93# Base class for test cases
94#
95
class BaseTestCase(object):
    """Mixin with assertion helpers shared by the test case classes below."""

    ALLOWED_TYPES = ('processes', 'manager', 'threads')

    def assertTimingAlmostEqual(self, a, b):
        # Timings are only compared (to one decimal place) when
        # CHECK_TIMINGS is enabled; otherwise this is a no-op.
        if not CHECK_TIMINGS:
            return
        self.assertAlmostEqual(a, b, 1)

    def assertReturnsIfImplemented(self, value, func, *args):
        # Assert that func(*args) == value, unless func signals that it is
        # not implemented, in which case nothing is checked.
        try:
            res = func(*args)
        except NotImplementedError:
            return None
        return self.assertEqual(value, res)
111
112#
113# Return the value of a semaphore
114#
115
def get_value(self):
    """Return the current value of the semaphore-like object `self`.

    The object's own get_value() is preferred; failing that, the private
    `_Semaphore__value` and `_value` attributes are tried in that order.
    NotImplementedError is raised when none of them is available.
    """
    try:
        return self.get_value()
    except AttributeError:
        pass
    for attr in ('_Semaphore__value', '_value'):
        try:
            return getattr(self, attr)
        except AttributeError:
            continue
    raise NotImplementedError
127
128#
129# Testcases
130#
131
class _TestProcess(BaseTestCase):
    """Tests for Process objects: attributes, lifecycle and bookkeeping.

    The concrete `Process`, `Queue`, `Event`, `Pipe`, `current_process`,
    `active_children` and `TYPE` attributes are read from `self`; they are
    not defined in this class.
    """

    ALLOWED_TYPES = ('processes', 'threads')

    def test_current(self):
        # The attributes checked here are skipped for the 'threads' type.
        if self.TYPE == 'threads':
            return

        current = self.current_process()
        authkey = current.authkey

        self.assertTrue(current.is_alive())
        self.assertFalse(current.daemon)
        self.assertIsInstance(authkey, bytes)
        self.assertTrue(len(authkey) > 0)
        self.assertEqual(current.ident, os.getpid())
        self.assertEqual(current.exitcode, None)

    def _test(self, q, *args, **kwds):
        # Child side of test_process: report the received arguments and the
        # child's own identity back to the parent through the queue.
        current = self.current_process()
        q.put(args)
        q.put(kwds)
        q.put(current.name)
        if self.TYPE != 'threads':
            q.put(bytes(current.authkey))
            q.put(current.pid)

    def test_process(self):
        q = self.Queue(1)
        e = self.Event()
        args = (q, 1, 2)
        kwargs = {'hello':23, 'bye':2.54}
        name = 'SomeProcess'
        p = self.Process(
            target=self._test, args=args, kwargs=kwargs, name=name
            )
        p.daemon = True
        current = self.current_process()

        # State before the process has been started.
        if self.TYPE != 'threads':
            self.assertEqual(p.authkey, current.authkey)
        self.assertEqual(p.is_alive(), False)
        self.assertEqual(p.daemon, True)
        self.assertNotIn(p, self.active_children())
        self.assertTrue(type(self.active_children()) is list)
        self.assertEqual(p.exitcode, None)

        p.start()

        # State while the process is running.
        self.assertEqual(p.exitcode, None)
        self.assertEqual(p.is_alive(), True)
        self.assertIn(p, self.active_children())

        # The child echoes back everything except the queue itself.
        self.assertEqual(q.get(), args[1:])
        self.assertEqual(q.get(), kwargs)
        self.assertEqual(q.get(), p.name)
        if self.TYPE != 'threads':
            self.assertEqual(q.get(), current.authkey)
            self.assertEqual(q.get(), p.pid)

        p.join()

        # State after the process has finished.
        self.assertEqual(p.exitcode, 0)
        self.assertEqual(p.is_alive(), False)
        self.assertNotIn(p, self.active_children())

    def _test_terminate(self):
        # Sleep long enough that the child only ends by being terminated.
        time.sleep(1000)

    def test_terminate(self):
        if self.TYPE == 'threads':
            return

        p = self.Process(target=self._test_terminate)
        p.daemon = True
        p.start()

        self.assertEqual(p.is_alive(), True)
        self.assertIn(p, self.active_children())
        self.assertEqual(p.exitcode, None)

        p.terminate()

        join = TimingWrapper(p.join)
        self.assertEqual(join(), None)
        self.assertTimingAlmostEqual(join.elapsed, 0.0)

        self.assertEqual(p.is_alive(), False)
        self.assertNotIn(p, self.active_children())

        p.join()

        # XXX sometimes get p.exitcode == 0 on Windows ...
        #self.assertEqual(p.exitcode, -signal.SIGTERM)

    def test_cpu_count(self):
        try:
            cpus = multiprocessing.cpu_count()
        except NotImplementedError:
            # Platforms that cannot report a count are treated as having one.
            cpus = 1
        self.assertTrue(type(cpus) is int)
        self.assertTrue(cpus >= 1)

    def test_active_children(self):
        self.assertEqual(type(self.active_children()), list)

        p = self.Process(target=time.sleep, args=(DELTA,))
        self.assertNotIn(p, self.active_children())

        p.start()
        self.assertIn(p, self.active_children())

        p.join()
        self.assertNotIn(p, self.active_children())

    def _test_recursion(self, wconn, id):
        # Send this node's id, then spawn two children one after the other
        # (each joined before the next starts) until ids have length 2.
        from multiprocessing import forking
        wconn.send(id)
        if len(id) < 2:
            for i in range(2):
                p = self.Process(
                    target=self._test_recursion, args=(wconn, id+[i])
                    )
                p.start()
                p.join()

    def test_recursion(self):
        rconn, wconn = self.Pipe(duplex=False)
        self._test_recursion(wconn, [])

        time.sleep(DELTA)
        result = []
        while rconn.poll():
            result.append(rconn.recv())

        # Children are joined one at a time, so ids arrive in pre-order.
        expected = [
            [],
              [0],
                [0, 0],
                [0, 1],
              [1],
                [1, 0],
                [1, 1]
            ]
        self.assertEqual(result, expected)
277
278#
279#
280#
281
282class _UpperCaser(multiprocessing.Process):
283
284 def __init__(self):
285 multiprocessing.Process.__init__(self)
286 self.child_conn, self.parent_conn = multiprocessing.Pipe()
287
288 def run(self):
289 self.parent_conn.close()
290 for s in iter(self.child_conn.recv, None):
291 self.child_conn.send(s.upper())
292 self.child_conn.close()
293
294 def submit(self, s):
295 assert type(s) is str
296 self.parent_conn.send(s)
297 return self.parent_conn.recv()
298
299 def stop(self):
300 self.parent_conn.send(None)
301 self.parent_conn.close()
302 self.child_conn.close()
303
class _TestSubclassingProcess(BaseTestCase):
    """Checks that a user-defined Process subclass can be started and used."""

    ALLOWED_TYPES = ('processes',)

    def test_subclassing(self):
        worker = _UpperCaser()
        worker.start()
        for text, shouted in (('hello', 'HELLO'), ('world', 'WORLD')):
            self.assertEqual(worker.submit(text), shouted)
        worker.stop()
        worker.join()
315
316#
317#
318#
319
def queue_empty(q):
    """Return q.empty() if the queue has that method, else qsize() == 0."""
    if not hasattr(q, 'empty'):
        return q.qsize() == 0
    return q.empty()
325
def queue_full(q, maxsize):
    """Return q.full() if the queue has that method, else qsize() == maxsize."""
    if not hasattr(q, 'full'):
        return q.qsize() == maxsize
    return q.full()
331
332
class _TestQueue(BaseTestCase):
    """Tests for the Queue and JoinableQueue objects supplied by `self`."""

    def _test_put(self, queue, child_can_start, parent_can_continue):
        # Child side of test_put: once released, take six items off the
        # queue and then let the parent carry on.
        child_can_start.wait()
        for _ in range(6):
            queue.get()
        parent_can_continue.set()

    def test_put(self):
        MAXSIZE = 6
        queue = self.Queue(maxsize=MAXSIZE)
        child_can_start = self.Event()
        parent_can_continue = self.Event()

        child = self.Process(
            target=self._test_put,
            args=(queue, child_can_start, parent_can_continue)
            )
        child.daemon = True
        child.start()

        self.assertEqual(queue_empty(queue), True)
        self.assertEqual(queue_full(queue, MAXSIZE), False)

        # Fill the queue, exercising each accepted call signature.
        queue.put(1)
        queue.put(2, True)
        queue.put(3, True, None)
        queue.put(4, False)
        queue.put(5, False, None)
        queue.put_nowait(6)

        # the values may be in buffer but not yet in pipe so sleep a bit
        time.sleep(DELTA)

        self.assertEqual(queue_empty(queue), False)
        self.assertEqual(queue_full(queue, MAXSIZE), True)

        put = TimingWrapper(queue.put)
        put_nowait = TimingWrapper(queue.put_nowait)

        # (callable, args, kwargs, expected elapsed time); every call must
        # raise Queue.Full because the queue is at capacity.
        attempts = [
            (put, (7, False), {}, 0),
            (put, (7, False, None), {}, 0),
            (put_nowait, (7,), {}, 0),
            (put, (7, True, TIMEOUT1), {}, TIMEOUT1),
            (put, (7, False, TIMEOUT2), {}, 0),
            (put, (7, True), {'timeout': TIMEOUT3}, TIMEOUT3),
            ]
        for func, args, kwds, duration in attempts:
            self.assertRaises(Queue.Full, func, *args, **kwds)
            self.assertTimingAlmostEqual(func.elapsed, duration)

        child_can_start.set()
        parent_can_continue.wait()

        self.assertEqual(queue_empty(queue), True)
        self.assertEqual(queue_full(queue, MAXSIZE), False)

        child.join()

    def _test_get(self, queue, child_can_start, parent_can_continue):
        # Child side of test_get: once released, queue the values 2..5 and
        # then let the parent carry on.
        child_can_start.wait()
        #queue.put(1)
        for item in (2, 3, 4, 5):
            queue.put(item)
        parent_can_continue.set()

    def test_get(self):
        queue = self.Queue()
        child_can_start = self.Event()
        parent_can_continue = self.Event()

        child = self.Process(
            target=self._test_get,
            args=(queue, child_can_start, parent_can_continue)
            )
        child.daemon = True
        child.start()

        self.assertEqual(queue_empty(queue), True)

        child_can_start.set()
        parent_can_continue.wait()

        time.sleep(DELTA)
        self.assertEqual(queue_empty(queue), False)

        # Hangs unexpectedly, remove for now
        #self.assertEqual(queue.get(), 1)
        self.assertEqual(queue.get(True, None), 2)
        self.assertEqual(queue.get(True), 3)
        self.assertEqual(queue.get(timeout=1), 4)
        self.assertEqual(queue.get_nowait(), 5)

        self.assertEqual(queue_empty(queue), True)

        get = TimingWrapper(queue.get)
        get_nowait = TimingWrapper(queue.get_nowait)

        # (callable, args, kwargs, expected elapsed time); every call must
        # raise Queue.Empty because the queue has been drained.
        attempts = [
            (get, (False,), {}, 0),
            (get, (False, None), {}, 0),
            (get_nowait, (), {}, 0),
            (get, (True, TIMEOUT1), {}, TIMEOUT1),
            (get, (False, TIMEOUT2), {}, 0),
            (get, (), {'timeout': TIMEOUT3}, TIMEOUT3),
            ]
        for func, args, kwds, duration in attempts:
            self.assertRaises(Queue.Empty, func, *args, **kwds)
            self.assertTimingAlmostEqual(func.elapsed, duration)

        child.join()

    def _test_fork(self, queue):
        for item in range(10, 20):
            queue.put(item)
        # note that at this point the items may only be buffered, so the
        # process cannot shutdown until the feeder thread has finished
        # pushing items onto the pipe.

    def test_fork(self):
        # Old versions of Queue would fail to create a new feeder
        # thread for a forked process if the original process had its
        # own feeder thread.  This test checks that this no longer
        # happens.

        queue = self.Queue()

        # put items on queue so that main process starts a feeder thread
        for item in range(10):
            queue.put(item)

        # wait to make sure thread starts before we fork a new process
        time.sleep(DELTA)

        # fork process
        child = self.Process(target=self._test_fork, args=(queue,))
        child.start()

        # check that all expected items are in the queue
        for expected in range(20):
            self.assertEqual(queue.get(), expected)
        self.assertRaises(Queue.Empty, queue.get, False)

        child.join()

    def test_qsize(self):
        q = self.Queue()
        try:
            self.assertEqual(q.qsize(), 0)
        except NotImplementedError:
            # qsize() is unavailable here, so there is nothing to check.
            return
        q.put(1)
        self.assertEqual(q.qsize(), 1)
        q.put(5)
        self.assertEqual(q.qsize(), 2)
        q.get()
        self.assertEqual(q.qsize(), 1)
        q.get()
        self.assertEqual(q.qsize(), 0)

    def _test_task_done(self, q):
        # Worker: acknowledge every item until the None sentinel arrives.
        for _ in iter(q.get, None):
            time.sleep(DELTA)
            q.task_done()

    def test_task_done(self):
        queue = self.JoinableQueue()

        if sys.version_info < (2, 5) and not hasattr(queue, 'task_done'):
            self.skipTest("requires 'queue.task_done()' method")

        workers = [self.Process(target=self._test_task_done, args=(queue,))
                   for _ in xrange(4)]

        for worker in workers:
            worker.start()

        for item in xrange(10):
            queue.put(item)

        queue.join()

        # One sentinel per worker so that each of them exits its loop.
        for _ in workers:
            queue.put(None)

        for worker in workers:
            worker.join()
536
537#
538#
539#
540
class _TestLock(BaseTestCase):
    """Tests for the Lock and RLock objects supplied by `self`."""

    def test_lock(self):
        lock = self.Lock()
        self.assertEqual(lock.acquire(), True)
        # A second, non-blocking acquire must fail while the lock is held.
        self.assertEqual(lock.acquire(False), False)
        self.assertEqual(lock.release(), None)
        # Releasing a lock that is not held is an error.
        self.assertRaises((ValueError, threading.ThreadError), lock.release)

    def test_rlock(self):
        lock = self.RLock()
        depth = 3
        # The owner may re-acquire, and must release the same number of
        # times.
        for _ in range(depth):
            self.assertEqual(lock.acquire(), True)
        for _ in range(depth):
            self.assertEqual(lock.release(), None)
        self.assertRaises((AssertionError, RuntimeError), lock.release)

    def test_lock_context(self):
        with self.Lock():
            pass
563
Benjamin Petersondfd79492008-06-13 19:13:39 +0000564
class _TestSemaphore(BaseTestCase):
    """Tests for the Semaphore and BoundedSemaphore objects from `self`."""

    def _test_semaphore(self, sem):
        # Expects a semaphore whose value is 2; leaves it back at 2.
        self.assertReturnsIfImplemented(2, get_value, sem)
        # (operation, args, expected result, expected value afterwards)
        steps = [
            (sem.acquire, (), True, 1),
            (sem.acquire, (), True, 0),
            (sem.acquire, (False,), False, 0),
            (sem.release, (), None, 1),
            (sem.release, (), None, 2),
            ]
        for operation, args, outcome, remaining in steps:
            self.assertEqual(operation(*args), outcome)
            self.assertReturnsIfImplemented(remaining, get_value, sem)

    def test_semaphore(self):
        sem = self.Semaphore(2)
        self._test_semaphore(sem)
        # An unbounded semaphore may be released past its initial value.
        for raised_to in (3, 4):
            self.assertEqual(sem.release(), None)
            self.assertReturnsIfImplemented(raised_to, get_value, sem)

    def test_bounded_semaphore(self):
        sem = self.BoundedSemaphore(2)
        self._test_semaphore(sem)
        # Currently fails on OS/X
        #if HAVE_GETVALUE:
        #    self.assertRaises(ValueError, sem.release)
        #    self.assertReturnsIfImplemented(2, get_value, sem)

    def test_timeout(self):
        if self.TYPE != 'processes':
            return

        sem = self.Semaphore(0)
        acquire = TimingWrapper(sem.acquire)

        # (args, kwargs, expected elapsed time); the semaphore is at zero,
        # so every attempt must return False.
        attempts = [
            ((False,), {}, 0.0),
            ((False, None), {}, 0.0),
            ((False, TIMEOUT1), {}, 0),
            ((True, TIMEOUT2), {}, TIMEOUT2),
            ((), {'timeout': TIMEOUT3}, TIMEOUT3),
            ]
        for args, kwds, duration in attempts:
            self.assertEqual(acquire(*args, **kwds), False)
            self.assertTimingAlmostEqual(acquire.elapsed, duration)
617
618
class _TestCondition(BaseTestCase):
    """Tests for the Condition objects supplied by `self`."""

    def f(self, cond, sleeping, woken, timeout=None):
        # Sleeper body: announce that we are about to wait, wait on the
        # condition, then announce that we have woken up.
        cond.acquire()
        sleeping.release()
        cond.wait(timeout)
        woken.release()
        cond.release()

    def check_invariant(self, cond):
        # this is only supposed to succeed when there are no sleepers
        if self.TYPE != 'processes':
            return
        try:
            sleepers = (cond._sleeping_count.get_value() -
                        cond._woken_count.get_value())
            self.assertEqual(sleepers, 0)
            self.assertEqual(cond._wait_semaphore.get_value(), 0)
        except NotImplementedError:
            pass

    def test_notify(self):
        cond = self.Condition()
        sleeping = self.Semaphore(0)
        woken = self.Semaphore(0)
        sleeper_args = (cond, sleeping, woken)

        child = self.Process(target=self.f, args=sleeper_args)
        child.daemon = True
        child.start()

        thread = threading.Thread(target=self.f, args=sleeper_args)
        thread.daemon = True
        thread.start()

        # wait for both children to start sleeping
        sleeping.acquire()
        sleeping.acquire()

        # check no process/thread has woken up
        time.sleep(DELTA)
        self.assertReturnsIfImplemented(0, get_value, woken)

        # wake up one sleeper at a time, checking after each notification
        # that exactly one more has woken up
        for awake in (1, 2):
            cond.acquire()
            cond.notify()
            cond.release()

            time.sleep(DELTA)
            self.assertReturnsIfImplemented(awake, get_value, woken)

        # check state is not mucked up
        self.check_invariant(cond)
        thread.join()

    def test_notify_all(self):
        cond = self.Condition()
        sleeping = self.Semaphore(0)
        woken = self.Semaphore(0)

        # start some threads/processes which will timeout
        timed_args = (cond, sleeping, woken, TIMEOUT1)
        for _ in range(3):
            child = self.Process(target=self.f, args=timed_args)
            child.daemon = True
            child.start()

            thread = threading.Thread(target=self.f, args=timed_args)
            thread.daemon = True
            thread.start()

        # wait for them all to sleep
        for _ in xrange(6):
            sleeping.acquire()

        # check they have all timed out
        for _ in xrange(6):
            woken.acquire()
        self.assertReturnsIfImplemented(0, get_value, woken)

        # check state is not mucked up
        self.check_invariant(cond)

        # start some more threads/processes
        untimed_args = (cond, sleeping, woken)
        for _ in range(3):
            child = self.Process(target=self.f, args=untimed_args)
            child.daemon = True
            child.start()

            thread = threading.Thread(target=self.f, args=untimed_args)
            thread.daemon = True
            thread.start()

        # wait for them to all sleep
        for _ in xrange(6):
            sleeping.acquire()

        # check no process/thread has woken up
        time.sleep(DELTA)
        self.assertReturnsIfImplemented(0, get_value, woken)

        # wake them all up
        cond.acquire()
        cond.notify_all()
        cond.release()

        # check they have all woken
        time.sleep(DELTA)
        self.assertReturnsIfImplemented(6, get_value, woken)

        # check state is not mucked up
        self.check_invariant(cond)

    def test_timeout(self):
        cond = self.Condition()
        wait = TimingWrapper(cond.wait)
        cond.acquire()
        outcome = wait(TIMEOUT1)
        cond.release()
        self.assertEqual(outcome, None)
        self.assertTimingAlmostEqual(wait.elapsed, TIMEOUT1)
749
750
class _TestEvent(BaseTestCase):
    """Tests for the Event objects supplied by `self`."""

    def _test_event(self, event):
        # Child side of test_event: set the event after a short delay.
        time.sleep(TIMEOUT2)
        event.set()

    def test_event(self):
        event = self.Event()
        wait = TimingWrapper(event.wait)

        # A freshly created event is not set.
        self.assertEqual(event.is_set(), False)

        # wait() is expected to report the state of the flag, so waiting
        # on an unset event returns False once the timeout expires.
        self.assertEqual(wait(0.0), False)
        self.assertTimingAlmostEqual(wait.elapsed, 0.0)
        self.assertEqual(wait(TIMEOUT1), False)
        self.assertTimingAlmostEqual(wait.elapsed, TIMEOUT1)

        event.set()

        # Once the event is set, wait() returns True without blocking,
        # with or without a timeout.
        self.assertEqual(event.is_set(), True)
        self.assertEqual(wait(), True)
        self.assertTimingAlmostEqual(wait.elapsed, 0.0)
        self.assertEqual(wait(TIMEOUT1), True)
        self.assertTimingAlmostEqual(wait.elapsed, 0.0)
        # self.assertEqual(event.is_set(), True)

        event.clear()

        #self.assertEqual(event.is_set(), False)

        # Keep a handle on the child so it can be joined after it has set
        # the event, rather than leaving it running once the test returns.
        p = self.Process(target=self._test_event, args=(event,))
        p.start()
        try:
            self.assertEqual(wait(), True)
        finally:
            p.join()
Benjamin Petersondfd79492008-06-13 19:13:39 +0000788
789#
790#
791#
792
class _TestValue(BaseTestCase):
    """Tests for the Value and RawValue shared objects supplied by `self`."""

    ALLOWED_TYPES = ('processes',)

    # (typecode, initial value, value written by the child process)
    codes_values = [
        ('i', 4343, 24234),
        ('d', 3.625, -4.25),
        ('h', -232, 234),
        ('c', latin('x'), latin('y'))
        ]

    def _test(self, values):
        # Child side of test_value: overwrite each shared value with the
        # third entry of its codes_values row.
        for shared, (_, _, replacement) in zip(values, self.codes_values):
            shared.value = replacement

    @unittest.skipIf(c_int is None, "requires _ctypes")
    def test_value(self, raw=False):
        factory = self.RawValue if raw else self.Value
        values = [factory(code, initial)
                  for code, initial, _ in self.codes_values]

        for shared, row in zip(values, self.codes_values):
            self.assertEqual(shared.value, row[1])

        child = self.Process(target=self._test, args=(values,))
        child.start()
        child.join()

        # The child's writes must be visible in the parent.
        for shared, row in zip(values, self.codes_values):
            self.assertEqual(shared.value, row[2])

    @unittest.skipIf(c_int is None, "requires _ctypes")
    def test_rawvalue(self):
        self.test_value(raw=True)

    @unittest.skipIf(c_int is None, "requires _ctypes")
    def test_getobj_getlock(self):
        # Default lock: both accessors must be callable.
        val1 = self.Value('i', 5)
        val1.get_lock()
        val1.get_obj()

        # lock=None: both accessors must be callable as well.
        val2 = self.Value('i', 5, lock=None)
        val2.get_lock()
        val2.get_obj()

        # An explicitly supplied lock is the one handed back.
        lock = self.Lock()
        val3 = self.Value('i', 5, lock=lock)
        lock3 = val3.get_lock()
        val3.get_obj()
        self.assertEqual(lock, lock3)

        # lock=False yields an object without the accessors.
        arr4 = self.Value('i', 5, lock=False)
        for accessor in ('get_lock', 'get_obj'):
            self.assertFalse(hasattr(arr4, accessor))

        self.assertRaises(AttributeError, self.Value, 'i', 5, lock='navalue')

        # Raw values have no accessors either.
        arr5 = self.RawValue('i', 5)
        for accessor in ('get_lock', 'get_obj'):
            self.assertFalse(hasattr(arr5, accessor))
857
Benjamin Petersondfd79492008-06-13 19:13:39 +0000858
class _TestArray(BaseTestCase):
    """Tests for the Array and RawArray shared objects supplied by `self`."""

    ALLOWED_TYPES = ('processes',)

    def f(self, seq):
        # Replace seq in place with its running (prefix) sums.
        for index in range(1, len(seq)):
            seq[index] += seq[index-1]

    @unittest.skipIf(c_int is None, "requires _ctypes")
    def test_array(self, raw=False):
        seq = [680, 626, 934, 821, 150, 233, 548, 982, 714, 831]
        factory = self.RawArray if raw else self.Array
        arr = factory('i', seq)

        self.assertEqual(len(arr), len(seq))
        self.assertEqual(arr[3], seq[3])
        self.assertEqual(list(arr[2:7]), list(seq[2:7]))

        # Slice assignment from an array.array, mirrored on the plain list.
        patch = array.array('i', [1, 2, 3, 4])
        arr[4:8] = patch
        seq[4:8] = patch

        self.assertEqual(list(arr[:]), seq)

        # Apply the transformation locally to the list ...
        self.f(seq)

        # ... and in a child process to the shared array.
        child = self.Process(target=self.f, args=(arr,))
        child.start()
        child.join()

        self.assertEqual(list(arr[:]), seq)

    @unittest.skipIf(c_int is None, "requires _ctypes")
    def test_rawarray(self):
        self.test_array(raw=True)

    @unittest.skipIf(c_int is None, "requires _ctypes")
    def test_getobj_getlock_obj(self):
        # Default lock: both accessors must be callable.
        arr1 = self.Array('i', range(10))
        arr1.get_lock()
        arr1.get_obj()

        # lock=None: both accessors must be callable as well.
        arr2 = self.Array('i', range(10), lock=None)
        arr2.get_lock()
        arr2.get_obj()

        # An explicitly supplied lock is the one handed back.
        lock = self.Lock()
        arr3 = self.Array('i', range(10), lock=lock)
        lock3 = arr3.get_lock()
        arr3.get_obj()
        self.assertEqual(lock, lock3)

        # lock=False yields an object without the accessors.
        arr4 = self.Array('i', range(10), lock=False)
        for accessor in ('get_lock', 'get_obj'):
            self.assertFalse(hasattr(arr4, accessor))
        self.assertRaises(AttributeError,
                          self.Array, 'i', range(10), lock='notalock')

        # Raw arrays have no accessors either.
        arr5 = self.RawArray('i', range(10))
        for accessor in ('get_lock', 'get_obj'):
            self.assertFalse(hasattr(arr5, accessor))
Benjamin Petersondfd79492008-06-13 19:13:39 +0000920
921#
922#
923#
924
class _TestContainers(BaseTestCase):
    """Tests for the list, dict and Namespace objects supplied by `self`."""

    ALLOWED_TYPES = ('manager',)

    def test_list(self):
        a = self.list(range(10))
        self.assertEqual(a[:], range(10))

        b = self.list()
        self.assertEqual(b[:], [])

        b.extend(range(5))
        self.assertEqual(b[:], range(5))

        self.assertEqual(b[2], 2)
        self.assertEqual(b[2:10], [2,3,4])

        b *= 2
        doubled = [0, 1, 2, 3, 4, 0, 1, 2, 3, 4]
        self.assertEqual(b[:], doubled)

        self.assertEqual(b + [5, 6], doubled + [5, 6])

        # `a` must be unaffected by the operations on `b`.
        self.assertEqual(a[:], range(10))

        # A list built from other shared lists.
        d = [a, b]
        e = self.list(d)
        self.assertEqual(
            e[:],
            [[0, 1, 2, 3, 4, 5, 6, 7, 8, 9], doubled]
            )

        # Appending to `a` afterwards shows up through the containing list.
        f = self.list([a])
        a.append('hello')
        self.assertEqual(f[:], [[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 'hello']])

    def test_dict(self):
        d = self.dict()
        indices = range(65, 70)
        for code in indices:
            d[code] = chr(code)
        letters = [chr(code) for code in indices]
        self.assertEqual(d.copy(), dict(zip(indices, letters)))
        self.assertEqual(sorted(d.keys()), indices)
        self.assertEqual(sorted(d.values()), letters)
        self.assertEqual(sorted(d.items()), zip(indices, letters))

    def test_namespace(self):
        n = self.Namespace()
        n.name = 'Bob'
        n.job = 'Builder'
        n._hidden = 'hidden'
        self.assertEqual((n.name, n.job), ('Bob', 'Builder'))
        del n.job
        # Neither the deleted attribute nor the underscore-prefixed one is
        # expected in the string form.
        self.assertEqual(str(n), "Namespace(name='Bob')")
        self.assertTrue(hasattr(n, 'name'))
        self.assertTrue(not hasattr(n, 'job'))
980
981#
982#
983#
984
def sqr(x, wait=0.0):
    """Return x*x after sleeping for `wait` seconds."""
    time.sleep(wait)
    squared = x * x
    return squared
class _TestPool(BaseTestCase):
    # Compares results produced through self.pool with direct calls to
    # sqr(), and checks timeout and terminate handling.

    def test_apply(self):
        run = self.pool.apply
        self.assertEqual(run(sqr, (5,)), sqr(5))
        self.assertEqual(run(sqr, (), {'x': 3}), sqr(x=3))

    def test_map(self):
        run = self.pool.map
        self.assertEqual(run(sqr, range(10)),
                         [sqr(n) for n in range(10)])
        self.assertEqual(run(sqr, range(100), chunksize=20),
                         [sqr(n) for n in range(100)])

    def test_map_chunksize(self):
        # map_async() over an empty list with an explicit chunksize must
        # complete instead of timing out.
        pending = self.pool.map_async(sqr, [], chunksize=1)
        try:
            pending.get(timeout=TIMEOUT1)
        except multiprocessing.TimeoutError:
            self.fail("pool.map_async with chunksize stalled on null list")

    def test_async(self):
        # The task sleeps for TIMEOUT1, so get() should take about as long.
        pending = self.pool.apply_async(sqr, (7, TIMEOUT1,))
        timed_get = TimingWrapper(pending.get)
        self.assertEqual(timed_get(), 49)
        self.assertTimingAlmostEqual(timed_get.elapsed, TIMEOUT1)

    def test_async_timeout(self):
        # The task sleeps longer than the timeout passed to get().
        pending = self.pool.apply_async(sqr, (6, TIMEOUT2 + 0.2))
        timed_get = TimingWrapper(pending.get)
        self.assertRaises(multiprocessing.TimeoutError,
                          timed_get, timeout=TIMEOUT2)
        self.assertTimingAlmostEqual(timed_get.elapsed, TIMEOUT2)

    def test_imap(self):
        # Consume the whole iterator at once.
        results = self.pool.imap(sqr, range(10))
        self.assertEqual(list(results), [sqr(n) for n in range(10)])

        # Step through the iterator one item at a time.
        results = self.pool.imap(sqr, range(10))
        for n in range(10):
            self.assertEqual(results.next(), n*n)
        self.assertRaises(StopIteration, results.next)

        # Same again with an explicit chunksize.
        results = self.pool.imap(sqr, range(1000), chunksize=100)
        for n in range(1000):
            self.assertEqual(results.next(), n*n)
        self.assertRaises(StopIteration, results.next)

    def test_imap_unordered(self):
        expected = [sqr(n) for n in range(1000)]

        results = self.pool.imap_unordered(sqr, range(1000))
        self.assertEqual(sorted(results), expected)

        results = self.pool.imap_unordered(sqr, range(1000), chunksize=53)
        self.assertEqual(sorted(results), expected)

    def test_make_pool(self):
        pool = multiprocessing.Pool(3)
        self.assertEqual(3, len(pool._pool))
        pool.close()
        pool.join()

    def test_terminate(self):
        if self.TYPE == 'manager':
            # On Unix a forked process increfs each shared object to
            # which its parent process held a reference.  If the
            # forked process gets terminated then there is likely to
            # be a reference leak.  So to prevent
            # _TestZZZNumberOfObjects from failing we skip this test
            # when using a manager.
            return

        # Queue far more work than can finish, then terminate right away.
        result = self.pool.map_async(
            time.sleep, [0.1] * 10000, chunksize=1
            )
        self.pool.terminate()
        timed_join = TimingWrapper(self.pool.join)
        timed_join()
        self.assertTrue(timed_join.elapsed < 0.2)
Jesse Noller654ade32010-01-27 03:05:57 +00001063
class _TestPoolWorkerLifetime(BaseTestCase):
    # Checks that a Pool created with maxtasksperchild replaces its
    # worker processes.

    ALLOWED_TYPES = ('processes', )

    def test_pool_worker_lifetime(self):
        pool = multiprocessing.Pool(3, maxtasksperchild=10)
        self.assertEqual(3, len(pool._pool))
        pids_before = [worker.pid for worker in pool._pool]

        # Run many tasks so each worker gets replaced (hopefully)
        pending = [pool.apply_async(sqr, (n, )) for n in range(100)]

        # Fetch the results and verify we got the right answers,
        # also ensuring all the tasks have completed.
        for n, res in enumerate(pending):
            self.assertEqual(res.get(), sqr(n))

        # Refill the pool
        pool._repopulate_pool()

        # Wait until all workers are alive (at most 5 polls, DELTA apart)
        attempts_left = 5
        while attempts_left and not all(w.is_alive() for w in pool._pool):
            attempts_left -= 1
            time.sleep(DELTA)
        pids_after = [worker.pid for worker in pool._pool]

        # All pids should be assigned.  See issue #7805.
        self.assertNotIn(None, pids_before)
        self.assertNotIn(None, pids_after)
        # Finally, check that the worker pids have changed
        self.assertNotEqual(sorted(pids_before), sorted(pids_after))
        pool.close()
        pool.join()
1094
Benjamin Petersondfd79492008-06-13 19:13:39 +00001095#
1096# Test that manager has expected number of shared objects left
1097#
1098
class _TestZZZNumberOfObjects(BaseTestCase):
    # Because test cases are sorted alphabetically, this one will get
    # run after all the other tests for the manager.  It tests that
    # there have been no "reference leaks" for the manager's shared
    # objects.  Note the comment in _TestPool.test_terminate().
    ALLOWED_TYPES = ('manager',)

    def test_number_of_objects(self):
        EXPECTED_NUMBER = 1                # the pool object is still alive
        multiprocessing.active_children()  # discard dead process objs
        gc.collect()                       # do garbage collection
        refs = self.manager._number_of_objects()
        # Snapshot the debug info right after counting so that what gets
        # printed on failure corresponds to the count being reported.
        debug_info = self.manager._debug_info()
        if refs != EXPECTED_NUMBER:
            # Printed once; with a single argument the parenthesised form
            # behaves the same as the print statement.
            print(debug_info)

        self.assertEqual(refs, EXPECTED_NUMBER)
1117
1118#
1119# Test of creating a customized manager class
1120#
1121
1122from multiprocessing.managers import BaseManager, BaseProxy, RemoteError
1123
class FooBar(object):
    # Plain class registered with MyManager: one method that returns,
    # one that raises, and one whose name starts with an underscore.

    def f(self):
        return 'f()'

    def g(self):
        raise ValueError()

    def _h(self):
        return '_h()'
1131
def baz():
    # Generator yielding the squares of 0..9 in increasing order.
    for n in range(10):
        yield n*n
1135
class IteratorProxy(BaseProxy):
    # Proxy that forwards both spellings of the iterator-advance method
    # via _callmethod() and returns itself from __iter__().
    _exposed_ = ('next', '__next__')

    def __iter__(self):
        return self

    def next(self):
        value = self._callmethod('next')
        return value

    def __next__(self):
        value = self._callmethod('__next__')
        return value
1144
class MyManager(BaseManager):
    # Customized manager; its shared types are registered just below.
    pass

# 'Foo' uses the default set of exposed methods, 'Bar' names its exposed
# methods explicitly, and 'baz' is served through IteratorProxy.
MyManager.register('Foo', callable=FooBar)
MyManager.register('Bar', callable=FooBar, exposed=('f', '_h'))
MyManager.register('baz', callable=baz, proxytype=IteratorProxy)
1151
1152
class _TestMyManager(BaseTestCase):
    # Starts a MyManager and checks which methods its proxies expose and
    # what they return.

    ALLOWED_TYPES = ('manager',)

    def test_mymanager(self):
        manager = MyManager()
        manager.start()

        # Shut the manager down even when an assertion fails, so that a
        # failing test does not leave the manager running.
        try:
            foo = manager.Foo()
            bar = manager.Bar()
            # Named `squares` so the module-level baz() is not shadowed.
            squares = manager.baz()

            candidates = ('f', 'g', '_h')
            foo_methods = [name for name in candidates if hasattr(foo, name)]
            bar_methods = [name for name in candidates if hasattr(bar, name)]

            self.assertEqual(foo_methods, ['f', 'g'])
            self.assertEqual(bar_methods, ['f', '_h'])

            self.assertEqual(foo.f(), 'f()')
            self.assertRaises(ValueError, foo.g)
            self.assertEqual(foo._callmethod('f'), 'f()')
            # '_h' is not among foo's exposed methods.
            self.assertRaises(RemoteError, foo._callmethod, '_h')

            self.assertEqual(bar.f(), 'f()')
            self.assertEqual(bar._h(), '_h()')
            self.assertEqual(bar._callmethod('f'), 'f()')
            self.assertEqual(bar._callmethod('_h'), '_h()')

            self.assertEqual(list(squares), [i*i for i in range(10)])
        finally:
            manager.shutdown()
1184
1185#
1186# Test of connecting to a remote server and using xmlrpclib for serialization
1187#
1188
# Single module-level queue handed out by get_queue().
_queue = Queue.Queue()

def get_queue():
    # Always returns the same queue object.
    return _queue
1192
class QueueManager(BaseManager):
    '''manager class used by server process'''

# Registered with a callable, so get_queue() supplies the shared queue.
QueueManager.register('get_queue', callable=get_queue)
1196
class QueueManager2(BaseManager):
    '''manager class which specifies the same interface as QueueManager'''

# Registered by name only, without a callable.
QueueManager2.register('get_queue')
1200
1201
1202SERIALIZER = 'xmlrpclib'
1203
class _TestRemoteManager(BaseTestCase):
    # Starts a QueueManager, then connects to it from a child process and
    # from this process through QueueManager2, using SERIALIZER.

    ALLOWED_TYPES = ('manager',)

    def _putter(self, address, authkey):
        # Target of the child process: connect and put one tuple.
        client = QueueManager2(
            address=address, authkey=authkey, serializer=SERIALIZER
            )
        client.connect()
        remote_queue = client.get_queue()
        remote_queue.put(('hello world', None, True, 2.25))

    def test_remote(self):
        authkey = os.urandom(32)

        server = QueueManager(
            address=('localhost', 0), authkey=authkey, serializer=SERIALIZER
            )
        server.start()

        child = self.Process(target=self._putter,
                             args=(server.address, authkey))
        child.start()

        client = QueueManager2(
            address=server.address, authkey=authkey, serializer=SERIALIZER
            )
        client.connect()
        queue = client.get_queue()

        # Note that xmlrpclib will deserialize object as a list not a tuple
        self.assertEqual(queue.get(), ['hello world', None, True, 2.25])

        # Because we are using xmlrpclib for serialization instead of
        # pickle this will cause a serialization error.
        self.assertRaises(Exception, queue.put, time.sleep)

        # Make queue finalizer run before the server is stopped
        del queue
        server.shutdown()
1243
class _TestManagerRestart(BaseTestCase):
    # Starts a manager on a fixed port, shuts it down, and immediately
    # starts another one on the same port.

    def _putter(self, address, authkey):
        # Target of the child process: connect and put one string.
        client = QueueManager(
            address=address, authkey=authkey, serializer=SERIALIZER)
        client.connect()
        remote_queue = client.get_queue()
        remote_queue.put('hello world')

    def test_rapid_restart(self):
        authkey = os.urandom(32)
        port = test_support.find_unused_port()
        address = ('localhost', port)

        # First manager: exchange one message through a child process.
        manager = QueueManager(
            address=address, authkey=authkey, serializer=SERIALIZER)
        manager.start()

        child = self.Process(target=self._putter,
                             args=(manager.address, authkey))
        child.start()
        queue = manager.get_queue()
        self.assertEqual(queue.get(), 'hello world')
        del queue
        manager.shutdown()

        # Second manager on the same address.
        manager = QueueManager(
            address=address, authkey=authkey, serializer=SERIALIZER)
        manager.start()
        manager.shutdown()
Jesse Noller459a6482009-03-30 15:50:42 +00001270
Benjamin Petersondfd79492008-06-13 19:13:39 +00001271#
1272#
1273#
1274
1275SENTINEL = latin('')
1276
class _TestConnection(BaseTestCase):
    # Exercises the connection objects returned by self.Pipe(), partly
    # against a child that echoes byte messages back.

    ALLOWED_TYPES = ('processes', 'threads')

    def _echo(self, conn):
        # Send every received byte message straight back until a message
        # equal to SENTINEL arrives, then close the connection.
        while True:
            payload = conn.recv_bytes()
            if payload == SENTINEL:
                break
            conn.send_bytes(payload)
        conn.close()

    def test_connection(self):
        conn, child_conn = self.Pipe()

        echoer = self.Process(target=self._echo, args=(child_conn,))
        echoer.daemon = True
        echoer.start()

        seq = [1, 2.25, None]
        msg = latin('hello world')
        longmsg = msg * 10
        arr = array.array('i', range(4))

        if self.TYPE == 'processes':
            self.assertEqual(type(conn.fileno()), int)

        # Round trip of an object.
        self.assertEqual(conn.send(seq), None)
        self.assertEqual(conn.recv(), seq)

        # Round trip of a byte message.
        self.assertEqual(conn.send_bytes(msg), None)
        self.assertEqual(conn.recv_bytes(), msg)

        if self.TYPE == 'processes':
            item_count = len(arr)

            # Receive into the start of a ten-item buffer.
            buf = array.array('i', [0]*10)
            expected = list(arr) + [0] * (10 - item_count)
            self.assertEqual(conn.send_bytes(arr), None)
            self.assertEqual(conn.recv_bytes_into(buf),
                             item_count * buf.itemsize)
            self.assertEqual(list(buf), expected)

            # Receive at an offset of three items.
            buf = array.array('i', [0]*10)
            expected = [0] * 3 + list(arr) + [0] * (10 - 3 - item_count)
            self.assertEqual(conn.send_bytes(arr), None)
            self.assertEqual(conn.recv_bytes_into(buf, 3 * buf.itemsize),
                             item_count * buf.itemsize)
            self.assertEqual(list(buf), expected)

            # A 40-byte buffer is too short for longmsg; the exception is
            # expected to carry the complete message.
            buf = bytearray(latin(' ' * 40))
            self.assertEqual(conn.send_bytes(longmsg), None)
            try:
                res = conn.recv_bytes_into(buf)
            except multiprocessing.BufferTooShort as e:
                self.assertEqual(e.args, (longmsg,))
            else:
                self.fail('expected BufferTooShort, got %s' % res)

        timed_poll = TimingWrapper(conn.poll)

        # Nothing is pending: poll() returns at once, poll(timeout) waits.
        self.assertEqual(timed_poll(), False)
        self.assertTimingAlmostEqual(timed_poll.elapsed, 0)

        self.assertEqual(timed_poll(TIMEOUT1), False)
        self.assertTimingAlmostEqual(timed_poll.elapsed, TIMEOUT1)

        conn.send(None)

        # Once something has been sent, poll(timeout) should return
        # promptly.
        self.assertEqual(timed_poll(TIMEOUT1), True)
        self.assertTimingAlmostEqual(timed_poll.elapsed, 0)

        self.assertEqual(conn.recv(), None)

        really_big_msg = latin('X') * (1024 * 1024 * 16)   # 16Mb
        conn.send_bytes(really_big_msg)
        self.assertEqual(conn.recv_bytes(), really_big_msg)

        conn.send_bytes(SENTINEL)                          # tell child to quit
        child_conn.close()

        if self.TYPE == 'processes':
            self.assertEqual(conn.readable, True)
            self.assertEqual(conn.writable, True)
            self.assertRaises(EOFError, conn.recv)
            self.assertRaises(EOFError, conn.recv_bytes)

        echoer.join()

    def test_duplex_false(self):
        reader, writer = self.Pipe(duplex=False)
        self.assertEqual(writer.send(1), None)
        self.assertEqual(reader.recv(), 1)
        if self.TYPE == 'processes':
            # Each end is usable in one direction only.
            self.assertEqual(reader.readable, True)
            self.assertEqual(reader.writable, False)
            self.assertEqual(writer.readable, False)
            self.assertEqual(writer.writable, True)
            self.assertRaises(IOError, reader.send, 2)
            self.assertRaises(IOError, writer.recv)
            self.assertRaises(IOError, writer.poll)

    def test_spawn_close(self):
        # We test that a pipe connection can be closed by parent
        # process immediately after child is spawned.  On Windows this
        # would have sometimes failed on old versions because
        # child_conn would be closed before the child got a chance to
        # duplicate it.
        conn, child_conn = self.Pipe()

        echoer = self.Process(target=self._echo, args=(child_conn,))
        echoer.start()
        child_conn.close()    # this might complete before child initializes

        msg = latin('hello')
        conn.send_bytes(msg)
        self.assertEqual(conn.recv_bytes(), msg)

        conn.send_bytes(SENTINEL)
        conn.close()
        echoer.join()

    def test_sendbytes(self):
        if self.TYPE != 'processes':
            return

        msg = latin('abcdefghijklmnopqrstuvwxyz')
        sender, receiver = self.Pipe()

        # Whole message, then with an offset, then with offset and size.
        sender.send_bytes(msg)
        self.assertEqual(receiver.recv_bytes(), msg)

        sender.send_bytes(msg, 5)
        self.assertEqual(receiver.recv_bytes(), msg[5:])

        sender.send_bytes(msg, 7, 8)
        self.assertEqual(receiver.recv_bytes(), msg[7:7+8])

        # An offset equal to the message length sends an empty message.
        sender.send_bytes(msg, 26)
        self.assertEqual(receiver.recv_bytes(), latin(''))

        sender.send_bytes(msg, 26, 0)
        self.assertEqual(receiver.recv_bytes(), latin(''))

        # (offset, size) combinations that must be rejected.
        self.assertRaises(ValueError, sender.send_bytes, msg, 27)

        self.assertRaises(ValueError, sender.send_bytes, msg, 22, 5)

        self.assertRaises(ValueError, sender.send_bytes, msg, 26, 1)

        self.assertRaises(ValueError, sender.send_bytes, msg, -1)

        self.assertRaises(ValueError, sender.send_bytes, msg, 4, -1)
1425
class _TestListenerClient(BaseTestCase):
    # For every family in self.connection.families, accepts one client
    # connection on a Listener and checks the message it sends.

    ALLOWED_TYPES = ('processes', 'threads')

    def _test(self, address):
        # Target of the child: connect, send one message, close.
        client = self.connection.Client(address)
        client.send('hello')
        client.close()

    def test_listener_client(self):
        for family in self.connection.families:
            listener = self.connection.Listener(family=family)
            child = self.Process(target=self._test,
                                 args=(listener.address,))
            child.daemon = True
            child.start()
            accepted = listener.accept()
            self.assertEqual(accepted.recv(), 'hello')
            child.join()
            listener.close()
Benjamin Petersondfd79492008-06-13 19:13:39 +00001445#
1446# Test of sending connection and socket objects between processes
1447#
Benjamin Petersonda3a1b12008-06-16 20:52:48 +00001448"""
Benjamin Petersondfd79492008-06-13 19:13:39 +00001449class _TestPicklingConnections(BaseTestCase):
1450
1451 ALLOWED_TYPES = ('processes',)
1452
1453 def _listener(self, conn, families):
1454 for fam in families:
1455 l = self.connection.Listener(family=fam)
1456 conn.send(l.address)
1457 new_conn = l.accept()
1458 conn.send(new_conn)
1459
1460 if self.TYPE == 'processes':
1461 l = socket.socket()
1462 l.bind(('localhost', 0))
1463 conn.send(l.getsockname())
1464 l.listen(1)
1465 new_conn, addr = l.accept()
1466 conn.send(new_conn)
1467
1468 conn.recv()
1469
1470 def _remote(self, conn):
1471 for (address, msg) in iter(conn.recv, None):
1472 client = self.connection.Client(address)
1473 client.send(msg.upper())
1474 client.close()
1475
1476 if self.TYPE == 'processes':
1477 address, msg = conn.recv()
1478 client = socket.socket()
1479 client.connect(address)
1480 client.sendall(msg.upper())
1481 client.close()
1482
1483 conn.close()
1484
1485 def test_pickling(self):
1486 try:
1487 multiprocessing.allow_connection_pickling()
1488 except ImportError:
1489 return
1490
1491 families = self.connection.families
1492
1493 lconn, lconn0 = self.Pipe()
1494 lp = self.Process(target=self._listener, args=(lconn0, families))
1495 lp.start()
1496 lconn0.close()
1497
1498 rconn, rconn0 = self.Pipe()
1499 rp = self.Process(target=self._remote, args=(rconn0,))
1500 rp.start()
1501 rconn0.close()
1502
1503 for fam in families:
1504 msg = ('This connection uses family %s' % fam).encode('ascii')
1505 address = lconn.recv()
1506 rconn.send((address, msg))
1507 new_conn = lconn.recv()
1508 self.assertEqual(new_conn.recv(), msg.upper())
1509
1510 rconn.send(None)
1511
1512 if self.TYPE == 'processes':
1513 msg = latin('This connection uses a normal socket')
1514 address = lconn.recv()
1515 rconn.send((address, msg))
1516 if hasattr(socket, 'fromfd'):
1517 new_conn = lconn.recv()
1518 self.assertEqual(new_conn.recv(100), msg.upper())
1519 else:
1520 # XXX On Windows with Py2.6 need to backport fromfd()
1521 discard = lconn.recv_bytes()
1522
1523 lconn.send(None)
1524
1525 rconn.close()
1526 lconn.close()
1527
1528 lp.join()
1529 rp.join()
Benjamin Petersonda3a1b12008-06-16 20:52:48 +00001530"""
Benjamin Petersondfd79492008-06-13 19:13:39 +00001531#
1532#
1533#
1534
class _TestHeap(BaseTestCase):
    # Allocates and frees many BufferWrapper blocks, then checks that the
    # heap's free and allocated segments line up without gaps.

    ALLOWED_TYPES = ('processes',)

    def test_heap(self):
        iterations = 5000
        maxblocks = 50
        blocks = []

        # create and destroy lots of blocks of different sizes
        for _ in xrange(iterations):
            size = int(random.lognormvariate(0, 1) * 1000)
            blocks.append(multiprocessing.heap.BufferWrapper(size))
            if len(blocks) > maxblocks:
                # Drop a randomly chosen block to keep the list bounded.
                del blocks[random.randrange(maxblocks)]

        # get the heap object
        heap = multiprocessing.heap.BufferWrapper._heap

        # verify the state of the heap: collect every free and every
        # allocated segment as (arena index, start, stop, length, state)
        segments = []
        for free_list in heap._len_to_seq.values():
            for arena, start, stop in free_list:
                segments.append((heap._arenas.index(arena), start, stop,
                                 stop-start, 'free'))
        for arena, start, stop in heap._allocated_blocks:
            segments.append((heap._arenas.index(arena), start, stop,
                             stop-start, 'occupied'))

        segments.sort()

        # Each segment must either end where the next one starts, or the
        # next one must start at offset 0 of a different arena.
        for current, following in zip(segments, segments[1:]):
            arena, start, stop = current[:3]
            narena, nstart = following[:2]
            self.assertTrue((arena != narena and nstart == 0) or
                            (stop == nstart))
1575
1576#
1577#
1578#
1579
Benjamin Petersondfd79492008-06-13 19:13:39 +00001580class _Foo(Structure):
1581 _fields_ = [
1582 ('x', c_int),
1583 ('y', c_double)
1584 ]
1585
class _TestSharedCTypes(BaseTestCase):
    # Has a child process double shared values, a structure, an array
    # and a string, then checks the results in the parent.

    ALLOWED_TYPES = ('processes',)

    def _double(self, x, y, foo, arr, string):
        # Target of the child: double every argument in place.
        x.value *= 2
        y.value *= 2
        foo.x *= 2
        foo.y *= 2
        string.value *= 2
        for index in range(len(arr)):
            arr[index] *= 2

    @unittest.skipIf(c_int is None, "requires _ctypes")
    def test_sharedctypes(self, lock=False):
        x = Value('i', 7, lock=lock)
        y = Value(c_double, 1.0/3.0, lock=lock)
        foo = Value(_Foo, 3, 2, lock=lock)
        arr = self.Array('d', range(10), lock=lock)
        string = self.Array('c', 20, lock=lock)
        string.value = 'hello'

        child = self.Process(target=self._double,
                             args=(x, y, foo, arr, string))
        child.start()
        child.join()

        # Every value must now be twice its initial value.
        self.assertEqual(x.value, 14)
        self.assertAlmostEqual(y.value, 2.0/3.0)
        self.assertEqual(foo.x, 6)
        self.assertAlmostEqual(foo.y, 4.0)
        for index in range(10):
            self.assertAlmostEqual(arr[index], index*2)
        self.assertEqual(string.value, latin('hellohello'))

    def test_synchronize(self):
        # Same checks, with lock=True.
        self.test_sharedctypes(lock=True)

    @unittest.skipIf(c_int is None, "requires _ctypes")
    def test_copy(self):
        original = _Foo(2, 5.0)
        duplicate = copy(original)
        # Changing the original must not affect the copy.
        original.x = 0
        original.y = 0
        self.assertEqual(duplicate.x, 2)
        self.assertAlmostEqual(duplicate.y, 5.0)
1631
1632#
1633#
1634#
1635
class _TestFinalize(BaseTestCase):
    # Registers util.Finalize callbacks in a child process and checks
    # the order in which their messages arrive in the parent.

    ALLOWED_TYPES = ('processes',)

    def _test_finalize(self, conn):
        # Target of the child.  Every callback sends its tag over conn.
        class Foo(object):
            pass

        send = conn.send

        a = Foo()
        util.Finalize(a, send, args=('a',))
        del a           # triggers callback for a

        b = Foo()
        close_b = util.Finalize(b, send, args=('b',))
        close_b()       # triggers callback for b
        close_b()       # does nothing because callback has already been called
        del b           # does nothing because callback has already been called

        # Registered without an exitpriority; 'c' is not among the tags
        # test_finalize() expects.
        c = Foo()
        util.Finalize(c, send, args=('c',))

        d10 = Foo()
        util.Finalize(d10, send, args=('d10',), exitpriority=1)

        # Three finalizers sharing exitpriority 0, registered in the order
        # d01, d02, d03.
        d01 = Foo()
        util.Finalize(d01, send, args=('d01',), exitpriority=0)
        d02 = Foo()
        util.Finalize(d02, send, args=('d02',), exitpriority=0)
        d03 = Foo()
        util.Finalize(d03, send, args=('d03',), exitpriority=0)

        util.Finalize(None, send, args=('e',), exitpriority=-10)

        util.Finalize(None, send, args=('STOP',), exitpriority=-100)

        # call multiprocessing's cleanup function then exit process without
        # garbage collecting locals
        util._exit_function()
        conn.close()
        os._exit(0)

    def test_finalize(self):
        conn, child_conn = self.Pipe()

        child = self.Process(target=self._test_finalize, args=(child_conn,))
        child.start()
        child.join()

        # Read tags until the 'STOP' marker.
        received = list(iter(conn.recv, 'STOP'))
        self.assertEqual(received,
                         ['a', 'b', 'd10', 'd03', 'd02', 'd01', 'e'])
1686
1687#
1688# Test that from ... import * works for each module
1689#
1690
class _TestImportStar(BaseTestCase):
    # Checks that every name listed in a module's __all__ exists as an
    # attribute of that module.

    ALLOWED_TYPES = ('processes',)

    def test_import(self):
        modules = [
            'multiprocessing',
            'multiprocessing.connection',
            'multiprocessing.heap',
            'multiprocessing.managers',
            'multiprocessing.pool',
            'multiprocessing.process',
            'multiprocessing.reduction',
            'multiprocessing.synchronize',
            'multiprocessing.util',
            ]

        if c_int is not None:
            # This module requires _ctypes
            modules.append('multiprocessing.sharedctypes')

        for name in modules:
            __import__(name)
            mod = sys.modules[name]

            # Modules without __all__ contribute no names to check.
            exported = getattr(mod, '__all__', ())
            for attr in exported:
                message = '%r does not have attribute %r' % (mod, attr)
                self.assertTrue(hasattr(mod, attr), message)
1717
1718#
1719# Quick test that logging works -- does not test logging output
1720#
1721
class _TestLogging(BaseTestCase):
    # Quick checks of multiprocessing's logger; logging output itself is
    # not inspected.

    ALLOWED_TYPES = ('processes',)

    def test_enable_logging(self):
        logger = multiprocessing.get_logger()
        logger.setLevel(util.SUBWARNING)
        self.assertTrue(logger is not None)
        logger.debug('this will not be printed')
        logger.info('nor will this')
        logger.setLevel(LOG_LEVEL)

    def _test_level(self, conn):
        # Target of the child: report the logger's effective level.
        logger = multiprocessing.get_logger()
        conn.send(logger.getEffectiveLevel())

    def test_level(self):
        LEVEL1 = 32
        LEVEL2 = 37

        logger = multiprocessing.get_logger()
        root_logger = logging.getLogger()
        root_level = root_logger.level

        reader, writer = multiprocessing.Pipe(duplex=False)

        # Restore both loggers even when an assertion fails, so that a
        # failure here does not change the log levels seen by later tests.
        try:
            # Level set directly on the multiprocessing logger.
            logger.setLevel(LEVEL1)
            child = self.Process(target=self._test_level, args=(writer,))
            child.start()
            reported = reader.recv()
            child.join()    # the child has sent its only message
            self.assertEqual(LEVEL1, reported)

            # With NOTSET on the multiprocessing logger, the child should
            # report the root logger's level.
            logger.setLevel(logging.NOTSET)
            root_logger.setLevel(LEVEL2)
            child = self.Process(target=self._test_level, args=(writer,))
            child.start()
            reported = reader.recv()
            child.join()
            self.assertEqual(LEVEL2, reported)
        finally:
            root_logger.setLevel(root_level)
            logger.setLevel(level=LOG_LEVEL)
1759
Jesse Noller814d02d2009-11-21 14:38:23 +00001760
Jesse Noller9a03f2f2009-11-24 14:17:29 +00001761# class _TestLoggingProcessName(BaseTestCase):
1762#
1763# def handle(self, record):
1764# assert record.processName == multiprocessing.current_process().name
1765# self.__handled = True
1766#
1767# def test_logging(self):
1768# handler = logging.Handler()
1769# handler.handle = self.handle
1770# self.__handled = False
1771# # Bypass getLogger() and side-effects
1772# logger = logging.getLoggerClass()(
1773# 'multiprocessing.test.TestLoggingProcessName')
1774# logger.addHandler(handler)
1775# logger.propagate = False
1776#
1777# logger.warn('foo')
1778# assert self.__handled
Jesse Noller814d02d2009-11-21 14:38:23 +00001779
Benjamin Petersondfd79492008-06-13 19:13:39 +00001780#
Jesse Noller9a5b2ad2009-01-19 15:12:22 +00001781# Test to verify handle verification, see issue 3321
1782#
1783
class TestInvalidHandle(unittest.TestCase):
    # Handle verification, see issue 3321.

    @unittest.skipIf(WIN32, "skipped on Windows")
    def test_invalid_handles(self):
        # A connection built on an arbitrary handle number fails on use...
        bogus = _multiprocessing.Connection(44977608)
        self.assertRaises(IOError, bogus.poll)
        # ...and a negative handle is rejected at construction.
        self.assertRaises(IOError, _multiprocessing.Connection, -1)
Florent Xicluna36b9fbb2010-03-24 19:33:25 +00001791
Jesse Noller9a5b2ad2009-01-19 15:12:22 +00001792#
Benjamin Petersondfd79492008-06-13 19:13:39 +00001793# Functions used to create test cases from the base ones in this module
1794#
1795
def get_attributes(Source, names):
    # Return a dict mapping each name in `names` to getattr(Source, name).
    # Plain Python functions are wrapped in staticmethod() so that they
    # are not turned into methods when the dict is merged into a class
    # body.  Raises AttributeError if Source lacks one of the names.
    function_type = type(get_attributes)
    attributes = {}
    for name in names:
        obj = getattr(Source, name)
        if isinstance(obj, function_type):
            obj = staticmethod(obj)
        attributes[name] = obj
    return attributes
1804
def create_test_cases(Mixin, type):
    # Build one concrete TestCase class for every module-level name that
    # starts with '_Test' and lists `type` in its ALLOWED_TYPES.  Returns
    # a dict mapping the generated class names to the classes.
    cases = {}
    namespace = globals()
    prefix = 'With' + type.capitalize()

    for name in namespace.keys():
        if not name.startswith('_Test'):
            continue
        base = namespace[name]
        if type not in base.ALLOWED_TYPES:
            continue

        class Temp(base, unittest.TestCase, Mixin):
            pass

        # '_TestFoo' becomes e.g. 'WithProcessesTestFoo'.
        newname = prefix + name[1:]
        cases[newname] = Temp
        Temp.__name__ = newname
        Temp.__module__ = Mixin.__module__
    return cases
1821
1822#
1823# Create test cases
1824#
1825
class ProcessesMixin(object):
    # Mixin for the 'processes' test cases: the listed names are copied
    # from the multiprocessing module into the class namespace.
    TYPE = 'processes'
    Process = multiprocessing.Process
    locals().update(get_attributes(multiprocessing, (
        'Queue', 'JoinableQueue', 'Pipe', 'connection',
        'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
        'Condition', 'Event',
        'Value', 'Array', 'RawValue', 'RawArray',
        'current_process', 'active_children',
        )))

# Generate the 'processes' test cases and publish them at module level.
testcases_processes = create_test_cases(ProcessesMixin, type='processes')
globals().update(testcases_processes)
1838
1839
class ManagerMixin(object):
    # Mixin for the 'manager' test cases: the listed names are copied
    # from a SyncManager instance that is created without calling
    # __init__.
    TYPE = 'manager'
    Process = multiprocessing.Process
    manager = object.__new__(multiprocessing.managers.SyncManager)
    locals().update(get_attributes(manager, (
        'Queue', 'JoinableQueue',
        'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
        'Condition', 'Event',
        'Value', 'Array',
        'list', 'dict', 'Namespace',
        )))

# Generate the 'manager' test cases and publish them at module level.
testcases_manager = create_test_cases(ManagerMixin, type='manager')
globals().update(testcases_manager)
1852
1853
class ThreadsMixin(object):
    # Mixin for the 'threads' test cases: the listed names are copied
    # from the multiprocessing.dummy module into the class namespace.
    TYPE = 'threads'
    Process = multiprocessing.dummy.Process
    locals().update(get_attributes(multiprocessing.dummy, (
        'Queue', 'JoinableQueue', 'Pipe', 'connection',
        'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
        'Condition', 'Event',
        'Value', 'Array',
        'current_process', 'active_children',
        'dict', 'list', 'Namespace',
        )))

# Generate the 'threads' test cases and publish them at module level.
testcases_threads = create_test_cases(ThreadsMixin, type='threads')
globals().update(testcases_threads)
1866
class OtherTest(unittest.TestCase):
    # TODO: add more tests for deliver/answer challenge.

    def test_deliver_challenge_auth_failure(self):
        # The fake peer answers every challenge with a bogus reply.
        class _FakeConnection(object):
            def recv_bytes(self, size):
                return b'something bogus'

            def send_bytes(self, data):
                pass

        self.assertRaises(multiprocessing.AuthenticationError,
                          multiprocessing.connection.deliver_challenge,
                          _FakeConnection(), b'abc')

    def test_answer_challenge_auth_failure(self):
        # The fake peer first sends the challenge marker, then a bogus
        # reply, and empty messages after that.
        class _FakeConnection(object):
            def __init__(self):
                self.count = 0

            def recv_bytes(self, size):
                self.count += 1
                if self.count == 1:
                    return multiprocessing.connection.CHALLENGE
                if self.count == 2:
                    return b'something bogus'
                return b''

            def send_bytes(self, data):
                pass

        self.assertRaises(multiprocessing.AuthenticationError,
                          multiprocessing.connection.answer_challenge,
                          _FakeConnection(), b'abc')
1895
Jesse Noller7152f6d2009-04-02 05:17:26 +00001896#
1897# Test Manager.start()/Pool.__init__() initializer feature - see issue 5585
1898#
1899
def initializer(ns):
    # Increment the `test` attribute of `ns` by one.
    ns.test += 1
1902
class TestInitializers(unittest.TestCase):
    # Manager.start() / Pool.__init__() initializer feature, see
    # issue 5585.

    def setUp(self):
        # A shared namespace whose `test` counter initializer() increments.
        self.mgr = multiprocessing.Manager()
        self.ns = self.mgr.Namespace()
        self.ns.test = 0

    def tearDown(self):
        self.mgr.shutdown()

    def test_manager_initializer(self):
        manager = multiprocessing.managers.SyncManager()
        # An initializer that is not callable must be rejected.
        self.assertRaises(TypeError, manager.start, 1)
        manager.start(initializer, (self.ns,))
        self.assertEqual(self.ns.test, 1)
        manager.shutdown()

    def test_pool_initializer(self):
        # An initializer that is not callable must be rejected.
        self.assertRaises(TypeError, multiprocessing.Pool, initializer=1)
        pool = multiprocessing.Pool(1, initializer, (self.ns,))
        pool.close()
        pool.join()
        self.assertEqual(self.ns.test, 1)
1925
Jesse Noller1b90efb2009-06-30 17:11:52 +00001926#
1927# Issue 5155, 5313, 5331: Test process in processes
1928# Verifies os.close(sys.stdin.fileno) vs. sys.stdin.close() behavior
1929#
1930
def _ThisSubProcess(q):
    # Try a non-blocking get on `q`; an empty queue is not an error and
    # the fetched item, if any, is discarded.
    try:
        q.get(block=False)
    except Queue.Empty:
        pass
1936
def _TestProcess(q):
    # Start a child process that polls a fresh queue, and wait for it.
    # The `q` argument itself is not used.
    inner_queue = multiprocessing.Queue()
    child = multiprocessing.Process(target=_ThisSubProcess,
                                    args=(inner_queue,))
    child.start()
    child.join()
1942
1943def _afunc(x):
1944 return x*x
1945
def pool_in_process():
    # Create a four-worker pool, map _afunc over a short list, and shut
    # the pool down again.  The result of the map is not used.
    pool = multiprocessing.Pool(processes=4)
    try:
        pool.map(_afunc, [1, 2, 3, 4, 5, 6, 7])
    finally:
        # Close and join even if map() raises, so the worker processes
        # are not left running.
        pool.close()
        pool.join()
1949
class _file_like(object):
    """Write buffer in front of *delegate* that keeps a separate cache per pid.

    ``write`` only appends to the cache; ``flush`` joins the cached pieces,
    passes them to ``delegate.write`` in a single call and empties the cache.
    """

    def __init__(self, delegate):
        self._delegate = delegate
        # pid that owns ``_cache``; None until the cache is first accessed.
        self._pid = None

    @property
    def cache(self):
        current_pid = os.getpid()
        # There are no race conditions since fork keeps only the running thread
        if self._pid == current_pid:
            return self._cache
        # First access in this process: start with an empty buffer.
        self._pid = current_pid
        self._cache = []
        return self._cache

    def write(self, data):
        buffered = self.cache
        buffered.append(data)

    def flush(self):
        pending = ''.join(self.cache)
        self._delegate.write(pending)
        self._cache = []
1970
class TestStdinBadfiledescriptor(unittest.TestCase):
    """Run queues, pools and buffered writes in and around child processes.

    The child-process tests check the child's exit code, because an
    exception raised in the child is otherwise invisible to the parent.
    """

    def test_queue_in_process(self):
        queue = multiprocessing.Queue()
        proc = multiprocessing.Process(target=_TestProcess, args=(queue,))
        proc.start()
        proc.join()
        # A failure inside the child only surfaces through its exit code.
        self.assertEqual(proc.exitcode, 0)

    def test_pool_in_process(self):
        p = multiprocessing.Process(target=pool_in_process)
        p.start()
        p.join()
        # A failure inside the child only surfaces through its exit code.
        self.assertEqual(p.exitcode, 0)

    def test_flushing(self):
        sio = StringIO()
        flike = _file_like(sio)
        flike.write('foo')
        # NOTE(review): this Process is constructed but never started, so the
        # child-side flush is not exercised -- confirm whether that is intended.
        proc = multiprocessing.Process(target=lambda: flike.flush())
        flike.flush()
        # assertEqual instead of a bare assert: it survives ``python -O`` and
        # reports both values on failure.
        self.assertEqual(sio.getvalue(), 'foo')
1991
# Test case classes that test_main() appends after the sorted classes from
# testcases_processes, testcases_threads and testcases_manager.
testcases_other = [
    OtherTest,
    TestInvalidHandle,
    TestInitializers,
    TestStdinBadfiledescriptor,
]
Neal Norwitz0c519b32008-08-25 01:50:24 +00001994
Benjamin Petersondfd79492008-06-13 19:13:39 +00001995#
1996#
1997#
1998
def test_main(run=None):
    """Assemble the full multiprocessing test suite and run it.

    run -- callable that is given the assembled unittest.TestSuite.  When
           None, test.test_support.run_unittest is used.

    Raises unittest.SkipTest on Linux if creating an RLock fails with
    OSError (see issue 3111).

    The pools and the manager shared by the mixin classes are created before
    the run and are always torn down afterwards, even if ``run`` raises.
    """
    if sys.platform.startswith("linux"):
        try:
            # Probe only: the lock object itself is not needed.
            multiprocessing.RLock()
        except OSError:
            raise unittest.SkipTest("OSError raises on RLock creation, see issue 3111!")

    if run is None:
        from test.test_support import run_unittest as run

    util.get_temp_dir()     # creates temp directory for use by all processes

    multiprocessing.get_logger().setLevel(LOG_LEVEL)

    ProcessesMixin.pool = multiprocessing.Pool(4)
    ThreadsMixin.pool = multiprocessing.dummy.Pool(4)
    ManagerMixin.manager.__init__()
    ManagerMixin.manager.start()
    ManagerMixin.pool = ManagerMixin.manager.Pool(4)

    try:
        testcases = (
            sorted(testcases_processes.values(), key=lambda tc:tc.__name__) +
            sorted(testcases_threads.values(), key=lambda tc:tc.__name__) +
            sorted(testcases_manager.values(), key=lambda tc:tc.__name__) +
            testcases_other
            )

        loadTestsFromTestCase = unittest.defaultTestLoader.loadTestsFromTestCase
        suite = unittest.TestSuite(loadTestsFromTestCase(tc) for tc in testcases)
        with test_support.check_py3k_warnings(
                (".+__(get|set)slice__ has been removed", DeprecationWarning)):
            run(suite)
    finally:
        # Always release the shared workers and the manager process so a
        # failing run does not leave them behind.
        ThreadsMixin.pool.terminate()
        ProcessesMixin.pool.terminate()
        ManagerMixin.pool.terminate()
        ManagerMixin.manager.shutdown()

        del ProcessesMixin.pool, ThreadsMixin.pool, ManagerMixin.pool
Benjamin Petersondfd79492008-06-13 19:13:39 +00002038
def main():
    """Run the whole suite through a verbose (verbosity=2) TextTestRunner."""
    runner = unittest.TextTestRunner(verbosity=2)
    test_main(runner.run)


# Script entry point.
if __name__ == '__main__':
    main()