#!/usr/bin/env python

#
# Unit tests for the multiprocessing package
#

import unittest
import threading
import Queue
import time
import sys
import os
import gc
import signal
import array
import copy
import socket
import random
import logging
from test import test_support
from StringIO import StringIO


_multiprocessing = test_support.import_module('_multiprocessing')

# Work around broken sem_open implementations
test_support.import_module('multiprocessing.synchronize')

import multiprocessing.dummy
import multiprocessing.connection
import multiprocessing.managers
import multiprocessing.heap
import multiprocessing.pool

from multiprocessing import util

#
#
#

# latin() wraps the single-character literals used as 'c' typecode test
# values further down; here it is simply the built-in str.
latin = str

#
# Constants
#

LOG_LEVEL = util.SUBWARNING
#LOG_LEVEL = logging.DEBUG

# Short pause used by tests to let another process/thread make progress.
DELTA = 0.1
CHECK_TIMINGS = False     # making true makes tests take a lot longer
                          # and can sometimes cause some non-serious
                          # failures because some calls block a bit
                          # longer than expected
if CHECK_TIMINGS:
    TIMEOUT1, TIMEOUT2, TIMEOUT3 = 0.82, 0.35, 1.4
else:
    TIMEOUT1, TIMEOUT2, TIMEOUT3 = 0.1, 0.1, 0.1

# True unless the C extension reports that sem_getvalue() is broken.
HAVE_GETVALUE = not getattr(_multiprocessing,
                            'HAVE_BROKEN_SEM_GETVALUE', False)

WIN32 = (sys.platform == "win32")

#
# Creates a wrapper for a function which records the time it takes to finish
#

class TimingWrapper(object):
    """Callable wrapper that records how long each call to *func* takes.

    ``elapsed`` is ``None`` until the first call.  After every call --
    whether it returned or raised -- it holds the duration of that call
    in seconds, measured with ``time.time()``.
    """

    def __init__(self, func):
        self.func = func
        self.elapsed = None

    def __call__(self, *args, **kwds):
        """Call the wrapped function and return its result unchanged."""
        started = time.time()
        try:
            return self.func(*args, **kwds)
        finally:
            # Runs on the exception path too, so the duration of a call
            # made through assertRaises can still be inspected.
            self.elapsed = time.time() - started

#
# Base class for test cases
#

class BaseTestCase(object):
    """Mixin holding assertion helpers shared by the test-case classes.

    It calls ``assertAlmostEqual`` and ``assertEqual`` on ``self``, so it
    has to be combined with a class that provides them (e.g.
    ``unittest.TestCase``).
    """

    # Flavours a test class may be run under; subclasses narrow this.
    ALLOWED_TYPES = ('processes', 'manager', 'threads')

    def assertTimingAlmostEqual(self, a, b):
        """Compare two durations to one decimal place.

        Does nothing unless the module-level CHECK_TIMINGS flag is true.
        """
        if CHECK_TIMINGS:
            self.assertAlmostEqual(a, b, 1)

    def assertReturnsIfImplemented(self, value, func, *args):
        """Assert ``func(*args) == value`` unless *func* is unsupported.

        A NotImplementedError raised by *func* is swallowed, turning the
        check into a no-op on platforms lacking the feature.
        """
        try:
            res = func(*args)
        except NotImplementedError:
            pass
        else:
            return self.assertEqual(value, res)

#
# Return the value of a semaphore
#

def get_value(self):
    """Return the current counter of the semaphore-like object *self*.

    Tries, in order: the ``get_value()`` method, the name-mangled
    ``_Semaphore__value`` attribute, and the ``_value`` attribute.

    Raises NotImplementedError when none of them is available.  Any
    NotImplementedError raised by ``get_value()`` itself propagates
    unchanged.
    """
    try:
        return self.get_value()
    except AttributeError:
        pass
    for attr in ('_Semaphore__value', '_value'):
        try:
            return getattr(self, attr)
        except AttributeError:
            continue
    raise NotImplementedError

#
# Testcases
#

class _TestProcess(BaseTestCase):
    """Tests for ``Process`` objects and the functions that report on them.

    Uses attributes that are not defined in this class: ``TYPE``,
    ``Process``, ``Queue``, ``Event``, ``Pipe``, ``current_process`` and
    ``active_children``.
    NOTE(review): presumably supplied by a mixin elsewhere in the file --
    confirm against the code that builds the concrete test cases.
    """

    ALLOWED_TYPES = ('processes', 'threads')

    def test_current(self):
        # Skipped for the thread flavour.
        if self.TYPE == 'threads':
            return

        current = self.current_process()
        authkey = current.authkey

        self.assertTrue(current.is_alive())
        self.assertFalse(current.daemon)
        self.assertIsInstance(authkey, bytes)
        self.assertTrue(len(authkey) > 0)
        self.assertEqual(current.ident, os.getpid())
        self.assertEqual(current.exitcode, None)

    def _test(self, q, *args, **kwds):
        # Target of test_process: report what the child sees through q.
        current = self.current_process()
        q.put(args)
        q.put(kwds)
        q.put(current.name)
        if self.TYPE != 'threads':
            q.put(bytes(current.authkey))
            q.put(current.pid)

    def test_process(self):
        q = self.Queue(1)
        e = self.Event()
        args = (q, 1, 2)
        kwargs = {'hello':23, 'bye':2.54}
        name = 'SomeProcess'
        p = self.Process(
            target=self._test, args=args, kwargs=kwargs, name=name
            )
        p.daemon = True
        current = self.current_process()

        # State before start().
        if self.TYPE != 'threads':
            self.assertEqual(p.authkey, current.authkey)
        self.assertEqual(p.is_alive(), False)
        self.assertEqual(p.daemon, True)
        self.assertNotIn(p, self.active_children())
        self.assertTrue(type(self.active_children()) is list)
        self.assertEqual(p.exitcode, None)

        p.start()

        # State while running.
        self.assertEqual(p.exitcode, None)
        self.assertEqual(p.is_alive(), True)
        self.assertIn(p, self.active_children())

        # q is the first element of args, so the child's *args is args[1:].
        self.assertEqual(q.get(), args[1:])
        self.assertEqual(q.get(), kwargs)
        self.assertEqual(q.get(), p.name)
        if self.TYPE != 'threads':
            self.assertEqual(q.get(), current.authkey)
            self.assertEqual(q.get(), p.pid)

        p.join()

        # State after the child has finished.
        self.assertEqual(p.exitcode, 0)
        self.assertEqual(p.is_alive(), False)
        self.assertNotIn(p, self.active_children())

    def _test_terminate(self):
        # Sleeps long enough that the child only ends via terminate().
        time.sleep(1000)

    def test_terminate(self):
        if self.TYPE == 'threads':
            return

        p = self.Process(target=self._test_terminate)
        p.daemon = True
        p.start()

        self.assertEqual(p.is_alive(), True)
        self.assertIn(p, self.active_children())
        self.assertEqual(p.exitcode, None)

        p.terminate()

        join = TimingWrapper(p.join)
        self.assertEqual(join(), None)
        self.assertTimingAlmostEqual(join.elapsed, 0.0)

        self.assertEqual(p.is_alive(), False)
        self.assertNotIn(p, self.active_children())

        p.join()

        # XXX sometimes get p.exitcode == 0 on Windows ...
        #self.assertEqual(p.exitcode, -signal.SIGTERM)

    def test_cpu_count(self):
        try:
            cpus = multiprocessing.cpu_count()
        except NotImplementedError:
            cpus = 1
        self.assertTrue(type(cpus) is int)
        self.assertTrue(cpus >= 1)

    def test_active_children(self):
        self.assertEqual(type(self.active_children()), list)

        p = self.Process(target=time.sleep, args=(DELTA,))
        self.assertNotIn(p, self.active_children())

        p.start()
        self.assertIn(p, self.active_children())

        p.join()
        self.assertNotIn(p, self.active_children())

    def _test_recursion(self, wconn, id):
        # Send our own id, then start and join two children one after the
        # other, until the id is two levels deep.
        # NOTE(review): 'forking' is imported but not used here; kept in
        # case the import itself is what is being exercised -- confirm
        # before removing.
        from multiprocessing import forking
        wconn.send(id)
        if len(id) < 2:
            for i in range(2):
                p = self.Process(
                    target=self._test_recursion, args=(wconn, id+[i])
                    )
                p.start()
                p.join()

    def test_recursion(self):
        rconn, wconn = self.Pipe(duplex=False)
        self._test_recursion(wconn, [])

        time.sleep(DELTA)
        result = []
        while rconn.poll():
            result.append(rconn.recv())

        # Each child is joined before its sibling starts, so the ids
        # arrive in depth-first order.
        expected = [
            [],
              [0],
                [0, 0],
                [0, 1],
              [1],
                [1, 0],
                [1, 1]
            ]
        self.assertEqual(result, expected)

#
#
#

class _UpperCaser(multiprocessing.Process):
    """Process subclass that upper-cases strings sent to it over a pipe.

    The parent talks to it through ``submit()`` and shuts it down with
    ``stop()``; ``run()`` is the loop executed in the child.
    """

    def __init__(self):
        multiprocessing.Process.__init__(self)
        self.child_conn, self.parent_conn = multiprocessing.Pipe()

    def run(self):
        # The child only uses its own end of the pipe.
        self.parent_conn.close()
        # Serve requests until the None sentinel sent by stop() arrives.
        for s in iter(self.child_conn.recv, None):
            self.child_conn.send(s.upper())
        self.child_conn.close()

    def submit(self, s):
        """Send *s* to the child and return the reply it sends back."""
        assert type(s) is str
        self.parent_conn.send(s)
        return self.parent_conn.recv()

    def stop(self):
        """Send the None sentinel and close both pipe ends held here."""
        self.parent_conn.send(None)
        self.parent_conn.close()
        self.child_conn.close()

