blob: c1ab3960293d9c86ddb171a3249fdd426f4334da [file] [log] [blame]
Jesse Noller76cf55f2008-07-02 16:56:51 +00001#!/usr/bin/env python
2
Benjamin Petersondfd79492008-06-13 19:13:39 +00003#
4# Unit tests for the multiprocessing package
5#
6
7import unittest
8import threading
9import Queue
10import time
11import sys
12import os
13import gc
14import signal
15import array
16import copy
17import socket
18import random
19import logging
Mark Dickinsonc4920e82009-11-20 19:30:22 +000020from test import test_support
Jesse Noller1b90efb2009-06-30 17:11:52 +000021from StringIO import StringIO
Benjamin Petersondfd79492008-06-13 19:13:39 +000022
Jesse Noller37040cd2008-09-30 00:15:45 +000023
R. David Murray3db8a342009-03-30 23:05:48 +000024_multiprocessing = test_support.import_module('_multiprocessing')
25
Jesse Noller37040cd2008-09-30 00:15:45 +000026# Work around broken sem_open implementations
R. David Murray3db8a342009-03-30 23:05:48 +000027test_support.import_module('multiprocessing.synchronize')
Jesse Noller37040cd2008-09-30 00:15:45 +000028
Benjamin Petersondfd79492008-06-13 19:13:39 +000029import multiprocessing.dummy
30import multiprocessing.connection
31import multiprocessing.managers
32import multiprocessing.heap
Benjamin Petersondfd79492008-06-13 19:13:39 +000033import multiprocessing.pool
Benjamin Petersondfd79492008-06-13 19:13:39 +000034
35from multiprocessing import util
36
37#
38#
39#
40
Benjamin Petersone79edf52008-07-13 18:34:58 +000041latin = str
Benjamin Petersondfd79492008-06-13 19:13:39 +000042
Benjamin Petersondfd79492008-06-13 19:13:39 +000043#
44# Constants
45#
46
LOG_LEVEL = util.SUBWARNING
#LOG_LEVEL = logging.DEBUG

# Short pause, in seconds.
DELTA = 0.1

# Making CHECK_TIMINGS true makes the tests take a lot longer and can
# sometimes cause some non-serious failures, because some calls block a
# bit longer than expected.
CHECK_TIMINGS = False

# Distinct timeouts are only used when timings are actually verified.
TIMEOUT1, TIMEOUT2, TIMEOUT3 = (
    (0.82, 0.35, 1.4) if CHECK_TIMINGS else (0.1, 0.1, 0.1)
)

# True unless the extension module flags sem_getvalue() as broken.
_BROKEN_GETVALUE = getattr(_multiprocessing, 'HAVE_BROKEN_SEM_GETVALUE', False)
HAVE_GETVALUE = not _BROKEN_GETVALUE

WIN32 = sys.platform == "win32"
64
Benjamin Petersondfd79492008-06-13 19:13:39 +000065#
66# Creates a wrapper for a function which records the time it takes to finish
67#
68
class TimingWrapper(object):
    """Callable proxy that records how long each call to ``func`` takes.

    ``elapsed`` is ``None`` until the wrapper has been called once.  After
    every call -- whether it returned or raised -- it holds the duration of
    that call in seconds, measured with ``time.time()``.
    """

    def __init__(self, func):
        self.elapsed = None
        self.func = func

    def __call__(self, *args, **kwds):
        started = time.time()
        try:
            outcome = self.func(*args, **kwds)
        finally:
            # Executed on both the normal and the exceptional path.
            self.elapsed = time.time() - started
        return outcome
81
82#
83# Base class for test cases
84#
85
class BaseTestCase(object):
    """Helper mixin shared by the test-case classes in this file.

    The helpers call ``self.assertEqual`` / ``self.assertAlmostEqual``, so
    the concrete class must also provide those (presumably by mixing in
    ``unittest.TestCase`` -- the combination is not visible here).
    """

    ALLOWED_TYPES = ('processes', 'manager', 'threads')

    def assertTimingAlmostEqual(self, a, b):
        # Timings are only compared (to one decimal place) when the
        # module-level CHECK_TIMINGS switch is on; otherwise this is a no-op.
        if not CHECK_TIMINGS:
            return
        self.assertAlmostEqual(a, b, 1)

    def assertReturnsIfImplemented(self, value, func, *args):
        # Call func(*args) and compare its result with ``value``.  A
        # NotImplementedError raised by func is tolerated silently.
        try:
            result = func(*args)
        except NotImplementedError:
            return None
        return self.assertEqual(value, result)
101
102#
103# Return the value of a semaphore
104#
105
def get_value(self):
    """Return the current counter value of the semaphore-like object ``self``.

    Tries, in order: the ``get_value()`` method, the ``_Semaphore__value``
    attribute and the ``_value`` attribute.  An AttributeError at any step
    moves on to the next one.

    Raises NotImplementedError when none of the three is available.
    """
    try:
        return self.get_value()
    except AttributeError:
        pass
    for attr_name in ('_Semaphore__value', '_value'):
        try:
            return getattr(self, attr_name)
        except AttributeError:
            continue
    raise NotImplementedError
117
118#
119# Testcases
120#
121
class _TestProcess(BaseTestCase):
    """Tests for the basic process API: identity of the current process,
    start/join life cycle, terminate(), cpu_count(), active_children() and
    processes that start further processes.

    The concrete test class must supply ``TYPE``, ``Process``, ``Queue``,
    ``Pipe``, ``current_process`` and ``active_children``.
    """

    ALLOWED_TYPES = ('processes', 'threads')

    def test_current(self):
        if self.TYPE == 'threads':
            return

        current = self.current_process()
        authkey = current.authkey

        self.assertTrue(current.is_alive())
        self.assertTrue(not current.daemon)
        self.assertIsInstance(authkey, bytes)
        self.assertTrue(len(authkey) > 0)
        self.assertEqual(current.ident, os.getpid())
        self.assertEqual(current.exitcode, None)

    def _test(self, q, *args, **kwds):
        # Child side of test_process: report what the child received and
        # how it sees itself back to the parent through the queue.
        current = self.current_process()
        q.put(args)
        q.put(kwds)
        q.put(current.name)
        if self.TYPE != 'threads':
            q.put(bytes(current.authkey))
            q.put(current.pid)

    def test_process(self):
        q = self.Queue(1)
        args = (q, 1, 2)
        kwargs = {'hello':23, 'bye':2.54}
        name = 'SomeProcess'
        p = self.Process(
            target=self._test, args=args, kwargs=kwargs, name=name
            )
        p.daemon = True
        current = self.current_process()

        # State before start().
        if self.TYPE != 'threads':
            self.assertEqual(p.authkey, current.authkey)
        self.assertEqual(p.is_alive(), False)
        self.assertEqual(p.daemon, True)
        self.assertNotIn(p, self.active_children())
        self.assertIs(type(self.active_children()), list)
        self.assertEqual(p.exitcode, None)

        p.start()

        # State while running.
        self.assertEqual(p.exitcode, None)
        self.assertEqual(p.is_alive(), True)
        self.assertIn(p, self.active_children())

        # The child echoes its arguments (minus the queue itself), then
        # its name and, for real processes, its authkey and pid.
        self.assertEqual(q.get(), args[1:])
        self.assertEqual(q.get(), kwargs)
        self.assertEqual(q.get(), p.name)
        if self.TYPE != 'threads':
            self.assertEqual(q.get(), current.authkey)
            self.assertEqual(q.get(), p.pid)

        p.join()

        # State after join().
        self.assertEqual(p.exitcode, 0)
        self.assertEqual(p.is_alive(), False)
        self.assertNotIn(p, self.active_children())

    def _test_terminate(self):
        # Sleep far longer than the test runs; the parent terminates us.
        time.sleep(1000)

    def test_terminate(self):
        if self.TYPE == 'threads':
            return

        p = self.Process(target=self._test_terminate)
        p.daemon = True
        p.start()

        self.assertEqual(p.is_alive(), True)
        self.assertIn(p, self.active_children())
        self.assertEqual(p.exitcode, None)

        p.terminate()

        join = TimingWrapper(p.join)
        self.assertEqual(join(), None)
        self.assertTimingAlmostEqual(join.elapsed, 0.0)

        self.assertEqual(p.is_alive(), False)
        self.assertNotIn(p, self.active_children())

        p.join()

        # XXX sometimes get p.exitcode == 0 on Windows ...
        #self.assertEqual(p.exitcode, -signal.SIGTERM)

    def test_cpu_count(self):
        try:
            cpus = multiprocessing.cpu_count()
        except NotImplementedError:
            cpus = 1
        self.assertIs(type(cpus), int)
        self.assertTrue(cpus >= 1)

    def test_active_children(self):
        self.assertEqual(type(self.active_children()), list)

        p = self.Process(target=time.sleep, args=(DELTA,))
        self.assertNotIn(p, self.active_children())

        p.start()
        self.assertIn(p, self.active_children())

        p.join()
        self.assertNotIn(p, self.active_children())

    def _test_recursion(self, wconn, id):
        # Send our own id, then (down to depth 2) run two children one
        # after the other; each child is joined before the next starts.
        wconn.send(id)
        if len(id) < 2:
            for i in range(2):
                p = self.Process(
                    target=self._test_recursion, args=(wconn, id+[i])
                    )
                p.start()
                p.join()

    def test_recursion(self):
        rconn, wconn = self.Pipe(duplex=False)
        self._test_recursion(wconn, [])

        time.sleep(DELTA)
        result = []
        while rconn.poll():
            result.append(rconn.recv())

        # Depth-first order, because every child is joined before its
        # sibling is started.
        expected = [
            [],
              [0],
                [0, 0],
                [0, 1],
              [1],
                [1, 0],
                [1, 1]
            ]
        self.assertEqual(result, expected)
267
268#
269#
270#
271
class _UpperCaser(multiprocessing.Process):
    """Process subclass that upper-cases strings sent to it over a pipe.

    The parent talks through ``parent_conn`` and the child answers through
    ``child_conn``.  Sending ``None`` tells the child loop to stop.
    """

    def __init__(self):
        multiprocessing.Process.__init__(self)
        ends = multiprocessing.Pipe()
        self.child_conn, self.parent_conn = ends

    def run(self):
        # Runs in the child: the parent's end is not needed here.
        self.parent_conn.close()
        conn = self.child_conn
        for text in iter(conn.recv, None):
            conn.send(text.upper())
        conn.close()

    def submit(self, s):
        # Round trip: send one string, block until the reply arrives.
        assert type(s) is str
        conn = self.parent_conn
        conn.send(s)
        return conn.recv()

    def stop(self):
        # None is the sentinel that ends the loop in run().
        self.parent_conn.send(None)
        for conn in (self.parent_conn, self.child_conn):
            conn.close()
293
class _TestSubclassingProcess(BaseTestCase):
    """Check that a user-defined Process subclass can be started, used and
    shut down."""

    ALLOWED_TYPES = ('processes',)

    def test_subclassing(self):
        worker = _UpperCaser()
        worker.start()
        for sent, expected in (('hello', 'HELLO'), ('world', 'WORLD')):
            self.assertEqual(worker.submit(sent), expected)
        worker.stop()
        worker.join()
305
306#
307#
308#
309
def queue_empty(q):
    """Return whether ``q`` is empty.

    Uses ``q.empty()`` when the queue has that method and falls back to
    comparing ``q.qsize()`` with zero otherwise.
    """
    if not hasattr(q, 'empty'):
        return q.qsize() == 0
    return q.empty()
315
def queue_full(q, maxsize):
    """Return whether ``q`` is full.

    Uses ``q.full()`` when the queue has that method; otherwise the queue
    counts as full when ``q.qsize()`` equals ``maxsize``.
    """
    if not hasattr(q, 'full'):
        return q.qsize() == maxsize
    return q.full()
321
322
class _TestQueue(BaseTestCase):
    """Tests for queue put()/get() in their blocking, non-blocking and
    timed forms, for queue use after a new process is started, for
    qsize() and for JoinableQueue.task_done()."""

    def _test_put(self, queue, child_can_start, parent_can_continue):
        # Child side of test_put: once released, take six items off.
        child_can_start.wait()
        for _ in range(6):
            queue.get()
        parent_can_continue.set()

    def test_put(self):
        MAXSIZE = 6
        queue = self.Queue(maxsize=MAXSIZE)
        child_can_start = self.Event()
        parent_can_continue = self.Event()

        consumer = self.Process(
            target=self._test_put,
            args=(queue, child_can_start, parent_can_continue)
            )
        consumer.daemon = True
        consumer.start()

        self.assertEqual(queue_empty(queue), True)
        self.assertEqual(queue_full(queue, MAXSIZE), False)

        # Fill the queue, using each accepted calling convention once.
        queue.put(1)
        queue.put(2, True)
        queue.put(3, True, None)
        queue.put(4, False)
        queue.put(5, False, None)
        queue.put_nowait(6)

        # the values may be in buffer but not yet in pipe so sleep a bit
        time.sleep(DELTA)

        self.assertEqual(queue_empty(queue), False)
        self.assertEqual(queue_full(queue, MAXSIZE), True)

        put = TimingWrapper(queue.put)
        put_nowait = TimingWrapper(queue.put_nowait)

        # Non-blocking puts on a full queue fail at once.
        self.assertRaises(Queue.Full, put, 7, False)
        self.assertTimingAlmostEqual(put.elapsed, 0)

        self.assertRaises(Queue.Full, put, 7, False, None)
        self.assertTimingAlmostEqual(put.elapsed, 0)

        self.assertRaises(Queue.Full, put_nowait, 7)
        self.assertTimingAlmostEqual(put_nowait.elapsed, 0)

        # Blocking puts fail after the timeout; with block=False the
        # timeout is expected to be ignored.
        self.assertRaises(Queue.Full, put, 7, True, TIMEOUT1)
        self.assertTimingAlmostEqual(put.elapsed, TIMEOUT1)

        self.assertRaises(Queue.Full, put, 7, False, TIMEOUT2)
        self.assertTimingAlmostEqual(put.elapsed, 0)

        self.assertRaises(Queue.Full, put, 7, True, timeout=TIMEOUT3)
        self.assertTimingAlmostEqual(put.elapsed, TIMEOUT3)

        # Let the child drain the queue and wait until it has finished.
        child_can_start.set()
        parent_can_continue.wait()

        self.assertEqual(queue_empty(queue), True)
        self.assertEqual(queue_full(queue, MAXSIZE), False)

        consumer.join()

    def _test_get(self, queue, child_can_start, parent_can_continue):
        # Child side of test_get: once released, queue the items 2..5.
        child_can_start.wait()
        #queue.put(1)
        for item in (2, 3, 4, 5):
            queue.put(item)
        parent_can_continue.set()

    def test_get(self):
        queue = self.Queue()
        child_can_start = self.Event()
        parent_can_continue = self.Event()

        producer = self.Process(
            target=self._test_get,
            args=(queue, child_can_start, parent_can_continue)
            )
        producer.daemon = True
        producer.start()

        self.assertEqual(queue_empty(queue), True)

        child_can_start.set()
        parent_can_continue.wait()

        time.sleep(DELTA)
        self.assertEqual(queue_empty(queue), False)

        # Hangs unexpectedly, remove for now
        #self.assertEqual(queue.get(), 1)
        # Take the items off, using each accepted calling convention once.
        self.assertEqual(queue.get(True, None), 2)
        self.assertEqual(queue.get(True), 3)
        self.assertEqual(queue.get(timeout=1), 4)
        self.assertEqual(queue.get_nowait(), 5)

        self.assertEqual(queue_empty(queue), True)

        get = TimingWrapper(queue.get)
        get_nowait = TimingWrapper(queue.get_nowait)

        # Non-blocking gets on an empty queue fail at once.
        self.assertRaises(Queue.Empty, get, False)
        self.assertTimingAlmostEqual(get.elapsed, 0)

        self.assertRaises(Queue.Empty, get, False, None)
        self.assertTimingAlmostEqual(get.elapsed, 0)

        self.assertRaises(Queue.Empty, get_nowait)
        self.assertTimingAlmostEqual(get_nowait.elapsed, 0)

        # Blocking gets fail after the timeout; with block=False the
        # timeout is expected to be ignored.
        self.assertRaises(Queue.Empty, get, True, TIMEOUT1)
        self.assertTimingAlmostEqual(get.elapsed, TIMEOUT1)

        self.assertRaises(Queue.Empty, get, False, TIMEOUT2)
        self.assertTimingAlmostEqual(get.elapsed, 0)

        self.assertRaises(Queue.Empty, get, timeout=TIMEOUT3)
        self.assertTimingAlmostEqual(get.elapsed, TIMEOUT3)

        producer.join()

    def _test_fork(self, queue):
        for item in range(10, 20):
            queue.put(item)
        # note that at this point the items may only be buffered, so the
        # process cannot shutdown until the feeder thread has finished
        # pushing items onto the pipe.

    def test_fork(self):
        # Old versions of Queue would fail to create a new feeder
        # thread for a forked process if the original process had its
        # own feeder thread.  This test checks that this no longer
        # happens.

        queue = self.Queue()

        # put items on queue so that main process starts a feeder thread
        for item in range(10):
            queue.put(item)

        # wait to make sure thread starts before we fork a new process
        time.sleep(DELTA)

        # fork process
        child = self.Process(target=self._test_fork, args=(queue,))
        child.start()

        # check that all expected items are in the queue
        for expected in range(20):
            self.assertEqual(queue.get(), expected)
        self.assertRaises(Queue.Empty, queue.get, False)

        child.join()

    def test_qsize(self):
        q = self.Queue()
        try:
            self.assertEqual(q.qsize(), 0)
        except NotImplementedError:
            # qsize() is unavailable here; nothing further to check.
            return
        q.put(1)
        self.assertEqual(q.qsize(), 1)
        q.put(5)
        self.assertEqual(q.qsize(), 2)
        q.get()
        self.assertEqual(q.qsize(), 1)
        q.get()
        self.assertEqual(q.qsize(), 0)

    def _test_task_done(self, q):
        # Worker: acknowledge every item until the None sentinel arrives.
        for _ in iter(q.get, None):
            time.sleep(DELTA)
            q.task_done()

    def test_task_done(self):
        queue = self.JoinableQueue()

        if sys.version_info < (2, 5) and not hasattr(queue, 'task_done'):
            return

        workers = [self.Process(target=self._test_task_done, args=(queue,))
                   for _ in xrange(4)]

        for worker in workers:
            worker.start()

        for item in xrange(10):
            queue.put(item)

        # Returns once every item put so far has been acknowledged.
        queue.join()

        # One sentinel per worker so that each of them stops.
        for _ in workers:
            queue.put(None)

        for worker in workers:
            worker.join()
526
527#
528#
529#
530
class _TestLock(BaseTestCase):
    """Tests for Lock and RLock: acquire/release results, the error on
    releasing once too often, and use as a context manager."""

    def test_lock(self):
        lock = self.Lock()
        self.assertEqual(lock.acquire(), True)
        # A second, non-blocking acquire of a held Lock must fail.
        self.assertEqual(lock.acquire(False), False)
        self.assertEqual(lock.release(), None)
        self.assertRaises((ValueError, threading.ThreadError), lock.release)

    def test_rlock(self):
        lock = self.RLock()
        depth = 3
        # An RLock can be re-acquired by its holder ...
        for _ in range(depth):
            self.assertEqual(lock.acquire(), True)
        # ... and needs one release per acquire.
        for _ in range(depth):
            self.assertEqual(lock.release(), None)
        self.assertRaises((AssertionError, RuntimeError), lock.release)

    def test_lock_context(self):
        with self.Lock():
            pass