class _TestSubclassingProcess(BaseTestCase):
    """Checks that a ``multiprocessing.Process`` subclass can be run."""

    ALLOWED_TYPES = ('processes',)

    def test_subclassing(self):
        # Round-trip two strings through the _UpperCaser child, then stop
        # and join it.
        uppercaser = _UpperCaser()
        uppercaser.start()
        self.assertEqual(uppercaser.submit('hello'), 'HELLO')
        self.assertEqual(uppercaser.submit('world'), 'WORLD')
        uppercaser.stop()
        uppercaser.join()

#
#
#

def queue_empty(q):
    """Return whether queue *q* is empty.

    Uses ``q.empty()`` when the object has it, otherwise falls back to
    comparing ``q.qsize()`` with zero.
    """
    if hasattr(q, 'empty'):
        return q.empty()
    else:
        return q.qsize() == 0

def queue_full(q, maxsize):
    """Return whether queue *q* is full.

    Uses ``q.full()`` when the object has it, otherwise falls back to
    comparing ``q.qsize()`` with *maxsize*.
    """
    if hasattr(q, 'full'):
        return q.full()
    else:
        return q.qsize() == maxsize


class _TestQueue(BaseTestCase):
    """Tests for ``Queue`` and ``JoinableQueue``.

    Uses ``Queue``, ``JoinableQueue``, ``Event`` and ``Process`` attributes
    that are not defined in this class.
    """

    def _test_put(self, queue, child_can_start, parent_can_continue):
        # Child side of test_put: once released, take the six items the
        # parent queued, then let the parent continue.
        child_can_start.wait()
        for i in range(6):
            queue.get()
        parent_can_continue.set()

    def test_put(self):
        MAXSIZE = 6
        queue = self.Queue(maxsize=MAXSIZE)
        child_can_start = self.Event()
        parent_can_continue = self.Event()

        proc = self.Process(
            target=self._test_put,
            args=(queue, child_can_start, parent_can_continue)
            )
        proc.daemon = True
        proc.start()

        self.assertEqual(queue_empty(queue), True)
        self.assertEqual(queue_full(queue, MAXSIZE), False)

        # Fill the queue using every calling convention of put().
        queue.put(1)
        queue.put(2, True)
        queue.put(3, True, None)
        queue.put(4, False)
        queue.put(5, False, None)
        queue.put_nowait(6)

        # the values may be in buffer but not yet in pipe so sleep a bit
        time.sleep(DELTA)

        self.assertEqual(queue_empty(queue), False)
        self.assertEqual(queue_full(queue, MAXSIZE), True)

        put = TimingWrapper(queue.put)
        put_nowait = TimingWrapper(queue.put_nowait)

        # Every further put must raise Full: at once when non-blocking,
        # after the timeout when blocking.
        self.assertRaises(Queue.Full, put, 7, False)
        self.assertTimingAlmostEqual(put.elapsed, 0)

        self.assertRaises(Queue.Full, put, 7, False, None)
        self.assertTimingAlmostEqual(put.elapsed, 0)

        self.assertRaises(Queue.Full, put_nowait, 7)
        self.assertTimingAlmostEqual(put_nowait.elapsed, 0)

        self.assertRaises(Queue.Full, put, 7, True, TIMEOUT1)
        self.assertTimingAlmostEqual(put.elapsed, TIMEOUT1)

        self.assertRaises(Queue.Full, put, 7, False, TIMEOUT2)
        self.assertTimingAlmostEqual(put.elapsed, 0)

        self.assertRaises(Queue.Full, put, 7, True, timeout=TIMEOUT3)
        self.assertTimingAlmostEqual(put.elapsed, TIMEOUT3)

        child_can_start.set()
        parent_can_continue.wait()

        self.assertEqual(queue_empty(queue), True)
        self.assertEqual(queue_full(queue, MAXSIZE), False)

        proc.join()

    def _test_get(self, queue, child_can_start, parent_can_continue):
        # Child side of test_get: once released, queue four items, then
        # let the parent continue.
        child_can_start.wait()
        #queue.put(1)
        queue.put(2)
        queue.put(3)
        queue.put(4)
        queue.put(5)
        parent_can_continue.set()

    def test_get(self):
        queue = self.Queue()
        child_can_start = self.Event()
        parent_can_continue = self.Event()

        proc = self.Process(
            target=self._test_get,
            args=(queue, child_can_start, parent_can_continue)
            )
        proc.daemon = True
        proc.start()

        self.assertEqual(queue_empty(queue), True)

        child_can_start.set()
        parent_can_continue.wait()

        time.sleep(DELTA)
        self.assertEqual(queue_empty(queue), False)

        # Hangs unexpectedly, remove for now
        #self.assertEqual(queue.get(), 1)
        self.assertEqual(queue.get(True, None), 2)
        self.assertEqual(queue.get(True), 3)
        self.assertEqual(queue.get(timeout=1), 4)
        self.assertEqual(queue.get_nowait(), 5)

        self.assertEqual(queue_empty(queue), True)

        get = TimingWrapper(queue.get)
        get_nowait = TimingWrapper(queue.get_nowait)

        # Every further get must raise Empty: at once when non-blocking,
        # after the timeout when blocking.
        self.assertRaises(Queue.Empty, get, False)
        self.assertTimingAlmostEqual(get.elapsed, 0)

        self.assertRaises(Queue.Empty, get, False, None)
        self.assertTimingAlmostEqual(get.elapsed, 0)

        self.assertRaises(Queue.Empty, get_nowait)
        self.assertTimingAlmostEqual(get_nowait.elapsed, 0)

        self.assertRaises(Queue.Empty, get, True, TIMEOUT1)
        self.assertTimingAlmostEqual(get.elapsed, TIMEOUT1)

        self.assertRaises(Queue.Empty, get, False, TIMEOUT2)
        self.assertTimingAlmostEqual(get.elapsed, 0)

        self.assertRaises(Queue.Empty, get, timeout=TIMEOUT3)
        self.assertTimingAlmostEqual(get.elapsed, TIMEOUT3)

        proc.join()

    def _test_fork(self, queue):
        for i in range(10, 20):
            queue.put(i)
        # note that at this point the items may only be buffered, so the
        # process cannot shutdown until the feeder thread has finished
        # pushing items onto the pipe.

    def test_fork(self):
        # Old versions of Queue would fail to create a new feeder
        # thread for a forked process if the original process had its
        # own feeder thread.  This test checks that this no longer
        # happens.

        queue = self.Queue()

        # put items on queue so that main process starts a feeder thread
        for i in range(10):
            queue.put(i)

        # wait to make sure thread starts before we fork a new process
        time.sleep(DELTA)

        # fork process
        p = self.Process(target=self._test_fork, args=(queue,))
        p.start()

        # check that all expected items are in the queue
        for i in range(20):
            self.assertEqual(queue.get(), i)
        self.assertRaises(Queue.Empty, queue.get, False)

        p.join()

    def test_qsize(self):
        q = self.Queue()
        # Give up quietly when qsize() is not implemented.
        try:
            self.assertEqual(q.qsize(), 0)
        except NotImplementedError:
            return
        q.put(1)
        self.assertEqual(q.qsize(), 1)
        q.put(5)
        self.assertEqual(q.qsize(), 2)
        q.get()
        self.assertEqual(q.qsize(), 1)
        q.get()
        self.assertEqual(q.qsize(), 0)

    def _test_task_done(self, q):
        # Worker loop: acknowledge every item until the None sentinel.
        for obj in iter(q.get, None):
            time.sleep(DELTA)
            q.task_done()

    def test_task_done(self):
        queue = self.JoinableQueue()

        if sys.version_info < (2, 5) and not hasattr(queue, 'task_done'):
            return

        workers = [self.Process(target=self._test_task_done, args=(queue,))
                   for i in xrange(4)]

        for p in workers:
            p.start()

        for i in xrange(10):
            queue.put(i)

        queue.join()

        # One sentinel per worker so that each of them exits its loop.
        for p in workers:
            queue.put(None)

        for p in workers:
            p.join()

#
#
#

class _TestLock(BaseTestCase):
    """Tests for ``Lock`` and ``RLock`` (taken from ``self``)."""

    def test_lock(self):
        lock = self.Lock()
        self.assertEqual(lock.acquire(), True)
        # A second, non-blocking acquire of a held lock must fail.
        self.assertEqual(lock.acquire(False), False)
        self.assertEqual(lock.release(), None)
        # Releasing an unheld lock is an error.
        self.assertRaises((ValueError, threading.ThreadError), lock.release)

    def test_rlock(self):
        lock = self.RLock()
        # Three acquires by the same owner, matched by three releases.
        self.assertEqual(lock.acquire(), True)
        self.assertEqual(lock.acquire(), True)
        self.assertEqual(lock.acquire(), True)
        self.assertEqual(lock.release(), None)
        self.assertEqual(lock.release(), None)
        self.assertEqual(lock.release(), None)
        # One release too many is an error.
        self.assertRaises((AssertionError, RuntimeError), lock.release)

    def test_lock_context(self):
        with self.Lock():
            pass