553
Benjamin Petersondfd79492008-06-13 19:13:39 +0000554
class _TestSemaphore(BaseTestCase):
    """Tests for Semaphore and BoundedSemaphore counting, and for
    acquire() timeouts."""

    def _test_semaphore(self, sem):
        # Shared scenario for a semaphore created with value 2: take it
        # down to zero, fail a non-blocking acquire, release back to 2.
        # The counter is only checked where get_value() is implemented.
        check_value = self.assertReturnsIfImplemented
        check_value(2, get_value, sem)
        self.assertEqual(sem.acquire(), True)
        check_value(1, get_value, sem)
        self.assertEqual(sem.acquire(), True)
        check_value(0, get_value, sem)
        self.assertEqual(sem.acquire(False), False)
        check_value(0, get_value, sem)
        self.assertEqual(sem.release(), None)
        check_value(1, get_value, sem)
        self.assertEqual(sem.release(), None)
        check_value(2, get_value, sem)

    def test_semaphore(self):
        sem = self.Semaphore(2)
        self._test_semaphore(sem)
        # An unbounded semaphore may be released past its initial value.
        for expected in (3, 4):
            self.assertEqual(sem.release(), None)
            self.assertReturnsIfImplemented(expected, get_value, sem)

    def test_bounded_semaphore(self):
        sem = self.BoundedSemaphore(2)
        self._test_semaphore(sem)
        # Currently fails on OS/X
        #if HAVE_GETVALUE:
        #    self.assertRaises(ValueError, sem.release)
        #    self.assertReturnsIfImplemented(2, get_value, sem)

    def test_timeout(self):
        if self.TYPE != 'processes':
            return

        sem = self.Semaphore(0)
        acquire = TimingWrapper(sem.acquire)

        # Non-blocking attempts return at once, whatever timeout is given.
        self.assertEqual(acquire(False), False)
        self.assertTimingAlmostEqual(acquire.elapsed, 0.0)

        self.assertEqual(acquire(False, None), False)
        self.assertTimingAlmostEqual(acquire.elapsed, 0.0)

        self.assertEqual(acquire(False, TIMEOUT1), False)
        self.assertTimingAlmostEqual(acquire.elapsed, 0)

        # Blocking attempts give up after the timeout.
        self.assertEqual(acquire(True, TIMEOUT2), False)
        self.assertTimingAlmostEqual(acquire.elapsed, TIMEOUT2)

        self.assertEqual(acquire(timeout=TIMEOUT3), False)
        self.assertTimingAlmostEqual(acquire.elapsed, TIMEOUT3)
607
608
class _TestCondition(BaseTestCase):
    """Tests for Condition: notify(), notify_all() and wait() timeouts.

    Waiters run ``f``, which releases the ``sleeping`` semaphore just
    before waiting and the ``woken`` semaphore just after, so the test
    can count how many waiters are in each state.
    """

    def f(self, cond, sleeping, woken, timeout=None):
        # Waiter body: signal "about to sleep", wait on the condition
        # (optionally with a timeout), then signal "woken".
        cond.acquire()
        sleeping.release()
        cond.wait(timeout)
        woken.release()
        cond.release()

    def check_invariant(self, cond):
        # this is only supposed to succeed when there are no sleepers
        if self.TYPE == 'processes':
            try:
                sleepers = (cond._sleeping_count.get_value() -
                            cond._woken_count.get_value())
                self.assertEqual(sleepers, 0)
                self.assertEqual(cond._wait_semaphore.get_value(), 0)
            except NotImplementedError:
                pass

    def test_notify(self):
        cond = self.Condition()
        sleeping = self.Semaphore(0)
        woken = self.Semaphore(0)

        # Keep separate handles for the two waiters so that both can be
        # joined at the end.
        proc = self.Process(target=self.f, args=(cond, sleeping, woken))
        proc.daemon = True
        proc.start()

        thread = threading.Thread(target=self.f, args=(cond, sleeping, woken))
        thread.daemon = True
        thread.start()

        # wait for both children to start sleeping
        sleeping.acquire()
        sleeping.acquire()

        # check no process/thread has woken up
        time.sleep(DELTA)
        self.assertReturnsIfImplemented(0, get_value, woken)

        # wake up one process/thread
        cond.acquire()
        cond.notify()
        cond.release()

        # check one process/thread has woken up
        time.sleep(DELTA)
        self.assertReturnsIfImplemented(1, get_value, woken)

        # wake up another
        cond.acquire()
        cond.notify()
        cond.release()

        # check other has woken up
        time.sleep(DELTA)
        self.assertReturnsIfImplemented(2, get_value, woken)

        # check state is not mucked up
        self.check_invariant(cond)

        # Both waiters have been notified by now; reap both of them.
        thread.join()
        proc.join()

    def test_notify_all(self):
        cond = self.Condition()
        sleeping = self.Semaphore(0)
        woken = self.Semaphore(0)

        # start some threads/processes which will timeout
        for i in range(3):
            p = self.Process(target=self.f,
                             args=(cond, sleeping, woken, TIMEOUT1))
            p.daemon = True
            p.start()

            t = threading.Thread(target=self.f,
                                 args=(cond, sleeping, woken, TIMEOUT1))
            t.daemon = True
            t.start()

        # wait for them all to sleep
        for i in xrange(6):
            sleeping.acquire()

        # check they have all timed out
        for i in xrange(6):
            woken.acquire()
        self.assertReturnsIfImplemented(0, get_value, woken)

        # check state is not mucked up
        self.check_invariant(cond)

        # start some more threads/processes
        for i in range(3):
            p = self.Process(target=self.f, args=(cond, sleeping, woken))
            p.daemon = True
            p.start()

            t = threading.Thread(target=self.f, args=(cond, sleeping, woken))
            t.daemon = True
            t.start()

        # wait for them to all sleep
        for i in xrange(6):
            sleeping.acquire()

        # check no process/thread has woken up
        time.sleep(DELTA)
        self.assertReturnsIfImplemented(0, get_value, woken)

        # wake them all up
        cond.acquire()
        cond.notify_all()
        cond.release()

        # check they have all woken
        time.sleep(DELTA)
        self.assertReturnsIfImplemented(6, get_value, woken)

        # check state is not mucked up
        self.check_invariant(cond)

    def test_timeout(self):
        # With nobody notifying, wait(TIMEOUT1) must return None after
        # roughly TIMEOUT1 seconds.
        cond = self.Condition()
        wait = TimingWrapper(cond.wait)
        cond.acquire()
        res = wait(TIMEOUT1)
        cond.release()
        self.assertEqual(res, None)
        self.assertTimingAlmostEqual(wait.elapsed, TIMEOUT1)
739
740
class _TestEvent(BaseTestCase):
    """Tests for Event: is_set(), wait() with and without a timeout,
    clear(), and being set from another process."""

    def _test_event(self, event):
        # Child side of test_event: set the event after a short delay.
        time.sleep(TIMEOUT2)
        event.set()

    def test_event(self):
        event = self.Event()
        wait = TimingWrapper(event.wait)

        self.assertEqual(event.is_set(), False)

        # wait() is expected to report the state of the flag (not None),
        # so waiting on an unset event returns False once it times out.
        self.assertEqual(wait(0.0), False)
        self.assertTimingAlmostEqual(wait.elapsed, 0.0)
        self.assertEqual(wait(TIMEOUT1), False)
        self.assertTimingAlmostEqual(wait.elapsed, TIMEOUT1)

        event.set()

        # Once the event is set, wait() returns True without blocking.
        self.assertEqual(event.is_set(), True)
        self.assertEqual(wait(), True)
        self.assertTimingAlmostEqual(wait.elapsed, 0.0)
        self.assertEqual(wait(TIMEOUT1), True)
        self.assertTimingAlmostEqual(wait.elapsed, 0.0)
        # self.assertEqual(event.is_set(), True)

        event.clear()

        #self.assertEqual(event.is_set(), False)

        # Keep a handle on the child so it can be joined afterwards
        # instead of being left running unattended.
        p = self.Process(target=self._test_event, args=(event,))
        p.daemon = True
        p.start()
        self.assertEqual(wait(), True)
        p.join()
Benjamin Petersondfd79492008-06-13 19:13:39 +0000778
779#
780#
781#
782
class _TestValue(BaseTestCase):
    """Tests for shared Value / RawValue objects."""

    # (typecode, initial value, value written by the child process)
    codes_values = [
        ('i', 4343, 24234),
        ('d', 3.625, -4.25),
        ('h', -232, 234),
        ('c', latin('x'), latin('y'))
        ]

    def _test(self, values):
        # Child side of test_value: overwrite each shared value with the
        # third entry of the matching codes_values row.
        for shared, row in zip(values, self.codes_values):
            shared.value = row[2]


    def test_value(self, raw=False):
        if self.TYPE != 'processes':
            return

        make = self.RawValue if raw else self.Value
        values = [make(code, value)
                  for code, value, _ in self.codes_values]

        # Each object starts out with its initial value ...
        for shared, row in zip(values, self.codes_values):
            self.assertEqual(shared.value, row[1])

        child = self.Process(target=self._test, args=(values,))
        child.start()
        child.join()

        # ... and the child's writes are visible in the parent.
        for shared, row in zip(values, self.codes_values):
            self.assertEqual(shared.value, row[2])

    def test_rawvalue(self):
        self.test_value(raw=True)

    def test_getobj_getlock(self):
        if self.TYPE != 'processes':
            return

        # Default lock: both accessors must be callable.
        with_default = self.Value('i', 5)
        with_default.get_lock()
        with_default.get_obj()

        # Explicit lock=None: both accessors must be callable as well.
        with_none = self.Value('i', 5, lock=None)
        with_none.get_lock()
        with_none.get_obj()

        # A lock passed in by the caller is the one handed back.
        lock = self.Lock()
        with_given = self.Value('i', 5, lock=lock)
        returned_lock = with_given.get_lock()
        with_given.get_obj()
        self.assertEqual(lock, returned_lock)

        # lock=False yields an object without the accessors.
        unlocked = self.Value('i', 5, lock=False)
        self.assertFalse(hasattr(unlocked, 'get_lock'))
        self.assertFalse(hasattr(unlocked, 'get_obj'))

        # Anything that is not a lock is rejected.
        self.assertRaises(AttributeError, self.Value, 'i', 5, lock='navalue')

        raw = self.RawValue('i', 5)
        self.assertFalse(hasattr(raw, 'get_lock'))
        self.assertFalse(hasattr(raw, 'get_obj'))