class _TestSemaphore(BaseTestCase):
    """Tests for ``Semaphore`` and ``BoundedSemaphore`` (from ``self``).

    Counter checks go through assertReturnsIfImplemented, so they are
    skipped when get_value raises NotImplementedError.
    """

    def _test_semaphore(self, sem):
        # Expects a semaphore whose counter is 2: take it down to zero,
        # check that a non-blocking acquire then fails, and bring it back.
        self.assertReturnsIfImplemented(2, get_value, sem)
        self.assertEqual(sem.acquire(), True)
        self.assertReturnsIfImplemented(1, get_value, sem)
        self.assertEqual(sem.acquire(), True)
        self.assertReturnsIfImplemented(0, get_value, sem)
        self.assertEqual(sem.acquire(False), False)
        self.assertReturnsIfImplemented(0, get_value, sem)
        self.assertEqual(sem.release(), None)
        self.assertReturnsIfImplemented(1, get_value, sem)
        self.assertEqual(sem.release(), None)
        self.assertReturnsIfImplemented(2, get_value, sem)

    def test_semaphore(self):
        sem = self.Semaphore(2)
        self._test_semaphore(sem)
        # A plain semaphore may be released past its initial value.
        self.assertEqual(sem.release(), None)
        self.assertReturnsIfImplemented(3, get_value, sem)
        self.assertEqual(sem.release(), None)
        self.assertReturnsIfImplemented(4, get_value, sem)

    def test_bounded_semaphore(self):
        sem = self.BoundedSemaphore(2)
        self._test_semaphore(sem)
        # Currently fails on OS/X
        #if HAVE_GETVALUE:
        #    self.assertRaises(ValueError, sem.release)
        #    self.assertReturnsIfImplemented(2, get_value, sem)

    def test_timeout(self):
        if self.TYPE != 'processes':
            return

        sem = self.Semaphore(0)
        acquire = TimingWrapper(sem.acquire)

        # Non-blocking acquires fail at once, whatever timeout is given.
        self.assertEqual(acquire(False), False)
        self.assertTimingAlmostEqual(acquire.elapsed, 0.0)

        self.assertEqual(acquire(False, None), False)
        self.assertTimingAlmostEqual(acquire.elapsed, 0.0)

        self.assertEqual(acquire(False, TIMEOUT1), False)
        self.assertTimingAlmostEqual(acquire.elapsed, 0)

        # Blocking acquires fail once the timeout has elapsed.
        self.assertEqual(acquire(True, TIMEOUT2), False)
        self.assertTimingAlmostEqual(acquire.elapsed, TIMEOUT2)

        self.assertEqual(acquire(timeout=TIMEOUT3), False)
        self.assertTimingAlmostEqual(acquire.elapsed, TIMEOUT3)


class _TestCondition(BaseTestCase):
    """Tests for ``Condition`` (taken from ``self``).

    Waiters are started both as ``self.Process`` objects and as plain
    ``threading.Thread`` objects.
    """

    def f(self, cond, sleeping, woken, timeout=None):
        # Waiter body: signal on 'sleeping' that we are about to wait,
        # wait on the condition, then signal on 'woken' that we are back.
        cond.acquire()
        sleeping.release()
        cond.wait(timeout)
        woken.release()
        cond.release()

    def check_invariant(self, cond):
        # this is only supposed to succeed when there are no sleepers
        if self.TYPE == 'processes':
            try:
                sleepers = (cond._sleeping_count.get_value() -
                            cond._woken_count.get_value())
                self.assertEqual(sleepers, 0)
                self.assertEqual(cond._wait_semaphore.get_value(), 0)
            except NotImplementedError:
                pass

    def test_notify(self):
        cond = self.Condition()
        sleeping = self.Semaphore(0)
        woken = self.Semaphore(0)

        # Separate names for the two waiters so that both can be joined
        # at the end of the test.
        proc = self.Process(target=self.f, args=(cond, sleeping, woken))
        proc.daemon = True
        proc.start()

        thread = threading.Thread(target=self.f, args=(cond, sleeping, woken))
        thread.daemon = True
        thread.start()

        # wait for both children to start sleeping
        sleeping.acquire()
        sleeping.acquire()

        # check no process/thread has woken up
        time.sleep(DELTA)
        self.assertReturnsIfImplemented(0, get_value, woken)

        # wake up one process/thread
        cond.acquire()
        cond.notify()
        cond.release()

        # check one process/thread has woken up
        time.sleep(DELTA)
        self.assertReturnsIfImplemented(1, get_value, woken)

        # wake up another
        cond.acquire()
        cond.notify()
        cond.release()

        # check other has woken up
        time.sleep(DELTA)
        self.assertReturnsIfImplemented(2, get_value, woken)

        # check state is not mucked up
        self.check_invariant(cond)

        # Both waiters have been notified by now; reap them.
        proc.join()
        thread.join()

    def test_notify_all(self):
        cond = self.Condition()
        sleeping = self.Semaphore(0)
        woken = self.Semaphore(0)

        # start some threads/processes which will timeout
        for i in range(3):
            p = self.Process(target=self.f,
                             args=(cond, sleeping, woken, TIMEOUT1))
            p.daemon = True
            p.start()

            t = threading.Thread(target=self.f,
                                 args=(cond, sleeping, woken, TIMEOUT1))
            t.daemon = True
            t.start()

        # wait for them all to sleep
        for i in xrange(6):
            sleeping.acquire()

        # check they have all timed out
        for i in xrange(6):
            woken.acquire()
        self.assertReturnsIfImplemented(0, get_value, woken)

        # check state is not mucked up
        self.check_invariant(cond)

        # start some more threads/processes
        for i in range(3):
            p = self.Process(target=self.f, args=(cond, sleeping, woken))
            p.daemon = True
            p.start()

            t = threading.Thread(target=self.f, args=(cond, sleeping, woken))
            t.daemon = True
            t.start()

        # wait for them to all sleep
        for i in xrange(6):
            sleeping.acquire()

        # check no process/thread has woken up
        time.sleep(DELTA)
        self.assertReturnsIfImplemented(0, get_value, woken)

        # wake them all up
        cond.acquire()
        cond.notify_all()
        cond.release()

        # check they have all woken
        time.sleep(DELTA)
        self.assertReturnsIfImplemented(6, get_value, woken)

        # check state is not mucked up
        self.check_invariant(cond)

    def test_timeout(self):
        cond = self.Condition()
        wait = TimingWrapper(cond.wait)
        cond.acquire()
        res = wait(TIMEOUT1)
        cond.release()
        self.assertEqual(res, None)
        self.assertTimingAlmostEqual(wait.elapsed, TIMEOUT1)


class _TestEvent(BaseTestCase):
    """Tests for ``Event`` (taken from ``self``)."""

    def _test_event(self, event):
        # Helper target: set the event after a short delay.
        time.sleep(TIMEOUT2)
        event.set()

    def test_event(self):
        event = self.Event()
        wait = TimingWrapper(event.wait)

        # Historical note: the is_set() and wait() return-value checks
        # were once disabled because of API differences with threading's
        # Event (is_set vs isSet, and wait() returning the flag instead
        # of None).  They are active again below.
        self.assertEqual(event.is_set(), False)

        # While the event is unset, wait() reports False: at once for a
        # zero timeout, after the timeout otherwise.
        self.assertEqual(wait(0.0), False)
        self.assertTimingAlmostEqual(wait.elapsed, 0.0)
        self.assertEqual(wait(TIMEOUT1), False)
        self.assertTimingAlmostEqual(wait.elapsed, TIMEOUT1)

        event.set()

        # Once the event is set, wait() returns True without blocking.
        self.assertEqual(event.is_set(), True)
        self.assertEqual(wait(), True)
        self.assertTimingAlmostEqual(wait.elapsed, 0.0)
        self.assertEqual(wait(TIMEOUT1), True)
        self.assertTimingAlmostEqual(wait.elapsed, 0.0)
        # self.assertEqual(event.is_set(), True)

        event.clear()

        #self.assertEqual(event.is_set(), False)

        # Let a helper set the event and check that wait() sees it.
        setter = self.Process(target=self._test_event, args=(event,))
        setter.start()
        self.assertEqual(wait(), True)
        # The helper has already set the event, so it is about to exit;
        # reap it instead of leaving it behind.
        setter.join()

#
#
#

class _TestValue(BaseTestCase):
    """Tests for ``Value`` and ``RawValue`` (taken from ``self``).

    Every test returns early unless ``self.TYPE`` is 'processes'.
    """

    # (typecode, initial value, value written by the child process)
    codes_values = [
        ('i', 4343, 24234),
        ('d', 3.625, -4.25),
        ('h', -232, 234),
        ('c', latin('x'), latin('y'))
        ]

    def _test(self, values):
        # Child side: overwrite each shared value with its third entry.
        for sv, cv in zip(values, self.codes_values):
            sv.value = cv[2]


    def test_value(self, raw=False):
        if self.TYPE != 'processes':
            return

        if raw:
            values = [self.RawValue(code, value)
                      for code, value, _ in self.codes_values]
        else:
            values = [self.Value(code, value)
                      for code, value, _ in self.codes_values]

        for sv, cv in zip(values, self.codes_values):
            self.assertEqual(sv.value, cv[1])

        proc = self.Process(target=self._test, args=(values,))
        proc.start()
        proc.join()

        # The child's writes must be visible in the parent.
        for sv, cv in zip(values, self.codes_values):
            self.assertEqual(sv.value, cv[2])

    def test_rawvalue(self):
        self.test_value(raw=True)

    def test_getobj_getlock(self):
        if self.TYPE != 'processes':
            return

        # Default lock and lock=None: only check the accessors can be
        # called.
        val1 = self.Value('i', 5)
        val1.get_lock()
        val1.get_obj()

        val2 = self.Value('i', 5, lock=None)
        val2.get_lock()
        val2.get_obj()

        # An explicitly supplied lock is the one handed back.
        lock = self.Lock()
        val3 = self.Value('i', 5, lock=lock)
        lock3 = val3.get_lock()
        val3.get_obj()
        self.assertEqual(lock, lock3)

        # lock=False yields an object without the accessors.
        val4 = self.Value('i', 5, lock=False)
        self.assertFalse(hasattr(val4, 'get_lock'))
        self.assertFalse(hasattr(val4, 'get_obj'))

        # Anything that is not a lock is rejected.
        self.assertRaises(AttributeError, self.Value, 'i', 5, lock='navalue')

        val5 = self.RawValue('i', 5)
        self.assertFalse(hasattr(val5, 'get_lock'))
        self.assertFalse(hasattr(val5, 'get_obj'))