848
Benjamin Petersondfd79492008-06-13 19:13:39 +0000849
class _TestArray(BaseTestCase):
    """Tests for shared Array / RawArray objects."""

    def f(self, seq):
        # Replace seq in place by its running totals.
        for pos in range(1, len(seq)):
            seq[pos] += seq[pos-1]

    def test_array(self, raw=False):
        if self.TYPE != 'processes':
            return

        seq = [680, 626, 934, 821, 150, 233, 548, 982, 714, 831]
        make = self.RawArray if raw else self.Array
        arr = make('i', seq)

        self.assertEqual(len(arr), len(seq))
        self.assertEqual(arr[3], seq[3])
        self.assertEqual(list(arr[2:7]), list(seq[2:7]))

        # Slice assignment, applied to the shared array and the plain
        # list alike.
        arr[4:8] = seq[4:8] = array.array('i', [1, 2, 3, 4])

        self.assertEqual(list(arr[:]), seq)

        # Run f locally on the list and in a child on the shared array;
        # both must end up with the same contents.
        self.f(seq)

        child = self.Process(target=self.f, args=(arr,))
        child.start()
        child.join()

        self.assertEqual(list(arr[:]), seq)

    def test_rawarray(self):
        self.test_array(raw=True)

    def test_getobj_getlock_obj(self):
        if self.TYPE != 'processes':
            return

        # Default lock: both accessors must be callable.
        with_default = self.Array('i', range(10))
        with_default.get_lock()
        with_default.get_obj()

        # Explicit lock=None: both accessors must be callable as well.
        with_none = self.Array('i', range(10), lock=None)
        with_none.get_lock()
        with_none.get_obj()

        # A lock passed in by the caller is the one handed back.
        lock = self.Lock()
        with_given = self.Array('i', range(10), lock=lock)
        returned_lock = with_given.get_lock()
        with_given.get_obj()
        self.assertEqual(lock, returned_lock)

        # lock=False yields an object without the accessors.
        unlocked = self.Array('i', range(10), lock=False)
        self.assertFalse(hasattr(unlocked, 'get_lock'))
        self.assertFalse(hasattr(unlocked, 'get_obj'))
        # Anything that is not a lock is rejected.
        self.assertRaises(AttributeError,
                          self.Array, 'i', range(10), lock='notalock')

        raw = self.RawArray('i', range(10))
        self.assertFalse(hasattr(raw, 'get_lock'))
        self.assertFalse(hasattr(raw, 'get_obj'))
Benjamin Petersondfd79492008-06-13 19:13:39 +0000912
913#
914#
915#
916
class _TestContainers(BaseTestCase):
    """Tests for the container objects handed out by a manager: list,
    dict and Namespace."""

    ALLOWED_TYPES = ('manager',)

    def test_list(self):
        a = self.list(range(10))
        self.assertEqual(a[:], range(10))

        b = self.list()
        self.assertEqual(b[:], [])

        b.extend(range(5))
        self.assertEqual(b[:], range(5))

        self.assertEqual(b[2], 2)
        # Slicing past the end is clipped.
        self.assertEqual(b[2:10], [2,3,4])

        b *= 2
        doubled = [0, 1, 2, 3, 4, 0, 1, 2, 3, 4]
        self.assertEqual(b[:], doubled)

        self.assertEqual(b + [5, 6], doubled + [5, 6])

        # a is unaffected by what was done to b.
        self.assertEqual(a[:], range(10))

        # Lists of shared lists.
        nested = self.list([a, b])
        self.assertEqual(
            nested[:],
            [[0, 1, 2, 3, 4, 5, 6, 7, 8, 9], doubled]
            )

        # A change made to a after it was stored in another shared list
        # is visible through that list.
        holder = self.list([a])
        a.append('hello')
        self.assertEqual(holder[:], [[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 'hello']])

    def test_dict(self):
        d = self.dict()
        indices = range(65, 70)
        for code in indices:
            d[code] = chr(code)
        letters = [chr(code) for code in indices]
        self.assertEqual(d.copy(), dict((i, chr(i)) for i in indices))
        self.assertEqual(sorted(d.keys()), indices)
        self.assertEqual(sorted(d.values()), letters)
        self.assertEqual(sorted(d.items()), [(i, chr(i)) for i in indices])

    def test_namespace(self):
        n = self.Namespace()
        n.name = 'Bob'
        n.job = 'Builder'
        n._hidden = 'hidden'
        self.assertEqual((n.name, n.job), ('Bob', 'Builder'))
        del n.job
        # Only 'name' shows up: 'job' was deleted and '_hidden' is not
        # included in the string form.
        self.assertEqual(str(n), "Namespace(name='Bob')")
        self.assertTrue(hasattr(n, 'name'))
        self.assertTrue(not hasattr(n, 'job'))
972
973#
974#
975#
976
def sqr(x, wait=0.0):
    """Sleep for ``wait`` seconds, then return ``x`` multiplied by itself."""
    time.sleep(wait)
    squared = x * x
    return squared
class _TestPool(BaseTestCase):
    """Tests for the pool API: apply, map, imap, imap_unordered, their
    asynchronous variants, and terminate().

    The concrete test class must supply ``self.pool`` and ``TYPE``.
    """

    def test_apply(self):
        papply = self.pool.apply
        self.assertEqual(papply(sqr, (5,)), sqr(5))
        self.assertEqual(papply(sqr, (), {'x':3}), sqr(x=3))

    def test_map(self):
        pmap = self.pool.map
        self.assertEqual(pmap(sqr, range(10)), map(sqr, range(10)))
        self.assertEqual(pmap(sqr, range(100), chunksize=20),
                         map(sqr, range(100)))

    def test_map_chunksize(self):
        # An empty input with an explicit chunksize must still complete.
        pending = None
        try:
            pending = self.pool.map_async(sqr, [], chunksize=1)
            pending.get(timeout=TIMEOUT1)
        except multiprocessing.TimeoutError:
            self.fail("pool.map_async with chunksize stalled on null list")

    def test_async(self):
        res = self.pool.apply_async(sqr, (7, TIMEOUT1,))
        get = TimingWrapper(res.get)
        self.assertEqual(get(), 49)
        self.assertTimingAlmostEqual(get.elapsed, TIMEOUT1)

    def test_async_timeout(self):
        # The task sleeps longer than the get() timeout, so get() must
        # give up with TimeoutError.
        res = self.pool.apply_async(sqr, (6, TIMEOUT2 + 0.2))
        get = TimingWrapper(res.get)
        self.assertRaises(multiprocessing.TimeoutError, get, timeout=TIMEOUT2)
        self.assertTimingAlmostEqual(get.elapsed, TIMEOUT2)

    def test_imap(self):
        it = self.pool.imap(sqr, range(10))
        self.assertEqual(list(it), map(sqr, range(10)))

        # Step through the iterator by hand, first with the default
        # chunksize and then with an explicit one.
        for size, options in ((10, {}), (1000, {'chunksize': 100})):
            it = self.pool.imap(sqr, range(size), **options)
            for i in range(size):
                self.assertEqual(next(it), i*i)
            self.assertRaises(StopIteration, next, it)

    def test_imap_unordered(self):
        # Results may arrive in any order, so compare them sorted.
        expected = map(sqr, range(1000))

        it = self.pool.imap_unordered(sqr, range(1000))
        self.assertEqual(sorted(it), expected)

        it = self.pool.imap_unordered(sqr, range(1000), chunksize=53)
        self.assertEqual(sorted(it), expected)

    def test_make_pool(self):
        p = multiprocessing.Pool(3)
        self.assertEqual(3, len(p._pool))
        p.close()
        p.join()

    def test_terminate(self):
        if self.TYPE == 'manager':
            # On Unix a forked process increfs each shared object to
            # which its parent process held a reference.  If the
            # forked process gets terminated then there is likely to
            # be a reference leak.  So to prevent
            # _TestZZZNumberOfObjects from failing we skip this test
            # when using a manager.
            return

        # Queue far more work than can finish, then check that
        # terminate() lets join() return quickly.
        result = self.pool.map_async(
            time.sleep, [0.1 for i in range(10000)], chunksize=1
            )
        self.pool.terminate()
        join = TimingWrapper(self.pool.join)
        join()
        self.assertTrue(join.elapsed < 0.2)
Jesse Noller654ade32010-01-27 03:05:57 +00001055
class _TestPoolWorkerLifetime(BaseTestCase):
    # Checks that the workers of a pool created with maxtasksperchild are
    # replaced by new processes once enough tasks have been run.

    ALLOWED_TYPES = ('processes', )

    def test_pool_worker_lifetime(self):
        p = multiprocessing.Pool(3, maxtasksperchild=10)
        try:
            self.assertEqual(3, len(p._pool))
            origworkerpids = [w.pid for w in p._pool]
            # Run many tasks so each worker gets replaced (hopefully)
            results = [p.apply_async(sqr, (i, )) for i in range(100)]
            # Fetch the results and verify we got the right answers,
            # also ensuring all the tasks have completed.
            for (j, res) in enumerate(results):
                self.assertEqual(res.get(), sqr(j))
            # Refill the pool
            p._repopulate_pool()
            # Wait until all workers are alive
            countdown = 5
            while countdown and not all(w.is_alive() for w in p._pool):
                countdown -= 1
                time.sleep(DELTA)
            finalworkerpids = [w.pid for w in p._pool]
            # All pids should be assigned.  See issue #7805.
            self.assertNotIn(None, origworkerpids)
            self.assertNotIn(None, finalworkerpids)
            # Finally, check that the worker pids have changed
            self.assertNotEqual(sorted(origworkerpids),
                                sorted(finalworkerpids))
        finally:
            # Reap the workers even when one of the checks above failed, so
            # a failing run does not leave pool processes behind.
            p.close()
            p.join()
1086
Benjamin Petersondfd79492008-06-13 19:13:39 +00001087#
1088# Test that manager has expected number of shared objects left
1089#
1090
class _TestZZZNumberOfObjects(BaseTestCase):
    # Because test cases are sorted alphabetically, this one will get
    # run after all the other tests for the manager.  It tests that
    # there have been no "reference leaks" for the manager's shared
    # objects.  Note the comment in _TestPool.test_terminate().
    ALLOWED_TYPES = ('manager',)

    def test_number_of_objects(self):
        EXPECTED_NUMBER = 1                # the pool object is still alive
        multiprocessing.active_children()  # discard dead process objs
        gc.collect()                       # do garbage collection
        refs = self.manager._number_of_objects()
        # Capture the debug info right after the count so that what gets
        # printed describes the state that was actually counted.
        debug_info = self.manager._debug_info()
        if refs != EXPECTED_NUMBER:
            # Print the snapshot once; it is only useful on a mismatch.
            print(debug_info)

        self.assertEqual(refs, EXPECTED_NUMBER)
1109
1110#
1111# Test of creating a customized manager class
1112#
1113
1114from multiprocessing.managers import BaseManager, BaseProxy, RemoteError
1115
class FooBar(object):
    # Plain class registered with MyManager below: one method that returns
    # normally, one that raises, and one whose name starts with an underscore.

    def f(self):
        return 'f()'

    def g(self):
        raise ValueError()

    def _h(self):
        return '_h()'
1123
def baz():
    """Generate the squares of the integers 0 through 9, in order."""
    for value in range(10):
        yield value * value
1127
class IteratorProxy(BaseProxy):
    # Proxy type that forwards both spellings of the iterator "next" method
    # to the referent through _callmethod().
    _exposed_ = ('next', '__next__')

    def __iter__(self):
        # The proxy acts as its own iterator.
        return self

    def next(self):
        return self._callmethod('next')

    def __next__(self):
        return self._callmethod('__next__')
1136
class MyManager(BaseManager):
    # Customized manager; the types it serves are registered just below.
    pass

# 'Foo' is registered without an explicit exposed list, 'Bar' exposes only
# f and _h, and 'baz' results are wrapped in IteratorProxy.
MyManager.register('Foo', callable=FooBar)
MyManager.register('Bar', callable=FooBar, exposed=('f', '_h'))
MyManager.register('baz', callable=baz, proxytype=IteratorProxy)
1143
1144
class _TestMyManager(BaseTestCase):
    # Checks which methods the proxies created by MyManager expose and that
    # calls on them are forwarded to the referents.

    ALLOWED_TYPES = ('manager',)

    def test_mymanager(self):
        manager = MyManager()
        manager.start()
        # Shut the manager process down even if a check below fails;
        # otherwise a failing run leaves the server process running.
        try:
            foo = manager.Foo()
            bar = manager.Bar()
            baz = manager.baz()

            candidates = ('f', 'g', '_h')
            foo_methods = [name for name in candidates if hasattr(foo, name)]
            bar_methods = [name for name in candidates if hasattr(bar, name)]

            self.assertEqual(foo_methods, ['f', 'g'])
            self.assertEqual(bar_methods, ['f', '_h'])

            self.assertEqual(foo.f(), 'f()')
            self.assertRaises(ValueError, foo.g)
            self.assertEqual(foo._callmethod('f'), 'f()')
            # _h is not callable through the Foo proxy, so the server
            # reports an error.
            self.assertRaises(RemoteError, foo._callmethod, '_h')

            self.assertEqual(bar.f(), 'f()')
            self.assertEqual(bar._h(), '_h()')
            self.assertEqual(bar._callmethod('f'), 'f()')
            self.assertEqual(bar._callmethod('_h'), '_h()')

            self.assertEqual(list(baz), [i*i for i in range(10)])
        finally:
            manager.shutdown()
1176
1177#
1178# Test of connecting to a remote server and using xmlrpclib for serialization
1179#
1180
_queue = Queue.Queue()

def get_queue():
    # Callable registered with QueueManager: always hands out the single
    # module-level queue.
    return _queue


class QueueManager(BaseManager):
    '''manager class used by server process'''

QueueManager.register('get_queue', callable=get_queue)


class QueueManager2(BaseManager):
    '''manager class which specifies the same interface as QueueManager'''

# No callable here: this class is only used to connect to an existing server.
QueueManager2.register('get_queue')
1192
1193
SERIALIZER = 'xmlrpclib'

class _TestRemoteManager(BaseTestCase):
    # Starts a QueueManager server, then talks to it from a child process
    # and from a second client manager, using SERIALIZER instead of pickle.

    ALLOWED_TYPES = ('manager',)

    def _putter(self, address, authkey):
        # Runs in the child process: connect to the server as a client and
        # put one tuple on the shared queue.
        manager = QueueManager2(
            address=address, authkey=authkey, serializer=SERIALIZER
            )
        manager.connect()
        queue = manager.get_queue()
        queue.put(('hello world', None, True, 2.25))

    def test_remote(self):
        authkey = os.urandom(32)

        manager = QueueManager(
            address=('localhost', 0), authkey=authkey, serializer=SERIALIZER
            )
        manager.start()
        # Stop the server even when a check below fails, so a failing run
        # does not leave the manager process behind.
        try:
            p = self.Process(target=self._putter,
                             args=(manager.address, authkey))
            p.start()

            manager2 = QueueManager2(
                address=manager.address, authkey=authkey,
                serializer=SERIALIZER
                )
            manager2.connect()
            queue = manager2.get_queue()

            # Note that xmlrpclib will deserialize object as a list not a tuple
            self.assertEqual(queue.get(), ['hello world', None, True, 2.25])

            # The child has delivered its item and is on its way out; reap
            # it while the server is still running.
            p.join()

            # Because we are using xmlrpclib for serialization instead of
            # pickle this will cause a serialization error.
            self.assertRaises(Exception, queue.put, time.sleep)

            # Make queue finalizer run before the server is stopped
            del queue
        finally:
            manager.shutdown()
1235
class _TestManagerRestart(BaseTestCase):
    # Checks that a manager can be started again on the same address
    # immediately after a previous manager on that address was shut down.

    def _putter(self, address, authkey):
        # Runs in the child process: connect to the server and put one
        # string on the shared queue.
        manager = QueueManager(
            address=address, authkey=authkey, serializer=SERIALIZER)
        manager.connect()
        queue = manager.get_queue()
        queue.put('hello world')

    def test_rapid_restart(self):
        authkey = os.urandom(32)
        port = test_support.find_unused_port()
        manager = QueueManager(
            address=('localhost', port), authkey=authkey,
            serializer=SERIALIZER)
        manager.start()

        p = self.Process(target=self._putter, args=(manager.address, authkey))
        p.start()
        queue = manager.get_queue()
        self.assertEqual(queue.get(), 'hello world')
        # Reap the child before stopping the server: it has delivered its
        # item, and joining here keeps its exit from overlapping the
        # shutdown below.
        p.join()
        del queue
        manager.shutdown()

        # Start a second server on the very same port straight away.
        manager = QueueManager(
            address=('localhost', port), authkey=authkey,
            serializer=SERIALIZER)
        manager.start()
        manager.shutdown()
Jesse Noller459a6482009-03-30 15:50:42 +00001262
Benjamin Petersondfd79492008-06-13 19:13:39 +00001263#
1264#
1265#
1266
SENTINEL = latin('')