class _TestArray(BaseTestCase):
    """Tests for ``Array`` and ``RawArray`` (taken from ``self``).

    Every test returns early unless ``self.TYPE`` is 'processes'.
    """

    def f(self, seq):
        # Turn seq into its running sums, in place.
        for i in range(1, len(seq)):
            seq[i] += seq[i-1]

    def test_array(self, raw=False):
        if self.TYPE != 'processes':
            return

        seq = [680, 626, 934, 821, 150, 233, 548, 982, 714, 831]
        if raw:
            arr = self.RawArray('i', seq)
        else:
            arr = self.Array('i', seq)

        self.assertEqual(len(arr), len(seq))
        self.assertEqual(arr[3], seq[3])
        self.assertEqual(list(arr[2:7]), list(seq[2:7]))

        # Same slice assignment on the shared array and the plain list.
        arr[4:8] = seq[4:8] = array.array('i', [1, 2, 3, 4])

        self.assertEqual(list(arr[:]), seq)

        # Apply f() to the list here and to the shared array in a child;
        # both must end up with the same contents.
        self.f(seq)

        p = self.Process(target=self.f, args=(arr,))
        p.start()
        p.join()

        self.assertEqual(list(arr[:]), seq)

    def test_rawarray(self):
        self.test_array(raw=True)

    def test_getobj_getlock_obj(self):
        if self.TYPE != 'processes':
            return

        # Default lock and lock=None: only check the accessors can be
        # called.
        arr1 = self.Array('i', range(10))
        arr1.get_lock()
        arr1.get_obj()

        arr2 = self.Array('i', range(10), lock=None)
        arr2.get_lock()
        arr2.get_obj()

        # An explicitly supplied lock is the one handed back.
        lock = self.Lock()
        arr3 = self.Array('i', range(10), lock=lock)
        lock3 = arr3.get_lock()
        arr3.get_obj()
        self.assertEqual(lock, lock3)

        # lock=False yields an object without the accessors.
        arr4 = self.Array('i', range(10), lock=False)
        self.assertFalse(hasattr(arr4, 'get_lock'))
        self.assertFalse(hasattr(arr4, 'get_obj'))
        # Anything that is not a lock is rejected.
        self.assertRaises(AttributeError,
                          self.Array, 'i', range(10), lock='notalock')

        arr5 = self.RawArray('i', range(10))
        self.assertFalse(hasattr(arr5, 'get_lock'))
        self.assertFalse(hasattr(arr5, 'get_obj'))

#
#
#

class _TestContainers(BaseTestCase):
    """Tests for the ``list``, ``dict`` and ``Namespace`` factories.

    The factories are taken from ``self``; the class only runs under the
    'manager' type.
    """

    ALLOWED_TYPES = ('manager',)

    def test_list(self):
        a = self.list(range(10))
        self.assertEqual(a[:], range(10))

        b = self.list()
        self.assertEqual(b[:], [])

        b.extend(range(5))
        self.assertEqual(b[:], range(5))

        self.assertEqual(b[2], 2)
        self.assertEqual(b[2:10], [2,3,4])

        b *= 2
        self.assertEqual(b[:], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4])

        self.assertEqual(b + [5, 6], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4, 5, 6])

        self.assertEqual(a[:], range(10))

        # A container built from other containers.
        d = [a, b]
        e = self.list(d)
        self.assertEqual(
            e[:],
            [[0, 1, 2, 3, 4, 5, 6, 7, 8, 9], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4]]
            )

        # A later change to 'a' must show up through the outer list.
        f = self.list([a])
        a.append('hello')
        self.assertEqual(f[:], [[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 'hello']])

    def test_dict(self):
        d = self.dict()
        indices = range(65, 70)
        for i in indices:
            d[i] = chr(i)
        self.assertEqual(d.copy(), dict((i, chr(i)) for i in indices))
        self.assertEqual(sorted(d.keys()), indices)
        self.assertEqual(sorted(d.values()), [chr(i) for i in indices])
        self.assertEqual(sorted(d.items()), [(i, chr(i)) for i in indices])

    def test_namespace(self):
        n = self.Namespace()
        n.name = 'Bob'
        n.job = 'Builder'
        n._hidden = 'hidden'
        self.assertEqual((n.name, n.job), ('Bob', 'Builder'))
        del n.job
        # The underscore-prefixed attribute is left out of str().
        self.assertEqual(str(n), "Namespace(name='Bob')")
        self.assertTrue(hasattr(n, 'name'))
        self.assertFalse(hasattr(n, 'job'))

#
#
#

def sqr(x, wait=0.0):
    """Return ``x*x`` after sleeping for *wait* seconds."""
    time.sleep(wait)
    return x*x
class _TestPool(BaseTestCase):
    """Tests for pool objects.

    Most tests use ``self.pool``, which is not created in this class.
    """

    def test_apply(self):
        papply = self.pool.apply
        self.assertEqual(papply(sqr, (5,)), sqr(5))
        self.assertEqual(papply(sqr, (), {'x':3}), sqr(x=3))

    def test_map(self):
        pmap = self.pool.map
        self.assertEqual(pmap(sqr, range(10)), map(sqr, range(10)))
        self.assertEqual(pmap(sqr, range(100), chunksize=20),
                         map(sqr, range(100)))

    def test_map_chunksize(self):
        # An empty input combined with an explicit chunksize must still
        # complete instead of stalling.
        try:
            self.pool.map_async(sqr, [], chunksize=1).get(timeout=TIMEOUT1)
        except multiprocessing.TimeoutError:
            self.fail("pool.map_async with chunksize stalled on null list")

    def test_async(self):
        res = self.pool.apply_async(sqr, (7, TIMEOUT1,))
        get = TimingWrapper(res.get)
        self.assertEqual(get(), 49)
        self.assertTimingAlmostEqual(get.elapsed, TIMEOUT1)

    def test_async_timeout(self):
        # The task sleeps longer than the timeout passed to get().
        res = self.pool.apply_async(sqr, (6, TIMEOUT2 + 0.2))
        get = TimingWrapper(res.get)
        self.assertRaises(multiprocessing.TimeoutError, get, timeout=TIMEOUT2)
        self.assertTimingAlmostEqual(get.elapsed, TIMEOUT2)

    def test_imap(self):
        it = self.pool.imap(sqr, range(10))
        self.assertEqual(list(it), map(sqr, range(10)))

        it = self.pool.imap(sqr, range(10))
        for i in range(10):
            self.assertEqual(it.next(), i*i)
        self.assertRaises(StopIteration, it.next)

        it = self.pool.imap(sqr, range(1000), chunksize=100)
        for i in range(1000):
            self.assertEqual(it.next(), i*i)
        self.assertRaises(StopIteration, it.next)

    def test_imap_unordered(self):
        # Results may arrive in any order, so compare them sorted.
        it = self.pool.imap_unordered(sqr, range(1000))
        self.assertEqual(sorted(it), map(sqr, range(1000)))

        it = self.pool.imap_unordered(sqr, range(1000), chunksize=53)
        self.assertEqual(sorted(it), map(sqr, range(1000)))

    def test_make_pool(self):
        p = multiprocessing.Pool(3)
        self.assertEqual(3, len(p._pool))
        p.close()
        p.join()

    def test_terminate(self):
        if self.TYPE == 'manager':
            # On Unix a forked process increfs each shared object to
            # which its parent process held a reference.  If the
            # forked process gets terminated then there is likely to
            # be a reference leak.  So to prevent
            # _TestZZZNumberOfObjects from failing we skip this test
            # when using a manager.
            return

        # Queue far more work than can finish, then check that
        # terminate() lets join() return quickly.
        result = self.pool.map_async(
            time.sleep, [0.1] * 10000, chunksize=1
            )
        self.pool.terminate()
        join = TimingWrapper(self.pool.join)
        join()
        self.assertTrue(join.elapsed < 0.2)

class _TestPoolWorkerLifetime(BaseTestCase):
    """Check that pool workers are replaced when ``maxtasksperchild`` is set."""

    ALLOWED_TYPES = ('processes', )

    def test_pool_worker_lifetime(self):
        p = multiprocessing.Pool(3, maxtasksperchild=10)
        try:
            self.assertEqual(3, len(p._pool))
            origworkerpids = [w.pid for w in p._pool]
            # Run many tasks so each worker gets replaced (hopefully)
            results = [p.apply_async(sqr, (i, )) for i in range(100)]
            # Fetch the results and verify we got the right answers,
            # also ensuring all the tasks have completed.
            for (j, res) in enumerate(results):
                self.assertEqual(res.get(), sqr(j))
            # Refill the pool
            p._repopulate_pool()
            # Finally, check that the worker pids have changed
            finalworkerpids = [w.pid for w in p._pool]
            self.assertNotEqual(sorted(origworkerpids),
                                sorted(finalworkerpids))
        finally:
            # Release the worker processes even when a check above fails.
            p.close()
            p.join()