class _TestConnection(BaseTestCase):
    # Tests of Pipe() connections against an echoing child.

    ALLOWED_TYPES = ('processes', 'threads')

    def _echo(self, conn):
        # Child side: send every received message straight back until the
        # sentinel arrives, then close the connection.
        while True:
            received = conn.recv_bytes()
            if received == SENTINEL:
                break
            conn.send_bytes(received)
        conn.close()

    def test_connection(self):
        conn, child_conn = self.Pipe()

        echo_proc = self.Process(target=self._echo, args=(child_conn,))
        echo_proc.daemon = True
        echo_proc.start()

        seq = [1, 2.25, None]
        msg = latin('hello world')
        longmsg = msg * 10
        arr = array.array('i', range(4))
        using_processes = (self.TYPE == 'processes')

        if using_processes:
            self.assertEqual(type(conn.fileno()), int)

        self.assertEqual(conn.send(seq), None)
        self.assertEqual(conn.recv(), seq)

        self.assertEqual(conn.send_bytes(msg), None)
        self.assertEqual(conn.recv_bytes(), msg)

        if using_processes:
            # Receive into the start of a zeroed array.
            buf = array.array('i', [0]*10)
            expected = list(arr) + [0] * (10 - len(arr))
            self.assertEqual(conn.send_bytes(arr), None)
            received_size = conn.recv_bytes_into(buf)
            self.assertEqual(received_size, len(arr) * buf.itemsize)
            self.assertEqual(list(buf), expected)

            # Receive at an offset of three items into a zeroed array.
            buf = array.array('i', [0]*10)
            expected = [0] * 3 + list(arr) + [0] * (10 - 3 - len(arr))
            self.assertEqual(conn.send_bytes(arr), None)
            received_size = conn.recv_bytes_into(buf, 3 * buf.itemsize)
            self.assertEqual(received_size, len(arr) * buf.itemsize)
            self.assertEqual(list(buf), expected)

            # A buffer shorter than the message must raise BufferTooShort
            # carrying the complete message.
            buf = bytearray(latin(' ' * 40))
            self.assertEqual(conn.send_bytes(longmsg), None)
            try:
                res = conn.recv_bytes_into(buf)
            except multiprocessing.BufferTooShort as e:
                self.assertEqual(e.args, (longmsg,))
            else:
                self.fail('expected BufferTooShort, got %s' % res)

        poll = TimingWrapper(conn.poll)

        self.assertEqual(poll(), False)
        self.assertTimingAlmostEqual(poll.elapsed, 0)

        self.assertEqual(poll(TIMEOUT1), False)
        self.assertTimingAlmostEqual(poll.elapsed, TIMEOUT1)

        conn.send(None)

        self.assertEqual(poll(TIMEOUT1), True)
        self.assertTimingAlmostEqual(poll.elapsed, 0)

        self.assertEqual(conn.recv(), None)

        really_big_msg = latin('X') * (1024 * 1024 * 16)   # 16Mb
        conn.send_bytes(really_big_msg)
        self.assertEqual(conn.recv_bytes(), really_big_msg)

        conn.send_bytes(SENTINEL)                          # tell child to quit
        child_conn.close()

        if using_processes:
            self.assertEqual(conn.readable, True)
            self.assertEqual(conn.writable, True)
            self.assertRaises(EOFError, conn.recv)
            self.assertRaises(EOFError, conn.recv_bytes)

        echo_proc.join()

    def test_duplex_false(self):
        reader, writer = self.Pipe(duplex=False)
        self.assertEqual(writer.send(1), None)
        self.assertEqual(reader.recv(), 1)
        if self.TYPE == 'processes':
            # Each end of a one-way pipe supports a single direction only.
            self.assertEqual(reader.readable, True)
            self.assertEqual(reader.writable, False)
            self.assertEqual(writer.readable, False)
            self.assertEqual(writer.writable, True)
            self.assertRaises(IOError, reader.send, 2)
            self.assertRaises(IOError, writer.recv)
            self.assertRaises(IOError, writer.poll)

    def test_spawn_close(self):
        # We test that a pipe connection can be closed by parent
        # process immediately after child is spawned.  On Windows this
        # would have sometimes failed on old versions because
        # child_conn would be closed before the child got a chance to
        # duplicate it.
        conn, child_conn = self.Pipe()

        echo_proc = self.Process(target=self._echo, args=(child_conn,))
        echo_proc.start()
        child_conn.close()    # this might complete before child initializes

        greeting = latin('hello')
        conn.send_bytes(greeting)
        self.assertEqual(conn.recv_bytes(), greeting)

        conn.send_bytes(SENTINEL)
        conn.close()
        echo_proc.join()

    def test_sendbytes(self):
        # Offset and size arguments of send_bytes(); only checked for
        # real processes.
        if self.TYPE != 'processes':
            return

        msg = latin('abcdefghijklmnopqrstuvwxyz')
        a, b = self.Pipe()

        a.send_bytes(msg)
        self.assertEqual(b.recv_bytes(), msg)

        a.send_bytes(msg, 5)
        self.assertEqual(b.recv_bytes(), msg[5:])

        a.send_bytes(msg, 7, 8)
        self.assertEqual(b.recv_bytes(), msg[7:7+8])

        # An offset equal to the length of the message sends an empty message.
        a.send_bytes(msg, 26)
        self.assertEqual(b.recv_bytes(), latin(''))

        a.send_bytes(msg, 26, 0)
        self.assertEqual(b.recv_bytes(), latin(''))

        # Offset/size combinations that must be rejected.
        for bad_args in ((27,), (22, 5), (26, 1), (-1,), (4, -1)):
            self.assertRaises(ValueError, a.send_bytes, msg, *bad_args)
1417
class _TestListenerClient(BaseTestCase):
    # Connects a Client to a Listener for every available address family.

    ALLOWED_TYPES = ('processes', 'threads')

    def _test(self, address):
        # Child side: connect to the listener, send one message, hang up.
        client = self.connection.Client(address)
        client.send('hello')
        client.close()

    def test_listener_client(self):
        for family in self.connection.families:
            listener = self.connection.Listener(family=family)
            child = self.Process(target=self._test, args=(listener.address,))
            child.daemon = True
            child.start()
            accepted = listener.accept()
            self.assertEqual(accepted.recv(), 'hello')
            child.join()
            listener.close()
Benjamin Petersondfd79492008-06-13 19:13:39 +00001437#
1438# Test of sending connection and socket objects between processes
1439#
Benjamin Petersonda3a1b12008-06-16 20:52:48 +00001440"""
Benjamin Petersondfd79492008-06-13 19:13:39 +00001441class _TestPicklingConnections(BaseTestCase):
1442
1443 ALLOWED_TYPES = ('processes',)
1444
1445 def _listener(self, conn, families):
1446 for fam in families:
1447 l = self.connection.Listener(family=fam)
1448 conn.send(l.address)
1449 new_conn = l.accept()
1450 conn.send(new_conn)
1451
1452 if self.TYPE == 'processes':
1453 l = socket.socket()
1454 l.bind(('localhost', 0))
1455 conn.send(l.getsockname())
1456 l.listen(1)
1457 new_conn, addr = l.accept()
1458 conn.send(new_conn)
1459
1460 conn.recv()
1461
1462 def _remote(self, conn):
1463 for (address, msg) in iter(conn.recv, None):
1464 client = self.connection.Client(address)
1465 client.send(msg.upper())
1466 client.close()
1467
1468 if self.TYPE == 'processes':
1469 address, msg = conn.recv()
1470 client = socket.socket()
1471 client.connect(address)
1472 client.sendall(msg.upper())
1473 client.close()
1474
1475 conn.close()
1476
1477 def test_pickling(self):
1478 try:
1479 multiprocessing.allow_connection_pickling()
1480 except ImportError:
1481 return
1482
1483 families = self.connection.families
1484
1485 lconn, lconn0 = self.Pipe()
1486 lp = self.Process(target=self._listener, args=(lconn0, families))
1487 lp.start()
1488 lconn0.close()
1489
1490 rconn, rconn0 = self.Pipe()
1491 rp = self.Process(target=self._remote, args=(rconn0,))
1492 rp.start()
1493 rconn0.close()
1494
1495 for fam in families:
1496 msg = ('This connection uses family %s' % fam).encode('ascii')
1497 address = lconn.recv()
1498 rconn.send((address, msg))
1499 new_conn = lconn.recv()
1500 self.assertEqual(new_conn.recv(), msg.upper())
1501
1502 rconn.send(None)
1503
1504 if self.TYPE == 'processes':
1505 msg = latin('This connection uses a normal socket')
1506 address = lconn.recv()
1507 rconn.send((address, msg))
1508 if hasattr(socket, 'fromfd'):
1509 new_conn = lconn.recv()
1510 self.assertEqual(new_conn.recv(100), msg.upper())
1511 else:
1512 # XXX On Windows with Py2.6 need to backport fromfd()
1513 discard = lconn.recv_bytes()
1514
1515 lconn.send(None)
1516
1517 rconn.close()
1518 lconn.close()
1519
1520 lp.join()
1521 rp.join()
Benjamin Petersonda3a1b12008-06-16 20:52:48 +00001522"""
Benjamin Petersondfd79492008-06-13 19:13:39 +00001523#
1524#
1525#
1526
class _TestHeap(BaseTestCase):
    # Stress test for multiprocessing.heap: after many allocations and
    # releases, the free and occupied segments must tile each arena.

    ALLOWED_TYPES = ('processes',)

    def test_heap(self):
        iterations = 5000
        maxblocks = 50
        blocks = []

        # create and destroy lots of blocks of different sizes
        for _ in xrange(iterations):
            size = int(random.lognormvariate(0, 1) * 1000)
            blocks.append(multiprocessing.heap.BufferWrapper(size))
            if len(blocks) > maxblocks:
                # drop a randomly chosen older block
                del blocks[random.randrange(maxblocks)]

        # get the heap object
        heap = multiprocessing.heap.BufferWrapper._heap

        # verify the state of the heap: collect every free and occupied
        # segment as (arena index, start, stop, length, state)
        segments = []
        for free_list in heap._len_to_seq.values():
            for arena, start, stop in free_list:
                segments.append((heap._arenas.index(arena), start, stop,
                                 stop-start, 'free'))
        for arena, start, stop in heap._allocated_blocks:
            segments.append((heap._arenas.index(arena), start, stop,
                             stop-start, 'occupied'))

        segments.sort()

        # Consecutive segments must either be adjacent within one arena or
        # the second one must start a different arena at offset 0.
        for current, following in zip(segments, segments[1:]):
            (arena, start, stop) = current[:3]
            (narena, nstart, nstop) = following[:3]
            self.assertTrue((arena != narena and nstart == 0) or
                            (stop == nstart))
1567
1568#
1569#
1570#
1571
# Only names that ctypes really provides are imported here.  Value and copy
# belong to multiprocessing.sharedctypes; asking ctypes for them made this
# import fail every time, which disabled all the tests below.
try:
    from ctypes import Structure, c_int, c_double
except ImportError:
    Structure = object
    c_int = c_double = None

class _Foo(Structure):
    _fields_ = [
        ('x', c_int),
        ('y', c_double)
        ]

class _TestSharedCTypes(BaseTestCase):
    # Tests of shared ctypes objects: values changed by a child process
    # must be visible in the parent.

    ALLOWED_TYPES = ('processes',)

    def _double(self, x, y, foo, arr, string):
        # Runs in the child process: double every shared value in place.
        x.value *= 2
        y.value *= 2
        foo.x *= 2
        foo.y *= 2
        string.value *= 2
        for i in range(len(arr)):
            arr[i] *= 2

    def test_sharedctypes(self, lock=False):
        # `lock` is passed through to every Value()/Array() call.
        if c_int is None:
            return

        # self.Value is the Value factory supplied by the mixin class.
        x = self.Value('i', 7, lock=lock)
        y = self.Value(c_double, 1.0/3.0, lock=lock)
        foo = self.Value(_Foo, 3, 2, lock=lock)
        arr = self.Array('d', range(10), lock=lock)
        string = self.Array('c', 20, lock=lock)
        string.value = 'hello'

        p = self.Process(target=self._double, args=(x, y, foo, arr, string))
        p.start()
        p.join()

        self.assertEqual(x.value, 14)
        self.assertAlmostEqual(y.value, 2.0/3.0)
        self.assertEqual(foo.x, 6)
        self.assertAlmostEqual(foo.y, 4.0)
        for i in range(10):
            self.assertAlmostEqual(arr[i], i*2)
        self.assertEqual(string.value, latin('hellohello'))

    def test_synchronize(self):
        self.test_sharedctypes(lock=True)

    def test_copy(self):
        if c_int is None:
            return

        # Imported locally, under another name, so the module-level `copy`
        # module is not shadowed.
        from multiprocessing.sharedctypes import copy as shared_copy

        foo = _Foo(2, 5.0)
        bar = shared_copy(foo)
        # Changing the original afterwards must not affect the copy.
        foo.x = 0
        foo.y = 0
        self.assertEqual(bar.x, 2)
        self.assertAlmostEqual(bar.y, 5.0)
1633
1634#
1635#
1636#
1637
class _TestFinalize(BaseTestCase):
    # Checks when, and in which order, util.Finalize callbacks run.

    ALLOWED_TYPES = ('processes',)

    def _test_finalize(self, conn):
        # Runs in the child process.  Every callback sends a tag over
        # `conn`, so the parent can observe the order in which they ran.
        class Foo(object):
            pass

        def register(obj, tag, **kwds):
            return util.Finalize(obj, conn.send, args=(tag,), **kwds)

        a = Foo()
        register(a, 'a')
        del a           # triggers callback for a

        b = Foo()
        close_b = register(b, 'b')
        close_b()       # triggers callback for b
        close_b()       # does nothing because callback has already been called
        del b           # does nothing because callback has already been called

        c = Foo()
        register(c, 'c')

        d10 = Foo()
        register(d10, 'd10', exitpriority=1)

        d01 = Foo()
        register(d01, 'd01', exitpriority=0)
        d02 = Foo()
        register(d02, 'd02', exitpriority=0)
        d03 = Foo()
        register(d03, 'd03', exitpriority=0)

        register(None, 'e', exitpriority=-10)

        register(None, 'STOP', exitpriority=-100)

        # call multiprocessing's cleanup function then exit process without
        # garbage collecting locals
        util._exit_function()
        conn.close()
        os._exit(0)

    def test_finalize(self):
        conn, child_conn = self.Pipe()

        child = self.Process(target=self._test_finalize, args=(child_conn,))
        child.start()
        child.join()

        # Read tags until the 'STOP' marker shows up.
        result = list(iter(conn.recv, 'STOP'))
        self.assertEqual(result, ['a', 'b', 'd10', 'd03', 'd02', 'd01', 'e'])
1688
1689#
1690# Test that from ... import * works for each module
1691#
1692
class _TestImportStar(BaseTestCase):
    # Every name listed in a module's __all__ must really exist in it.

    ALLOWED_TYPES = ('processes',)

    def test_import(self):
        submodules = (
            'connection', 'heap', 'managers', 'pool', 'process',
            'reduction', 'sharedctypes', 'synchronize', 'util'
            )
        modules = ['multiprocessing']
        modules.extend('multiprocessing.' + sub for sub in submodules)

        for name in modules:
            __import__(name)
            mod = sys.modules[name]

            # Modules without an __all__ have nothing to check.
            for attr in getattr(mod, '__all__', ()):
                message = '%r does not have attribute %r' % (mod, attr)
                self.assertTrue(hasattr(mod, attr), message)
1715
1716#
1717# Quick test that logging works -- does not test logging output
1718#
1719
class _TestLogging(BaseTestCase):
    # Quick checks of the multiprocessing logger; log output is not examined.

    ALLOWED_TYPES = ('processes',)

    def test_enable_logging(self):
        logger = multiprocessing.get_logger()
        logger.setLevel(util.SUBWARNING)
        self.assertTrue(logger is not None)
        logger.debug('this will not be printed')
        logger.info('nor will this')
        logger.setLevel(LOG_LEVEL)

    def _test_level(self, conn):
        # Runs in the child process: report the effective level of the
        # multiprocessing logger back through `conn`.
        logger = multiprocessing.get_logger()
        conn.send(logger.getEffectiveLevel())

    def _child_effective_level(self, reader, writer):
        # Start a child running _test_level(), return the level it reports,
        # and always join the child so no process outlives the test.
        child = self.Process(target=self._test_level, args=(writer,))
        child.start()
        try:
            return reader.recv()
        finally:
            child.join()

    def test_level(self):
        LEVEL1 = 32
        LEVEL2 = 37

        logger = multiprocessing.get_logger()
        root_logger = logging.getLogger()
        root_level = root_logger.level

        reader, writer = multiprocessing.Pipe(duplex=False)

        # Restore both loggers even when an assertion fails, so a failure
        # here cannot change the logging setup of later tests.
        try:
            # A level set on the multiprocessing logger is seen by the child.
            logger.setLevel(LEVEL1)
            self.assertEqual(
                LEVEL1, self._child_effective_level(reader, writer))

            # With NOTSET the effective level comes from the root logger.
            logger.setLevel(logging.NOTSET)
            root_logger.setLevel(LEVEL2)
            self.assertEqual(
                LEVEL2, self._child_effective_level(reader, writer))
        finally:
            root_logger.setLevel(root_level)
            logger.setLevel(level=LOG_LEVEL)
1757
Jesse Noller814d02d2009-11-21 14:38:23 +00001758
Jesse Noller9a03f2f2009-11-24 14:17:29 +00001759# class _TestLoggingProcessName(BaseTestCase):
1760#
1761# def handle(self, record):
1762# assert record.processName == multiprocessing.current_process().name
1763# self.__handled = True
1764#
1765# def test_logging(self):
1766# handler = logging.Handler()
1767# handler.handle = self.handle
1768# self.__handled = False
1769# # Bypass getLogger() and side-effects
1770# logger = logging.getLoggerClass()(
1771# 'multiprocessing.test.TestLoggingProcessName')
1772# logger.addHandler(handler)
1773# logger.propagate = False
1774#
1775# logger.warn('foo')
1776# assert self.__handled
Jesse Noller814d02d2009-11-21 14:38:23 +00001777
Benjamin Petersondfd79492008-06-13 19:13:39 +00001778#
Jesse Noller9a5b2ad2009-01-19 15:12:22 +00001779# Test to verify handle verification, see issue 3321
1780#
1781
class TestInvalidHandle(unittest.TestCase):
    # Handle verification of _multiprocessing.Connection, see issue 3321.

    def test_invalid_handles(self):
        # Nothing is checked when WIN32 is set.
        if not WIN32:
            bogus = _multiprocessing.Connection(44977608)
            self.assertRaises(IOError, bogus.poll)
            self.assertRaises(IOError, _multiprocessing.Connection, -1)
1790#
Benjamin Petersondfd79492008-06-13 19:13:39 +00001791# Functions used to create test cases from the base ones in this module
1792#
1793
def get_attributes(Source, names):
    """Return a dict mapping each name in `names` to that attribute of Source.

    Attributes that are plain Python functions are wrapped in staticmethod,
    so they can be placed in a class namespace without turning into methods.
    """
    function_type = type(get_attributes)
    attributes = {}
    for name in names:
        value = getattr(Source, name)
        attributes[name] = (
            staticmethod(value) if type(value) == function_type else value)
    return attributes
1802
def create_test_cases(Mixin, type):
    """Build the concrete TestCase classes for one flavour named by `type`.

    Every module-level name starting with '_Test' whose ALLOWED_TYPES
    contains `type` is combined with unittest.TestCase and `Mixin`.  The
    result maps each new class name ('With' + capitalised type + base name
    without its leading underscore) to the new class.
    """
    prefix = 'With' + type[0].upper() + type[1:]
    module_globals = globals()
    cases = {}

    # Iterate over a snapshot of the names currently defined in the module.
    for name in list(module_globals):
        if not name.startswith('_Test'):
            continue
        base = module_globals[name]
        if type not in base.ALLOWED_TYPES:
            continue

        class Temp(base, unittest.TestCase, Mixin):
            pass
        Temp.__name__ = prefix + name[1:]
        # Make the generated class report the mixin's module as its own.
        Temp.__module__ = Mixin.__module__
        cases[Temp.__name__] = Temp
    return cases