1078
Benjamin Petersondfd79492008-06-13 19:13:39 +00001079#
1080# Test that manager has expected number of shared objects left
1081#
1082
class _TestZZZNumberOfObjects(BaseTestCase):
    # Because test cases are sorted alphabetically, this one will get
    # run after all the other tests for the manager.  It tests that
    # there have been no "reference leaks" for the manager's shared
    # objects.  Note the comment in _TestPool.test_terminate().
    ALLOWED_TYPES = ('manager',)

    def test_number_of_objects(self):
        EXPECTED_NUMBER = 1                # the pool object is still alive
        multiprocessing.active_children()  # discard dead process objs
        gc.collect()                       # do garbage collection
        refs = self.manager._number_of_objects()
        # Snapshot taken right after the count, so the two can be compared.
        debug_info = self.manager._debug_info()
        if refs != EXPECTED_NUMBER:
            # Print the snapshot once; a second _debug_info() call would
            # only repeat the output.
            print(debug_info)

        self.assertEqual(refs, EXPECTED_NUMBER)
1101
1102#
1103# Test of creating a customized manager class
1104#
1105
1106from multiprocessing.managers import BaseManager, BaseProxy, RemoteError
1107
class FooBar(object):
    """Small class registered with MyManager under two typeids."""

    def f(self):
        """Return the literal string ``'f()'``."""
        return 'f()'

    def g(self):
        """Always raise ValueError."""
        raise ValueError()

    def _h(self):
        """Return the literal string ``'_h()'``."""
        return '_h()'
1115
def baz():
    """Generate the squares of 0 through 9, one value at a time."""
    n = 0
    while n < 10:
        yield n * n
        n += 1
1119
class IteratorProxy(BaseProxy):
    """Proxy that forwards both spellings of the iterator-advance method."""

    _exposed_ = ('next', '__next__')

    def __iter__(self):
        # The proxy is its own iterator.
        return self

    def next(self):
        result = self._callmethod('next')
        return result

    def __next__(self):
        result = self._callmethod('__next__')
        return result
1128
class MyManager(BaseManager):
    """Customized manager; its typeids are registered just below."""


# 'Foo' uses the default exposure, 'Bar' exposes exactly f and _h, and
# 'baz' hands back its generator through IteratorProxy.
MyManager.register('Foo', callable=FooBar)
MyManager.register('Bar', callable=FooBar, exposed=('f', '_h'))
MyManager.register('baz', callable=baz, proxytype=IteratorProxy)
1135
1136
class _TestMyManager(BaseTestCase):
    """Test a customized manager class (MyManager and its registrations)."""

    ALLOWED_TYPES = ('manager',)

    def test_mymanager(self):
        manager = MyManager()
        manager.start()
        try:
            foo = manager.Foo()
            bar = manager.Bar()
            # Named so that it does not shadow the module-level baz().
            squares = manager.baz()

            candidates = ('f', 'g', '_h')
            foo_methods = [name for name in candidates if hasattr(foo, name)]
            bar_methods = [name for name in candidates if hasattr(bar, name)]

            self.assertEqual(foo_methods, ['f', 'g'])
            self.assertEqual(bar_methods, ['f', '_h'])

            self.assertEqual(foo.f(), 'f()')
            self.assertRaises(ValueError, foo.g)
            self.assertEqual(foo._callmethod('f'), 'f()')
            # _h is not exposed on 'Foo', so calling it must fail remotely.
            self.assertRaises(RemoteError, foo._callmethod, '_h')

            self.assertEqual(bar.f(), 'f()')
            self.assertEqual(bar._h(), '_h()')
            self.assertEqual(bar._callmethod('f'), 'f()')
            self.assertEqual(bar._callmethod('_h'), '_h()')

            self.assertEqual(list(squares), [i*i for i in range(10)])
        finally:
            # Stop the manager process even when an assertion above fails.
            manager.shutdown()
1168
1169#
1170# Test of connecting to a remote server and using xmlrpclib for serialization
1171#
1172
# One queue object, created at import time and returned by every call below.
_queue = Queue.Queue()


def get_queue():
    """Return the module-level queue (the same object on every call)."""
    return _queue
1176
class QueueManager(BaseManager):
    """Manager class used by the server process."""


# On this class 'get_queue' is backed by the real callable.
QueueManager.register('get_queue', callable=get_queue)
1180
class QueueManager2(BaseManager):
    """Manager class which specifies the same interface as QueueManager."""


# Registered by name only: no callable is supplied on this class.
QueueManager2.register('get_queue')
1184
1185
1186SERIALIZER = 'xmlrpclib'
1187
class _TestRemoteManager(BaseTestCase):
    """Connect to a running manager server using SERIALIZER for transport."""

    ALLOWED_TYPES = ('manager',)

    def _putter(self, address, authkey):
        # Child side: connect to the server and put one tuple on its queue.
        manager = QueueManager2(
            address=address, authkey=authkey, serializer=SERIALIZER
            )
        manager.connect()
        queue = manager.get_queue()
        queue.put(('hello world', None, True, 2.25))

    def test_remote(self):
        authkey = os.urandom(32)

        manager = QueueManager(
            address=('localhost', 0), authkey=authkey, serializer=SERIALIZER
            )
        manager.start()
        try:
            p = self.Process(target=self._putter,
                             args=(manager.address, authkey))
            p.start()

            manager2 = QueueManager2(
                address=manager.address, authkey=authkey,
                serializer=SERIALIZER
                )
            manager2.connect()
            queue = manager2.get_queue()

            # Note that xmlrpclib will deserialize object as a list not a tuple
            self.assertEqual(queue.get(), ['hello world', None, True, 2.25])

            # Because we are using xmlrpclib for serialization instead of
            # pickle this will cause a serialization error.
            self.assertRaises(Exception, queue.put, time.sleep)

            # Make queue finalizer run before the server is stopped
            del queue
            # Reap the child while the server it talks to is still running.
            p.join()
        finally:
            # Stop the server process even when an assertion above fails.
            manager.shutdown()
1227
class _TestManagerRestart(BaseTestCase):
    """Start a manager, shut it down, and start another on the same port."""

    def _putter(self, address, authkey):
        # Child side: connect to the server and put one string on its queue.
        manager = QueueManager(
            address=address, authkey=authkey, serializer=SERIALIZER)
        manager.connect()
        queue = manager.get_queue()
        queue.put('hello world')

    def test_rapid_restart(self):
        authkey = os.urandom(32)
        port = test_support.find_unused_port()
        manager = QueueManager(
            address=('localhost', port), authkey=authkey,
            serializer=SERIALIZER)
        manager.start()
        try:
            p = self.Process(target=self._putter,
                             args=(manager.address, authkey))
            p.start()
            queue = manager.get_queue()
            self.assertEqual(queue.get(), 'hello world')
            del queue
            # Let the child finish before the server it uses goes away.
            p.join()
        finally:
            manager.shutdown()

        # A second manager must be able to bind the same port immediately.
        manager = QueueManager(
            address=('localhost', port), authkey=authkey,
            serializer=SERIALIZER)
        manager.start()
        manager.shutdown()