1819
1820#
1821# Create test cases
1822#
1823
class ProcessesMixin(object):
    # Flavour that runs the shared tests against the multiprocessing
    # package itself.
    TYPE = 'processes'
    Process = multiprocessing.Process
    # Copy the listed attributes of the package into the class namespace
    # (get_attributes() wraps plain functions in staticmethod).
    locals().update(get_attributes(multiprocessing, (
        'Queue', 'Lock', 'RLock', 'Semaphore',
        'BoundedSemaphore', 'Condition', 'Event', 'Value',
        'Array', 'RawValue', 'RawArray', 'current_process',
        'active_children', 'Pipe', 'connection', 'JoinableQueue'
        )))

# Generate the concrete test classes and publish them as module globals.
testcases_processes = create_test_cases(ProcessesMixin, type='processes')
globals().update(testcases_processes)
1836
1837
class ManagerMixin(object):
    # Flavour that runs the shared tests against objects served by a
    # SyncManager.
    TYPE = 'manager'
    Process = multiprocessing.Process
    # Allocated without calling __init__(); test_main() initialises and
    # starts this manager before the tests run.
    manager = object.__new__(multiprocessing.managers.SyncManager)
    # Copy the listed attributes of the manager into the class namespace.
    locals().update(get_attributes(manager, (
        'Queue', 'Lock', 'RLock', 'Semaphore',
        'BoundedSemaphore', 'Condition', 'Event', 'Value',
        'Array', 'list', 'dict', 'Namespace', 'JoinableQueue'
        )))

# Generate the concrete test classes and publish them as module globals.
testcases_manager = create_test_cases(ManagerMixin, type='manager')
globals().update(testcases_manager)
1850
1851
class ThreadsMixin(object):
    # Flavour that runs the shared tests against multiprocessing.dummy.
    TYPE = 'threads'
    Process = multiprocessing.dummy.Process
    # Copy the listed attributes of multiprocessing.dummy into the class
    # namespace (get_attributes() wraps plain functions in staticmethod).
    locals().update(get_attributes(multiprocessing.dummy, (
        'Queue', 'Lock', 'RLock', 'Semaphore',
        'BoundedSemaphore', 'Condition', 'Event', 'Value',
        'Array', 'current_process', 'active_children', 'Pipe',
        'connection', 'dict', 'list', 'Namespace', 'JoinableQueue'
        )))

# Generate the concrete test classes and publish them as module globals.
testcases_threads = create_test_cases(ThreadsMixin, type='threads')
globals().update(testcases_threads)
1864
class OtherTest(unittest.TestCase):
    # TODO: add more tests for deliver/answer challenge.

    def test_deliver_challenge_auth_failure(self):
        # Fake peer that answers the challenge with garbage.
        class _BogusPeer(object):
            def recv_bytes(self, size):
                return b'something bogus'
            def send_bytes(self, data):
                pass

        deliver = multiprocessing.connection.deliver_challenge
        self.assertRaises(multiprocessing.AuthenticationError,
                          deliver, _BogusPeer(), b'abc')

    def test_answer_challenge_auth_failure(self):
        # Fake peer that first sends the challenge marker, then garbage,
        # then empty messages.
        class _BogusPeer(object):
            def __init__(self):
                self.count = 0
            def recv_bytes(self, size):
                self.count += 1
                if self.count == 1:
                    return multiprocessing.connection.CHALLENGE
                if self.count == 2:
                    return b'something bogus'
                return b''
            def send_bytes(self, data):
                pass

        answer = multiprocessing.connection.answer_challenge
        self.assertRaises(multiprocessing.AuthenticationError,
                          answer, _BogusPeer(), b'abc')
1893
Jesse Noller7152f6d2009-04-02 05:17:26 +00001894#
1895# Test Manager.start()/Pool.__init__() initializer feature - see issue 5585
1896#
1897
def initializer(ns):
    """Increment the `test` attribute of namespace `ns` by one."""
    ns.test += 1
1900
class TestInitializers(unittest.TestCase):
    # Tests of the initializer arguments of Manager.start() and Pool().

    def setUp(self):
        # A manager-backed namespace holds the counter, so increments made
        # in other processes are visible here.
        self.mgr = multiprocessing.Manager()
        self.ns = self.mgr.Namespace()
        self.ns.test = 0

    def tearDown(self):
        self.mgr.shutdown()

    def test_manager_initializer(self):
        second_manager = multiprocessing.managers.SyncManager()
        # An initializer that is not callable is rejected.
        self.assertRaises(TypeError, second_manager.start, 1)
        second_manager.start(initializer, (self.ns,))
        self.assertEqual(self.ns.test, 1)
        second_manager.shutdown()

    def test_pool_initializer(self):
        # An initializer that is not callable is rejected.
        self.assertRaises(TypeError, multiprocessing.Pool, initializer=1)
        pool = multiprocessing.Pool(1, initializer, (self.ns,))
        pool.close()
        pool.join()
        self.assertEqual(self.ns.test, 1)
1923
Jesse Noller1b90efb2009-06-30 17:11:52 +00001924#
1925# Issue 5155, 5313, 5331: Test process in processes
1926# Verifies os.close(sys.stdin.fileno) vs. sys.stdin.close() behavior
1927#
1928
1929def _ThisSubProcess(q):
1930 try:
1931 item = q.get(block=False)
1932 except Queue.Empty:
1933 pass
1934
def _TestProcess(q):
    # Runs inside a child process and starts a grandchild that polls a
    # fresh queue.  The `q` argument is accepted but not used.
    inner_queue = multiprocessing.Queue()
    grandchild = multiprocessing.Process(target=_ThisSubProcess,
                                         args=(inner_queue,))
    grandchild.start()
    grandchild.join()
1940
1941def _afunc(x):
1942 return x*x
1943
def pool_in_process():
    """Run a small map on a private four-worker pool, then reap the workers."""
    pool = multiprocessing.Pool(processes=4)
    try:
        pool.map(_afunc, [1, 2, 3, 4, 5, 6, 7])
    finally:
        # Close and join explicitly so the workers are always cleaned up
        # before this function returns.
        pool.close()
        pool.join()
1947
1948class _file_like(object):
1949 def __init__(self, delegate):
1950 self._delegate = delegate
1951 self._pid = None
1952
1953 @property
1954 def cache(self):
1955 pid = os.getpid()
1956 # There are no race conditions since fork keeps only the running thread
1957 if pid != self._pid:
1958 self._pid = pid
1959 self._cache = []
1960 return self._cache
1961
1962 def write(self, data):
1963 self.cache.append(data)
1964
1965 def flush(self):
1966 self._delegate.write(''.join(self.cache))
1967 self._cache = []
1968
class TestStdinBadfiledescriptor(unittest.TestCase):
    # Issue 5155, 5313, 5331: creating processes, queues and pools from
    # inside a child process.

    def test_queue_in_process(self):
        queue = multiprocessing.Queue()
        proc = multiprocessing.Process(target=_TestProcess, args=(queue,))
        proc.start()
        proc.join()

    def test_pool_in_process(self):
        p = multiprocessing.Process(target=pool_in_process)
        p.start()
        p.join()

    def test_flushing(self):
        # Data written to the wrapper reaches the delegate on flush().
        sio = StringIO()
        flike = _file_like(sio)
        flike.write('foo')
        flike.flush()
        # Use a unittest assertion so the check is not stripped under -O.
        self.assertEqual(sio.getvalue(), 'foo')

testcases_other = [OtherTest, TestInvalidHandle, TestInitializers,
                   TestStdinBadfiledescriptor]
Neal Norwitz0c519b32008-08-25 01:50:24 +00001992
Benjamin Petersondfd79492008-06-13 19:13:39 +00001993#
1994#
1995#
1996
def test_main(run=None):
    """Build the complete suite and run it with `run`.

    `run` is a callable taking the assembled TestSuite; when it is None,
    test_support.run_unittest is used.  Raises unittest.SkipTest on Linux
    when creating an RLock raises OSError.
    """
    if sys.platform.startswith("linux"):
        try:
            multiprocessing.RLock()
        except OSError:
            raise unittest.SkipTest("OSError raises on RLock creation, see issue 3111!")

    if run is None:
        from test.test_support import run_unittest as run

    util.get_temp_dir()     # creates temp directory for use by all processes

    multiprocessing.get_logger().setLevel(LOG_LEVEL)

    # Shared pools and the shared manager used by the generated test cases.
    ProcessesMixin.pool = multiprocessing.Pool(4)
    ThreadsMixin.pool = multiprocessing.dummy.Pool(4)
    ManagerMixin.manager.__init__()
    ManagerMixin.manager.start()
    ManagerMixin.pool = ManagerMixin.manager.Pool(4)

    # Tear the shared resources down even if the run raises, so worker
    # and manager processes are not left behind.
    try:
        by_name = lambda tc: tc.__name__
        testcases = (
            sorted(testcases_processes.values(), key=by_name) +
            sorted(testcases_threads.values(), key=by_name) +
            sorted(testcases_manager.values(), key=by_name) +
            testcases_other
            )

        loadTestsFromTestCase = unittest.defaultTestLoader.loadTestsFromTestCase
        suite = unittest.TestSuite(loadTestsFromTestCase(tc) for tc in testcases)
        with test_support.check_py3k_warnings(
                (".+__(get|set)slice__ has been removed", DeprecationWarning)):
            run(suite)
    finally:
        ThreadsMixin.pool.terminate()
        ProcessesMixin.pool.terminate()
        ManagerMixin.pool.terminate()
        ManagerMixin.manager.shutdown()

        del ProcessesMixin.pool, ThreadsMixin.pool, ManagerMixin.pool
Benjamin Petersondfd79492008-06-13 19:13:39 +00002036
def main():
    """Run the whole suite with a verbose text runner."""
    runner = unittest.TextTestRunner(verbosity=2)
    test_main(runner.run)
2039
2040if __name__ == '__main__':
2041 main()