Jesse Noller459a6482009-03-30 15:50:42 +00001254
Benjamin Petersondfd79492008-06-13 19:13:39 +00001255#
1256#
1257#
1258
1259SENTINEL = latin('')
1260
class _TestConnection(BaseTestCase):
    """Tests of the connection objects returned by ``self.Pipe()``."""

    ALLOWED_TYPES = ('processes', 'threads')

    def _echo(self, conn):
        # Send every message straight back until SENTINEL arrives, then close.
        while True:
            data = conn.recv_bytes()
            if data == SENTINEL:
                break
            conn.send_bytes(data)
        conn.close()

    def test_connection(self):
        conn, child_conn = self.Pipe()

        echo = self.Process(target=self._echo, args=(child_conn,))
        echo.daemon = True
        echo.start()

        seq = [1, 2.25, None]
        msg = latin('hello world')
        longmsg = msg * 10
        arr = array.array('i', range(4))
        using_processes = (self.TYPE == 'processes')

        if using_processes:
            self.assertEqual(type(conn.fileno()), int)

        # Pickled objects and raw byte strings both come back unchanged.
        self.assertEqual(conn.send(seq), None)
        self.assertEqual(conn.recv(), seq)

        self.assertEqual(conn.send_bytes(msg), None)
        self.assertEqual(conn.recv_bytes(), msg)

        if using_processes:
            # Receive into the start of a zero-filled buffer.
            buf = array.array('i', [0]*10)
            expected = list(arr) + [0] * (10 - len(arr))
            self.assertEqual(conn.send_bytes(arr), None)
            self.assertEqual(conn.recv_bytes_into(buf),
                             len(arr) * buf.itemsize)
            self.assertEqual(list(buf), expected)

            # Receive at an offset of three items into a fresh buffer.
            buf = array.array('i', [0]*10)
            expected = [0] * 3 + list(arr) + [0] * (10 - 3 - len(arr))
            self.assertEqual(conn.send_bytes(arr), None)
            self.assertEqual(conn.recv_bytes_into(buf, 3 * buf.itemsize),
                             len(arr) * buf.itemsize)
            self.assertEqual(list(buf), expected)

            # A buffer that is too small must raise, carrying the message.
            buf = bytearray(latin(' ' * 40))
            self.assertEqual(conn.send_bytes(longmsg), None)
            try:
                res = conn.recv_bytes_into(buf)
            except multiprocessing.BufferTooShort as e:
                self.assertEqual(e.args, (longmsg,))
            else:
                self.fail('expected BufferTooShort, got %s' % res)

        poll = TimingWrapper(conn.poll)

        # Nothing is pending: poll() returns at once, poll(t) waits t.
        self.assertEqual(poll(), False)
        self.assertTimingAlmostEqual(poll.elapsed, 0)

        self.assertEqual(poll(TIMEOUT1), False)
        self.assertTimingAlmostEqual(poll.elapsed, TIMEOUT1)

        conn.send(None)

        # The echoed reply makes poll(t) return True without waiting.
        self.assertEqual(poll(TIMEOUT1), True)
        self.assertTimingAlmostEqual(poll.elapsed, 0)

        self.assertEqual(conn.recv(), None)

        really_big_msg = latin('X') * (1024 * 1024 * 16)   # 16Mb
        conn.send_bytes(really_big_msg)
        self.assertEqual(conn.recv_bytes(), really_big_msg)

        conn.send_bytes(SENTINEL)                          # tell child to quit
        child_conn.close()

        if using_processes:
            self.assertEqual(conn.readable, True)
            self.assertEqual(conn.writable, True)
            self.assertRaises(EOFError, conn.recv)
            self.assertRaises(EOFError, conn.recv_bytes)

        echo.join()

    def test_duplex_false(self):
        reader, writer = self.Pipe(duplex=False)
        self.assertEqual(writer.send(1), None)
        self.assertEqual(reader.recv(), 1)
        if self.TYPE == 'processes':
            # Each end works in one direction only.
            self.assertEqual(reader.readable, True)
            self.assertEqual(reader.writable, False)
            self.assertEqual(writer.readable, False)
            self.assertEqual(writer.writable, True)
            self.assertRaises(IOError, reader.send, 2)
            self.assertRaises(IOError, writer.recv)
            self.assertRaises(IOError, writer.poll)

    def test_spawn_close(self):
        # We test that a pipe connection can be closed by parent
        # process immediately after child is spawned.  On Windows this
        # would have sometimes failed on old versions because
        # child_conn would be closed before the child got a chance to
        # duplicate it.
        conn, child_conn = self.Pipe()

        echo = self.Process(target=self._echo, args=(child_conn,))
        echo.start()
        child_conn.close()    # this might complete before child initializes

        greeting = latin('hello')
        conn.send_bytes(greeting)
        self.assertEqual(conn.recv_bytes(), greeting)

        conn.send_bytes(SENTINEL)
        conn.close()
        echo.join()

    def test_sendbytes(self):
        if self.TYPE != 'processes':
            return

        msg = latin('abcdefghijklmnopqrstuvwxyz')
        a, b = self.Pipe()

        # Whole message, then offset only, then offset plus size.
        a.send_bytes(msg)
        self.assertEqual(b.recv_bytes(), msg)

        a.send_bytes(msg, 5)
        self.assertEqual(b.recv_bytes(), msg[5:])

        a.send_bytes(msg, 7, 8)
        self.assertEqual(b.recv_bytes(), msg[7:7+8])

        # An offset equal to the length yields an empty message.
        a.send_bytes(msg, 26)
        self.assertEqual(b.recv_bytes(), latin(''))

        a.send_bytes(msg, 26, 0)
        self.assertEqual(b.recv_bytes(), latin(''))

        # Out-of-range or negative offset/size combinations are rejected.
        for bad_args in ((27,), (22, 5), (26, 1), (-1,), (4, -1)):
            self.assertRaises(ValueError, a.send_bytes, msg, *bad_args)
1409
class _TestListenerClient(BaseTestCase):
    """Round-trip one message through a Listener for every address family."""

    ALLOWED_TYPES = ('processes', 'threads')

    def _test(self, address):
        # Client side: connect, send one greeting, disconnect.
        conn = self.connection.Client(address)
        conn.send('hello')
        conn.close()

    def test_listener_client(self):
        for family in self.connection.families:
            l = self.connection.Listener(family=family)
            try:
                p = self.Process(target=self._test, args=(l.address,))
                p.daemon = True
                p.start()
                conn = l.accept()
                try:
                    self.assertEqual(conn.recv(), 'hello')
                finally:
                    # Close the accepted connection; it was left open before.
                    conn.close()
                p.join()
            finally:
                # Close the listener even when the check above fails.
                l.close()
Benjamin Petersondfd79492008-06-13 19:13:39 +00001429#
1430# Test of sending connection and socket objects between processes
1431#
Benjamin Petersonda3a1b12008-06-16 20:52:48 +00001432"""
Benjamin Petersondfd79492008-06-13 19:13:39 +00001433class _TestPicklingConnections(BaseTestCase):
1434
1435 ALLOWED_TYPES = ('processes',)
1436
1437 def _listener(self, conn, families):
1438 for fam in families:
1439 l = self.connection.Listener(family=fam)
1440 conn.send(l.address)
1441 new_conn = l.accept()
1442 conn.send(new_conn)
1443
1444 if self.TYPE == 'processes':
1445 l = socket.socket()
1446 l.bind(('localhost', 0))
1447 conn.send(l.getsockname())
1448 l.listen(1)
1449 new_conn, addr = l.accept()
1450 conn.send(new_conn)
1451
1452 conn.recv()
1453
1454 def _remote(self, conn):
1455 for (address, msg) in iter(conn.recv, None):
1456 client = self.connection.Client(address)
1457 client.send(msg.upper())
1458 client.close()
1459
1460 if self.TYPE == 'processes':
1461 address, msg = conn.recv()
1462 client = socket.socket()
1463 client.connect(address)
1464 client.sendall(msg.upper())
1465 client.close()
1466
1467 conn.close()
1468
1469 def test_pickling(self):
1470 try:
1471 multiprocessing.allow_connection_pickling()
1472 except ImportError:
1473 return
1474
1475 families = self.connection.families
1476
1477 lconn, lconn0 = self.Pipe()
1478 lp = self.Process(target=self._listener, args=(lconn0, families))
1479 lp.start()
1480 lconn0.close()
1481
1482 rconn, rconn0 = self.Pipe()
1483 rp = self.Process(target=self._remote, args=(rconn0,))
1484 rp.start()
1485 rconn0.close()
1486
1487 for fam in families:
1488 msg = ('This connection uses family %s' % fam).encode('ascii')
1489 address = lconn.recv()
1490 rconn.send((address, msg))
1491 new_conn = lconn.recv()
1492 self.assertEqual(new_conn.recv(), msg.upper())
1493
1494 rconn.send(None)
1495
1496 if self.TYPE == 'processes':
1497 msg = latin('This connection uses a normal socket')
1498 address = lconn.recv()
1499 rconn.send((address, msg))
1500 if hasattr(socket, 'fromfd'):
1501 new_conn = lconn.recv()
1502 self.assertEqual(new_conn.recv(100), msg.upper())
1503 else:
1504 # XXX On Windows with Py2.6 need to backport fromfd()
1505 discard = lconn.recv_bytes()
1506
1507 lconn.send(None)
1508
1509 rconn.close()
1510 lconn.close()
1511
1512 lp.join()
1513 rp.join()
Benjamin Petersonda3a1b12008-06-16 20:52:48 +00001514"""
Benjamin Petersondfd79492008-06-13 19:13:39 +00001515#
1516#
1517#
1518
class _TestHeap(BaseTestCase):
    """Consistency check of the heap behind BufferWrapper."""

    ALLOWED_TYPES = ('processes',)

    def test_heap(self):
        iterations = 5000
        maxblocks = 50
        live = []

        # create and destroy lots of blocks of different sizes
        for _ in xrange(iterations):
            size = int(random.lognormvariate(0, 1) * 1000)
            b = multiprocessing.heap.BufferWrapper(size)
            live.append(b)
            if len(live) > maxblocks:
                del live[random.randrange(maxblocks)]

        # get the heap object
        heap = multiprocessing.heap.BufferWrapper._heap

        # Gather every free and every occupied block as a tuple of
        # (arena index, start, stop, length, state).
        segments = []
        for free_list in heap._len_to_seq.values():
            for arena, start, stop in free_list:
                segments.append((heap._arenas.index(arena), start, stop,
                                 stop-start, 'free'))
        for arena, start, stop in heap._allocated_blocks:
            segments.append((heap._arenas.index(arena), start, stop,
                             stop-start, 'occupied'))

        segments.sort()

        # Consecutive entries must either be contiguous, or the second
        # must start at offset 0 of a different arena.
        for current, following in zip(segments, segments[1:]):
            (arena, start, stop) = current[:3]
            (narena, nstart, nstop) = following[:3]
            self.assertTrue((arena != narena and nstart == 0) or
                            (stop == nstart))
1559
1560#
1561#
1562#
1563
# Value and copy are provided by multiprocessing.sharedctypes, not by ctypes.
# Importing them from ctypes always raised ImportError, which left c_int as
# None and made every test guarded by "if c_int is None" return early.
# NOTE(review): this binds the name ``copy`` to sharedctypes.copy, hiding the
# stdlib ``copy`` module imported at the top of the file -- confirm that
# nothing else in the file relies on the module.
try:
    from ctypes import Structure, c_int, c_double
    from multiprocessing.sharedctypes import Value, copy
except ImportError:
    # Fallbacks keep the _Foo class statement below valid; tests check
    # ``c_int is None`` to detect this case.
    Structure = object
    c_int = c_double = None


class _Foo(Structure):
    """Two-field structure (int ``x``, double ``y``) used by the tests."""
    _fields_ = [
        ('x', c_int),
        ('y', c_double)
        ]
1575
class _TestSharedCTypes(BaseTestCase):
    """Tests of ctypes-backed values shared with a child process."""

    ALLOWED_TYPES = ('processes',)

    def _double(self, x, y, foo, arr, string):
        # Child side: double every value it was handed, in place.
        x.value *= 2
        y.value *= 2
        foo.x *= 2
        foo.y *= 2
        string.value *= 2
        for index in range(len(arr)):
            arr[index] *= 2

    def test_sharedctypes(self, lock=False):
        if c_int is None:
            return

        x = Value('i', 7, lock=lock)
        y = Value(c_double, 1.0/3.0, lock=lock)
        foo = Value(_Foo, 3, 2, lock=lock)
        arr = self.Array('d', range(10), lock=lock)
        string = self.Array('c', 20, lock=lock)
        string.value = 'hello'

        child = self.Process(target=self._double,
                             args=(x, y, foo, arr, string))
        child.start()
        child.join()

        # The doubling done in the child must be visible here.
        self.assertEqual(x.value, 14)
        self.assertAlmostEqual(y.value, 2.0/3.0)
        self.assertEqual(foo.x, 6)
        self.assertAlmostEqual(foo.y, 4.0)
        for index in range(10):
            self.assertAlmostEqual(arr[index], index*2)
        self.assertEqual(string.value, latin('hellohello'))

    def test_synchronize(self):
        # Same scenario, this time with lock=True.
        self.test_sharedctypes(lock=True)

    def test_copy(self):
        if c_int is None:
            return

        original = _Foo(2, 5.0)
        duplicate = copy(original)
        # Changing the original afterwards must not affect the copy.
        original.x = 0
        original.y = 0
        self.assertEqual(duplicate.x, 2)
        self.assertAlmostEqual(duplicate.y, 5.0)
1625
1626#
1627#
1628#
1629
class _TestFinalize(BaseTestCase):
    """Check when, and in what order, util.Finalize callbacks run."""

    ALLOWED_TYPES = ('processes',)

    def _test_finalize(self, conn):
        class Foo(object):
            pass

        a = Foo()
        util.Finalize(a, conn.send, args=('a',))
        del a           # triggers callback for a

        b = Foo()
        close_b = util.Finalize(b, conn.send, args=('b',))
        close_b()       # triggers callback for b
        close_b()       # does nothing because callback has already been called
        del b           # does nothing because callback has already been called

        # Registered without an exitpriority.
        c = Foo()
        util.Finalize(c, conn.send, args=('c',))

        d10 = Foo()
        util.Finalize(d10, conn.send, args=('d10',), exitpriority=1)

        # Three finalizers sharing exitpriority 0.
        d01 = Foo()
        util.Finalize(d01, conn.send, args=('d01',), exitpriority=0)
        d02 = Foo()
        util.Finalize(d02, conn.send, args=('d02',), exitpriority=0)
        d03 = Foo()
        util.Finalize(d03, conn.send, args=('d03',), exitpriority=0)

        # Finalizers that are not attached to any object.
        util.Finalize(None, conn.send, args=('e',), exitpriority=-10)

        util.Finalize(None, conn.send, args=('STOP',), exitpriority=-100)

        # call multiprocessing's cleanup function then exit process without
        # garbage collecting locals
        util._exit_function()
        conn.close()
        os._exit(0)

    def test_finalize(self):
        conn, child_conn = self.Pipe()

        child = self.Process(target=self._test_finalize, args=(child_conn,))
        child.start()
        child.join()

        # Read everything the child sent, up to the 'STOP' marker.
        received = list(iter(conn.recv, 'STOP'))
        self.assertEqual(received,
                         ['a', 'b', 'd10', 'd03', 'd02', 'd01', 'e'])
1680
1681#
1682# Test that from ... import * works for each module
1683#
1684
class _TestImportStar(BaseTestCase):
    """Check that every name listed in a module's __all__ really exists."""

    ALLOWED_TYPES = ('processes',)

    def test_import(self):
        modules = (
            'multiprocessing',
            'multiprocessing.connection',
            'multiprocessing.heap',
            'multiprocessing.managers',
            'multiprocessing.pool',
            'multiprocessing.process',
            'multiprocessing.reduction',
            'multiprocessing.sharedctypes',
            'multiprocessing.synchronize',
            'multiprocessing.util',
            )

        for name in modules:
            __import__(name)
            mod = sys.modules[name]

            # A module without __all__ contributes nothing to check.
            exported = getattr(mod, '__all__', ())
            for attr in exported:
                present = hasattr(mod, attr)
                self.assertTrue(
                    present,
                    '%r does not have attribute %r' % (mod, attr)
                    )
1707
1708#
1709# Quick test that logging works -- does not test logging output
1710#
1711
class _TestLogging(BaseTestCase):
    """Quick test that logging works -- does not test logging output."""

    ALLOWED_TYPES = ('processes',)

    def test_enable_logging(self):
        logger = multiprocessing.get_logger()
        logger.setLevel(util.SUBWARNING)
        self.assertTrue(logger is not None)
        logger.debug('this will not be printed')
        logger.info('nor will this')
        logger.setLevel(LOG_LEVEL)

    def _test_level(self, conn):
        # Child side: report the effective level of the package logger.
        logger = multiprocessing.get_logger()
        conn.send(logger.getEffectiveLevel())

    def _child_effective_level(self, reader, writer):
        # Run _test_level in a child, return what it reports, and always
        # join the child so no unreaped process is left behind.
        p = self.Process(target=self._test_level, args=(writer,))
        p.start()
        try:
            return reader.recv()
        finally:
            p.join()

    def test_level(self):
        LEVEL1 = 32
        LEVEL2 = 37

        logger = multiprocessing.get_logger()
        root_logger = logging.getLogger()
        root_level = root_logger.level

        reader, writer = multiprocessing.Pipe(duplex=False)

        try:
            # A level set on the package logger is seen by the child.
            logger.setLevel(LEVEL1)
            self.assertEqual(LEVEL1,
                             self._child_effective_level(reader, writer))

            # With NOTSET on the package logger the root level applies.
            logger.setLevel(logging.NOTSET)
            root_logger.setLevel(LEVEL2)
            self.assertEqual(LEVEL2,
                             self._child_effective_level(reader, writer))
        finally:
            # Restore both loggers even when an assertion fails, so the
            # altered levels do not affect later tests.
            root_logger.setLevel(root_level)
            logger.setLevel(level=LOG_LEVEL)
1749
Jesse Noller814d02d2009-11-21 14:38:23 +00001750
Jesse Noller9a03f2f2009-11-24 14:17:29 +00001751# class _TestLoggingProcessName(BaseTestCase):
1752#
1753# def handle(self, record):
1754# assert record.processName == multiprocessing.current_process().name
1755# self.__handled = True
1756#
1757# def test_logging(self):
1758# handler = logging.Handler()
1759# handler.handle = self.handle
1760# self.__handled = False
1761# # Bypass getLogger() and side-effects
1762# logger = logging.getLoggerClass()(
1763# 'multiprocessing.test.TestLoggingProcessName')
1764# logger.addHandler(handler)
1765# logger.propagate = False
1766#
1767# logger.warn('foo')
1768# assert self.__handled
Jesse Noller814d02d2009-11-21 14:38:23 +00001769
Benjamin Petersondfd79492008-06-13 19:13:39 +00001770#
Jesse Noller9a5b2ad2009-01-19 15:12:22 +00001771# Test to verify handle verification, see issue 3321
1772#
1773
class TestInvalidHandle(unittest.TestCase):
    """Test to verify handle verification, see issue 3321."""

    def test_invalid_handles(self):
        # Nothing is checked when WIN32 is true.
        if not WIN32:
            bogus = _multiprocessing.Connection(44977608)
            self.assertRaises(IOError, bogus.poll)
            self.assertRaises(IOError, _multiprocessing.Connection, -1)
1782#
Benjamin Petersondfd79492008-06-13 19:13:39 +00001783# Functions used to create test cases from the base ones in this module
1784#
1785
def get_attributes(Source, names):
    """Return a dict mapping each of ``names`` to that attribute of ``Source``.

    Attributes that are plain Python functions are wrapped in
    ``staticmethod``; everything else is stored unchanged.
    """
    function_type = type(get_attributes)
    collected = {}
    for name in names:
        value = getattr(Source, name)
        if isinstance(value, function_type):
            value = staticmethod(value)
        collected[name] = value
    return collected
1794
def create_test_cases(Mixin, type):
    """Build concrete TestCase classes from the module's ``_Test*`` bases.

    For every global whose name starts with ``_Test`` and whose
    ``ALLOWED_TYPES`` contains ``type``, a class deriving from that base,
    ``unittest.TestCase`` and ``Mixin`` is created.  Returns a dict keyed
    by the new class names ('With' + capitalised type + base name without
    its leading underscore).
    """
    prefix = 'With' + type[0].upper() + type[1:]
    created = {}

    for name, base in list(globals().items()):
        if not name.startswith('_Test'):
            continue
        if type not in base.ALLOWED_TYPES:
            continue

        class Temp(base, unittest.TestCase, Mixin):
            pass

        newname = prefix + name[1:]
        created[newname] = Temp
        Temp.__name__ = newname
        # Make the class report the module the mixin comes from.
        Temp.__module__ = Mixin.__module__
    return created
1811
1812#
1813# Create test cases
1814#
1815
class ProcessesMixin(object):
    """Mixin that binds the shared tests to the ``multiprocessing`` API."""
    TYPE = 'processes'
    Process = multiprocessing.Process
    # Copy the listed attributes of the multiprocessing package into the
    # class namespace (get_attributes wraps plain functions in staticmethod).
    locals().update(get_attributes(multiprocessing, (
        'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
        'Condition', 'Event', 'Value', 'Array', 'RawValue',
        'RawArray', 'current_process', 'active_children', 'Pipe',
        'connection', 'JoinableQueue'
        )))


# Build the concrete test cases and publish them as module globals.
testcases_processes = create_test_cases(ProcessesMixin, type='processes')
globals().update(testcases_processes)
1828
1829
class ManagerMixin(object):
    """Mixin that binds the shared tests to a SyncManager's proxies."""
    TYPE = 'manager'
    Process = multiprocessing.Process
    # Allocated without running __init__; test_main() initialises and
    # starts this object before the tests run.
    manager = object.__new__(multiprocessing.managers.SyncManager)
    # Copy the listed manager attributes into the class namespace.
    locals().update(get_attributes(manager, (
        'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
        'Condition', 'Event', 'Value', 'Array', 'list', 'dict',
        'Namespace', 'JoinableQueue'
        )))


# Build the concrete test cases and publish them as module globals.
testcases_manager = create_test_cases(ManagerMixin, type='manager')
globals().update(testcases_manager)
1842
1843
class ThreadsMixin(object):
    """Mixin that binds the shared tests to ``multiprocessing.dummy``."""
    TYPE = 'threads'
    Process = multiprocessing.dummy.Process
    # Copy the listed attributes of multiprocessing.dummy into the class
    # namespace (get_attributes wraps plain functions in staticmethod).
    locals().update(get_attributes(multiprocessing.dummy, (
        'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
        'Condition', 'Event', 'Value', 'Array', 'current_process',
        'active_children', 'Pipe', 'connection', 'dict', 'list',
        'Namespace', 'JoinableQueue'
        )))


# Build the concrete test cases and publish them as module globals.
testcases_threads = create_test_cases(ThreadsMixin, type='threads')
globals().update(testcases_threads)
1856
class OtherTest(unittest.TestCase):
    """Authentication handshake failures, driven by fake connections."""

    # TODO: add more tests for deliver/answer challenge.
    def test_deliver_challenge_auth_failure(self):
        class _BogusReplyConnection(object):
            # Answers every read with a bogus reply and discards writes.
            def recv_bytes(self, size):
                return b'something bogus'

            def send_bytes(self, data):
                pass

        fake = _BogusReplyConnection()
        self.assertRaises(multiprocessing.AuthenticationError,
                          multiprocessing.connection.deliver_challenge,
                          fake, b'abc')

    def test_answer_challenge_auth_failure(self):
        class _ScriptedConnection(object):
            # First read yields CHALLENGE, second a bogus reply, then b''.
            def __init__(self):
                self._script = [multiprocessing.connection.CHALLENGE,
                                b'something bogus']

            def recv_bytes(self, size):
                if self._script:
                    return self._script.pop(0)
                return b''

            def send_bytes(self, data):
                pass

        fake = _ScriptedConnection()
        self.assertRaises(multiprocessing.AuthenticationError,
                          multiprocessing.connection.answer_challenge,
                          fake, b'abc')
1885
Jesse Noller7152f6d2009-04-02 05:17:26 +00001886#
1887# Test Manager.start()/Pool.__init__() initializer feature - see issue 5585
1888#
1889
def initializer(ns):
    """Add one to the ``test`` attribute of ``ns``."""
    ns.test = ns.test + 1
1892
class TestInitializers(unittest.TestCase):
    """Test the initializer argument of Manager.start() and Pool()."""

    def setUp(self):
        # A manager-backed namespace carries the counter across processes.
        self.mgr = multiprocessing.Manager()
        self.ns = self.mgr.Namespace()
        self.ns.test = 0

    def tearDown(self):
        self.mgr.shutdown()

    def test_manager_initializer(self):
        m = multiprocessing.managers.SyncManager()
        # A non-callable initializer must be rejected.
        self.assertRaises(TypeError, m.start, 1)
        m.start(initializer, (self.ns,))
        try:
            self.assertEqual(self.ns.test, 1)
        finally:
            # Stop the second manager even when the check above fails.
            m.shutdown()

    def test_pool_initializer(self):
        # A non-callable initializer must be rejected.
        self.assertRaises(TypeError, multiprocessing.Pool, initializer=1)
        p = multiprocessing.Pool(1, initializer, (self.ns,))
        p.close()
        p.join()
        self.assertEqual(self.ns.test, 1)
1915
Jesse Noller1b90efb2009-06-30 17:11:52 +00001916#
1917# Issue 5155, 5313, 5331: Test process in processes
1918# Verifies os.close(sys.stdin.fileno) vs. sys.stdin.close() behavior
1919#
1920
def _ThisSubProcess(q):
    """Attempt one non-blocking get on ``q``, ignoring an empty queue."""
    try:
        q.get(block=False)
    except Queue.Empty:
        # Nothing was queued; that is fine here.
        return
1926
def _TestProcess(q):
    """Start a nested process that polls a fresh queue, then wait for it.

    The ``q`` argument is accepted but not used.
    """
    inner_queue = multiprocessing.Queue()
    child = multiprocessing.Process(target=_ThisSubProcess,
                                    args=(inner_queue,))
    child.start()
    child.join()
1932
def _afunc(x):
    """Return ``x`` multiplied by itself."""
    product = x * x
    return product
1935
def pool_in_process():
    """Create a four-worker pool, map ``_afunc`` over 1..7, then clean up."""
    pool = multiprocessing.Pool(processes=4)
    try:
        pool.map(_afunc, [1, 2, 3, 4, 5, 6, 7])
    finally:
        # The pool was previously left open; close and join it so that its
        # workers are gone when this function returns.
        pool.close()
        pool.join()
1939
class _file_like(object):
    """Write buffer that starts empty again whenever the process id changes.

    Data passed to write() is held in a list and only reaches the delegate
    when flush() is called.
    """

    def __init__(self, delegate):
        self._delegate = delegate
        self._pid = None

    @property
    def cache(self):
        current = os.getpid()
        # There are no race conditions since fork keeps only the running thread
        if self._pid != current:
            self._pid, self._cache = current, []
        return self._cache

    def write(self, data):
        self.cache.append(data)

    def flush(self):
        pending = ''.join(self.cache)
        self._delegate.write(pending)
        self._cache = []
1960
class TestStdinBadfiledescriptor(unittest.TestCase):
    """Issue 5155, 5313, 5331: run queues and pools inside a child process."""

    def test_queue_in_process(self):
        queue = multiprocessing.Queue()
        proc = multiprocessing.Process(target=_TestProcess, args=(queue,))
        proc.start()
        proc.join()

    def test_pool_in_process(self):
        p = multiprocessing.Process(target=pool_in_process)
        p.start()
        p.join()

    def test_flushing(self):
        sio = StringIO()
        flike = _file_like(sio)
        flike.write('foo')
        # The original also built a Process here that was never started;
        # it played no part in the check and has been dropped.
        flike.flush()
        # assertEqual rather than a bare assert, which -O would strip.
        self.assertEqual(sio.getvalue(), 'foo')
1981
1982testcases_other = [OtherTest, TestInvalidHandle, TestInitializers,
1983 TestStdinBadfiledescriptor]
Neal Norwitz0c519b32008-08-25 01:50:24 +00001984
Benjamin Petersondfd79492008-06-13 19:13:39 +00001985#
1986#
1987#
1988
def test_main(run=None):
    """Build the complete suite, run it, and tear down the shared pools.

    ``run`` is a callable that takes the TestSuite; when omitted,
    ``test.test_support.run_unittest`` is used.  Raises unittest.SkipTest
    on Linux when an RLock cannot be created.
    """
    if sys.platform.startswith("linux"):
        try:
            multiprocessing.RLock()
        except OSError:
            raise unittest.SkipTest("OSError raises on RLock creation, see issue 3111!")

    if run is None:
        from test.test_support import run_unittest as run

    util.get_temp_dir()     # creates temp directory for use by all processes

    multiprocessing.get_logger().setLevel(LOG_LEVEL)

    # Shared pools and manager used by the generated test cases.
    ProcessesMixin.pool = multiprocessing.Pool(4)
    ThreadsMixin.pool = multiprocessing.dummy.Pool(4)
    ManagerMixin.manager.__init__()
    ManagerMixin.manager.start()
    ManagerMixin.pool = ManagerMixin.manager.Pool(4)

    try:
        by_name = lambda tc: tc.__name__
        testcases = (
            sorted(testcases_processes.values(), key=by_name) +
            sorted(testcases_threads.values(), key=by_name) +
            sorted(testcases_manager.values(), key=by_name) +
            testcases_other
            )

        loadTestsFromTestCase = unittest.defaultTestLoader.loadTestsFromTestCase
        suite = unittest.TestSuite(loadTestsFromTestCase(tc)
                                   for tc in testcases)
        run(suite)
    finally:
        # Tear down even when run() raises, so the worker processes and
        # the manager server are not left running.
        ThreadsMixin.pool.terminate()
        ProcessesMixin.pool.terminate()
        ManagerMixin.pool.terminate()
        ManagerMixin.manager.shutdown()

        del ProcessesMixin.pool, ThreadsMixin.pool, ManagerMixin.pool
Benjamin Petersondfd79492008-06-13 19:13:39 +00002026
def main():
    """Run the whole suite through a verbose TextTestRunner."""
    runner = unittest.TextTestRunner(verbosity=2)
    test_main(runner.run)


if __name__ == '__main__':
    main()