blob: 7fcbdc3278ca421959cdb1c74258732860d3ab5c [file] [log] [blame]
Georg Brandl86b2fb92008-07-16 03:43:04 +00001#!/usr/bin/env python
2
Benjamin Petersone711caf2008-06-11 16:44:04 +00003#
4# Unit tests for the multiprocessing package
5#
6
7import unittest
8import threading
9import queue as pyqueue
10import time
Alexandre Vassalottic57a84f2009-07-17 12:07:01 +000011import io
Benjamin Petersone711caf2008-06-11 16:44:04 +000012import sys
13import os
14import gc
15import signal
16import array
17import copy
18import socket
19import random
20import logging
R. David Murraya21e4ca2009-03-31 23:16:50 +000021import test.support
Benjamin Petersone711caf2008-06-11 16:44:04 +000022
Benjamin Petersone5384b02008-10-04 22:00:42 +000023
R. David Murraya21e4ca2009-03-31 23:16:50 +000024# Skip tests if _multiprocessing wasn't built.
25_multiprocessing = test.support.import_module('_multiprocessing')
26# Skip tests if sem_open implementation is broken.
27test.support.import_module('multiprocessing.synchronize')
Benjamin Petersone5384b02008-10-04 22:00:42 +000028
Benjamin Petersone711caf2008-06-11 16:44:04 +000029import multiprocessing.dummy
30import multiprocessing.connection
31import multiprocessing.managers
32import multiprocessing.heap
Benjamin Petersone711caf2008-06-11 16:44:04 +000033import multiprocessing.pool
Benjamin Petersone711caf2008-06-11 16:44:04 +000034
35from multiprocessing import util
36
37#
38#
39#
40
def latin(s):
    """Return the text ``s`` encoded to bytes with the 'latin' codec."""
    encoded = s.encode('latin')
    return encoded
Benjamin Petersone711caf2008-06-11 16:44:04 +000043
Benjamin Petersone711caf2008-06-11 16:44:04 +000044#
45# Constants
46#
47
48LOG_LEVEL = util.SUBWARNING
Jesse Noller1f0b6582010-01-27 03:36:01 +000049#LOG_LEVEL = logging.DEBUG
Benjamin Petersone711caf2008-06-11 16:44:04 +000050
51DELTA = 0.1
52CHECK_TIMINGS = False # making true makes tests take a lot longer
53 # and can sometimes cause some non-serious
54 # failures because some calls block a bit
55 # longer than expected
56if CHECK_TIMINGS:
57 TIMEOUT1, TIMEOUT2, TIMEOUT3 = 0.82, 0.35, 1.4
58else:
59 TIMEOUT1, TIMEOUT2, TIMEOUT3 = 0.1, 0.1, 0.1
60
61HAVE_GETVALUE = not getattr(_multiprocessing,
62 'HAVE_BROKEN_SEM_GETVALUE', False)
63
Jesse Noller6214edd2009-01-19 16:23:53 +000064WIN32 = (sys.platform == "win32")
65
Benjamin Petersone711caf2008-06-11 16:44:04 +000066#
67# Creates a wrapper for a function which records the time it takes to finish
68#
69
class TimingWrapper(object):
    """Callable proxy that remembers how long its most recent call took.

    ``elapsed`` is ``None`` until the first call; afterwards it holds the
    wall-clock duration (seconds, from ``time.time()``) of the last call,
    whether that call returned or raised.
    """

    def __init__(self, func):
        self.elapsed = None
        self.func = func

    def __call__(self, *args, **kwds):
        started = time.time()
        try:
            result = self.func(*args, **kwds)
        finally:
            # Recorded on both the success and the exception path.
            self.elapsed = time.time() - started
        return result
82
83#
84# Base class for test cases
85#
86
class BaseTestCase(object):
    """Assertion helpers shared by the test case mixins in this file.

    The ``assert*`` methods used here (``assertEqual``,
    ``assertAlmostEqual``) are expected to come from whatever class this
    mixin is combined with.
    """

    # Type names a test case applies to; subclasses in this file narrow it.
    # NOTE(review): the code that reads ALLOWED_TYPES is not visible here.
    ALLOWED_TYPES = ('processes', 'manager', 'threads')

    def assertTimingAlmostEqual(self, a, b):
        """Compare two durations to one decimal place, only if CHECK_TIMINGS."""
        if not CHECK_TIMINGS:
            return
        self.assertAlmostEqual(a, b, 1)

    def assertReturnsIfImplemented(self, value, func, *args):
        """Assert ``func(*args) == value`` unless it raises NotImplementedError."""
        try:
            outcome = func(*args)
        except NotImplementedError:
            # Not supported here: nothing to compare.
            return None
        return self.assertEqual(value, outcome)
102
103#
104# Return the value of a semaphore
105#
106
def get_value(self):
    """Return the counter of a semaphore-like object.

    Tries, in order, a ``get_value()`` method, then the attributes
    ``_Semaphore__value`` and ``_value``.

    Raises:
        NotImplementedError: if none of those is available.
    """
    try:
        return self.get_value()
    except AttributeError:
        pass
    # Fall back to the private attributes, in the same order as before.
    for attribute in ('_Semaphore__value', '_value'):
        try:
            return getattr(self, attribute)
        except AttributeError:
            pass
    raise NotImplementedError
118
119#
120# Testcases
121#
122
class _TestProcess(BaseTestCase):
    """Lifecycle checks for Process objects (and their thread stand-ins)."""

    ALLOWED_TYPES = ('processes', 'threads')

    def test_current(self):
        # Attributes of the current process object; not run for 'threads'.
        if self.TYPE == 'threads':
            return

        current = self.current_process()
        authkey = current.authkey

        self.assertTrue(current.is_alive())
        self.assertTrue(not current.daemon)
        self.assertIsInstance(authkey, bytes)
        self.assertTrue(len(authkey) > 0)
        self.assertEqual(current.ident, os.getpid())
        self.assertEqual(current.exitcode, None)

    def _test(self, q, *args, **kwds):
        """Child side of test_process: report what the child received."""
        current = self.current_process()
        q.put(args)
        q.put(kwds)
        q.put(current.name)
        if self.TYPE != 'threads':
            q.put(bytes(current.authkey))
        q.put(current.pid)

    def test_process(self):
        # State of a process before start(), while running and after join().
        q = self.Queue(1)
        args = (q, 1, 2)
        kwargs = {'hello':23, 'bye':2.54}
        name = 'SomeProcess'
        p = self.Process(
            target=self._test, args=args, kwargs=kwargs, name=name
            )
        p.daemon = True
        current = self.current_process()

        if self.TYPE != 'threads':
            self.assertEqual(p.authkey, current.authkey)
        self.assertEqual(p.is_alive(), False)
        self.assertEqual(p.daemon, True)
        self.assertNotIn(p, self.active_children())
        self.assertTrue(type(self.active_children()) is list)
        self.assertEqual(p.exitcode, None)

        p.start()

        self.assertEqual(p.exitcode, None)
        self.assertEqual(p.is_alive(), True)
        self.assertIn(p, self.active_children())

        # The child echoes its arguments back; q itself (args[0]) is not sent.
        self.assertEqual(q.get(), args[1:])
        self.assertEqual(q.get(), kwargs)
        self.assertEqual(q.get(), p.name)
        if self.TYPE != 'threads':
            self.assertEqual(q.get(), current.authkey)
        self.assertEqual(q.get(), p.pid)

        p.join()

        self.assertEqual(p.exitcode, 0)
        self.assertEqual(p.is_alive(), False)
        self.assertNotIn(p, self.active_children())

    def _test_terminate(self):
        """Child side of test_terminate: sleep until terminated."""
        time.sleep(1000)

    def test_terminate(self):
        # terminate() must stop a sleeping child; not run for 'threads'.
        if self.TYPE == 'threads':
            return

        p = self.Process(target=self._test_terminate)
        p.daemon = True
        p.start()

        self.assertEqual(p.is_alive(), True)
        self.assertIn(p, self.active_children())
        self.assertEqual(p.exitcode, None)

        p.terminate()

        join = TimingWrapper(p.join)
        self.assertEqual(join(), None)
        self.assertTimingAlmostEqual(join.elapsed, 0.0)

        self.assertEqual(p.is_alive(), False)
        self.assertNotIn(p, self.active_children())

        p.join()

        # XXX sometimes get p.exitcode == 0 on Windows ...
        #self.assertEqual(p.exitcode, -signal.SIGTERM)

    def test_cpu_count(self):
        try:
            cpus = multiprocessing.cpu_count()
        except NotImplementedError:
            cpus = 1
        self.assertTrue(type(cpus) is int)
        self.assertTrue(cpus >= 1)

    def test_active_children(self):
        self.assertEqual(type(self.active_children()), list)

        p = self.Process(target=time.sleep, args=(DELTA,))
        self.assertNotIn(p, self.active_children())

        p.start()
        self.assertIn(p, self.active_children())

        p.join()
        self.assertNotIn(p, self.active_children())

    def _test_recursion(self, wconn, id):
        """Send ``id`` over ``wconn``, then recurse in two children in turn.

        ``id`` is the list of child indices leading to this call; the
        recursion stops once it has two entries. Each child is joined before
        the next is started, which fixes the order of the messages.
        """
        wconn.send(id)
        if len(id) < 2:
            for i in range(2):
                p = self.Process(
                    target=self._test_recursion, args=(wconn, id+[i])
                    )
                p.start()
                p.join()

    def test_recursion(self):
        rconn, wconn = self.Pipe(duplex=False)
        self._test_recursion(wconn, [])

        # Give pending messages a moment to arrive before draining the pipe.
        time.sleep(DELTA)
        result = []
        while rconn.poll():
            result.append(rconn.recv())

        expected = [
            [],
              [0],
                [0, 0],
                [0, 1],
              [1],
                [1, 0],
                [1, 1]
            ]
        self.assertEqual(result, expected)
268
269#
270#
271#
272
273class _UpperCaser(multiprocessing.Process):
274
275 def __init__(self):
276 multiprocessing.Process.__init__(self)
277 self.child_conn, self.parent_conn = multiprocessing.Pipe()
278
279 def run(self):
280 self.parent_conn.close()
281 for s in iter(self.child_conn.recv, None):
282 self.child_conn.send(s.upper())
283 self.child_conn.close()
284
285 def submit(self, s):
286 assert type(s) is str
287 self.parent_conn.send(s)
288 return self.parent_conn.recv()
289
290 def stop(self):
291 self.parent_conn.send(None)
292 self.parent_conn.close()
293 self.child_conn.close()
294
class _TestSubclassingProcess(BaseTestCase):
    """Checks that a Process subclass with its own run() works end to end."""

    ALLOWED_TYPES = ('processes',)

    def test_subclassing(self):
        worker = _UpperCaser()
        worker.start()
        # Each request must come back upper-cased, in order.
        for text, shouted in (('hello', 'HELLO'), ('world', 'WORLD')):
            self.assertEqual(worker.submit(text), shouted)
        worker.stop()
        worker.join()
306
307#
308#
309#
310
def queue_empty(q):
    """Tell whether ``q`` is empty, via ``empty()`` or else ``qsize() == 0``."""
    if not hasattr(q, 'empty'):
        return q.qsize() == 0
    return q.empty()
316
def queue_full(q, maxsize):
    """Tell whether ``q`` is full, via ``full()`` or else ``qsize() == maxsize``."""
    if not hasattr(q, 'full'):
        return q.qsize() == maxsize
    return q.full()
322
323
class _TestQueue(BaseTestCase):
    """put/get blocking rules, use across a new process, qsize(), task_done()."""

    def _test_put(self, queue, child_can_start, parent_can_continue):
        """Child side of test_put: drain six items once the parent allows it."""
        child_can_start.wait()
        for _ in range(6):
            queue.get()
        parent_can_continue.set()

    def test_put(self):
        MAXSIZE = 6
        queue = self.Queue(maxsize=MAXSIZE)
        child_can_start = self.Event()
        parent_can_continue = self.Event()

        proc = self.Process(
            target=self._test_put,
            args=(queue, child_can_start, parent_can_continue)
            )
        proc.daemon = True
        proc.start()

        self.assertEqual(queue_empty(queue), True)
        self.assertEqual(queue_full(queue, MAXSIZE), False)

        # Fill the queue using every supported calling convention.
        queue.put(1)
        queue.put(2, True)
        queue.put(3, True, None)
        queue.put(4, False)
        queue.put(5, False, None)
        queue.put_nowait(6)

        # the values may be in buffer but not yet in pipe so sleep a bit
        time.sleep(DELTA)

        self.assertEqual(queue_empty(queue), False)
        self.assertEqual(queue_full(queue, MAXSIZE), True)

        put = TimingWrapper(queue.put)
        put_nowait = TimingWrapper(queue.put_nowait)

        # Non-blocking puts on a full queue fail at once ...
        self.assertRaises(pyqueue.Full, put, 7, False)
        self.assertTimingAlmostEqual(put.elapsed, 0)

        self.assertRaises(pyqueue.Full, put, 7, False, None)
        self.assertTimingAlmostEqual(put.elapsed, 0)

        self.assertRaises(pyqueue.Full, put_nowait, 7)
        self.assertTimingAlmostEqual(put_nowait.elapsed, 0)

        # ... blocking puts fail only after their timeout, and a timeout is
        # ignored when block is False.
        self.assertRaises(pyqueue.Full, put, 7, True, TIMEOUT1)
        self.assertTimingAlmostEqual(put.elapsed, TIMEOUT1)

        self.assertRaises(pyqueue.Full, put, 7, False, TIMEOUT2)
        self.assertTimingAlmostEqual(put.elapsed, 0)

        self.assertRaises(pyqueue.Full, put, 7, True, timeout=TIMEOUT3)
        self.assertTimingAlmostEqual(put.elapsed, TIMEOUT3)

        child_can_start.set()
        parent_can_continue.wait()

        self.assertEqual(queue_empty(queue), True)
        self.assertEqual(queue_full(queue, MAXSIZE), False)

        proc.join()

    def _test_get(self, queue, child_can_start, parent_can_continue):
        """Child side of test_get: put 2..5 once the parent allows it."""
        child_can_start.wait()
        #queue.put(1)
        queue.put(2)
        queue.put(3)
        queue.put(4)
        queue.put(5)
        parent_can_continue.set()

    def test_get(self):
        queue = self.Queue()
        child_can_start = self.Event()
        parent_can_continue = self.Event()

        proc = self.Process(
            target=self._test_get,
            args=(queue, child_can_start, parent_can_continue)
            )
        proc.daemon = True
        proc.start()

        self.assertEqual(queue_empty(queue), True)

        child_can_start.set()
        parent_can_continue.wait()

        time.sleep(DELTA)
        self.assertEqual(queue_empty(queue), False)

        # Hangs unexpectedly, remove for now
        #self.assertEqual(queue.get(), 1)
        self.assertEqual(queue.get(True, None), 2)
        self.assertEqual(queue.get(True), 3)
        self.assertEqual(queue.get(timeout=1), 4)
        self.assertEqual(queue.get_nowait(), 5)

        self.assertEqual(queue_empty(queue), True)

        get = TimingWrapper(queue.get)
        get_nowait = TimingWrapper(queue.get_nowait)

        # Non-blocking gets on an empty queue fail at once ...
        self.assertRaises(pyqueue.Empty, get, False)
        self.assertTimingAlmostEqual(get.elapsed, 0)

        self.assertRaises(pyqueue.Empty, get, False, None)
        self.assertTimingAlmostEqual(get.elapsed, 0)

        self.assertRaises(pyqueue.Empty, get_nowait)
        self.assertTimingAlmostEqual(get_nowait.elapsed, 0)

        # ... blocking gets fail only after their timeout, and a timeout is
        # ignored when block is False.
        self.assertRaises(pyqueue.Empty, get, True, TIMEOUT1)
        self.assertTimingAlmostEqual(get.elapsed, TIMEOUT1)

        self.assertRaises(pyqueue.Empty, get, False, TIMEOUT2)
        self.assertTimingAlmostEqual(get.elapsed, 0)

        self.assertRaises(pyqueue.Empty, get, timeout=TIMEOUT3)
        self.assertTimingAlmostEqual(get.elapsed, TIMEOUT3)

        proc.join()

    def _test_fork(self, queue):
        """Child side of test_fork: put the items 10..19."""
        for i in range(10, 20):
            queue.put(i)
        # note that at this point the items may only be buffered, so the
        # process cannot shutdown until the feeder thread has finished
        # pushing items onto the pipe.

    def test_fork(self):
        # Old versions of Queue would fail to create a new feeder
        # thread for a forked process if the original process had its
        # own feeder thread.  This test checks that this no longer
        # happens.

        queue = self.Queue()

        # put items on queue so that main process starts a feeder thread
        for i in range(10):
            queue.put(i)

        # wait to make sure thread starts before we fork a new process
        time.sleep(DELTA)

        # fork process
        p = self.Process(target=self._test_fork, args=(queue,))
        p.start()

        # check that all expected items are in the queue
        for i in range(20):
            self.assertEqual(queue.get(), i)
        self.assertRaises(pyqueue.Empty, queue.get, False)

        p.join()

    def test_qsize(self):
        q = self.Queue()
        try:
            self.assertEqual(q.qsize(), 0)
        except NotImplementedError:
            # qsize() is not available for this queue: nothing to check.
            return
        q.put(1)
        self.assertEqual(q.qsize(), 1)
        q.put(5)
        self.assertEqual(q.qsize(), 2)
        q.get()
        self.assertEqual(q.qsize(), 1)
        q.get()
        self.assertEqual(q.qsize(), 0)

    def _test_task_done(self, q):
        """Worker: acknowledge every item received until a None sentinel."""
        for _ in iter(q.get, None):
            time.sleep(DELTA)
            q.task_done()

    def test_task_done(self):
        # The former ``sys.version_info < (2, 5)`` guard was dropped: this
        # file is Python 3 only, so it could never trigger.
        queue = self.JoinableQueue()

        workers = [self.Process(target=self._test_task_done, args=(queue,))
                   for _ in range(4)]

        for p in workers:
            p.start()

        for i in range(10):
            queue.put(i)

        # Returns once every item put so far has been acknowledged.
        queue.join()

        # One None sentinel per worker so that each of them exits its loop.
        for _ in workers:
            queue.put(None)

        for p in workers:
            p.join()
527
528#
529#
530#
531
class _TestLock(BaseTestCase):
    """Return values and error behaviour of Lock and RLock."""

    def test_lock(self):
        plain = self.Lock()
        self.assertEqual(plain.acquire(), True)
        # A second, non-blocking acquire fails while the lock is held.
        self.assertEqual(plain.acquire(False), False)
        self.assertEqual(plain.release(), None)
        # Releasing a lock that is not held is an error.
        self.assertRaises((ValueError, threading.ThreadError), plain.release)

    def test_rlock(self):
        reentrant = self.RLock()
        depth = 3
        for _ in range(depth):
            self.assertEqual(reentrant.acquire(), True)
        for _ in range(depth):
            self.assertEqual(reentrant.release(), None)
        # One release more than there were acquires is an error.
        self.assertRaises((AssertionError, RuntimeError), reentrant.release)

    def test_lock_context(self):
        # A lock must be usable as a context manager.
        with self.Lock():
            pass
554
Benjamin Petersone711caf2008-06-11 16:44:04 +0000555
class _TestSemaphore(BaseTestCase):
    """Counter bookkeeping and timeouts of Semaphore / BoundedSemaphore."""

    def _test_semaphore(self, sem):
        """Acquire/release script for a semaphore whose initial value is 2."""
        check = self.assertReturnsIfImplemented
        check(2, get_value, sem)
        # (operation, its arguments, expected result, counter afterwards)
        script = (
            (sem.acquire, (), True, 1),
            (sem.acquire, (), True, 0),
            (sem.acquire, (False,), False, 0),
            (sem.release, (), None, 1),
            (sem.release, (), None, 2),
        )
        for operation, op_args, expected_result, expected_value in script:
            self.assertEqual(operation(*op_args), expected_result)
            check(expected_value, get_value, sem)

    def test_semaphore(self):
        sem = self.Semaphore(2)
        self._test_semaphore(sem)
        # An unbounded semaphore may be released past its initial value.
        for expected_value in (3, 4):
            self.assertEqual(sem.release(), None)
            self.assertReturnsIfImplemented(expected_value, get_value, sem)

    def test_bounded_semaphore(self):
        sem = self.BoundedSemaphore(2)
        self._test_semaphore(sem)
        # Currently fails on OS/X
        #if HAVE_GETVALUE:
        #    self.assertRaises(ValueError, sem.release)
        #    self.assertReturnsIfImplemented(2, get_value, sem)

    def test_timeout(self):
        if self.TYPE != 'processes':
            return

        sem = self.Semaphore(0)
        acquire = TimingWrapper(sem.acquire)

        # (positional args, keyword args, expected duration); every call must
        # fail because the semaphore's value is 0.
        cases = (
            ((False,), {}, 0.0),
            ((False, None), {}, 0.0),
            ((False, TIMEOUT1), {}, 0),
            ((True, TIMEOUT2), {}, TIMEOUT2),
            ((), {'timeout': TIMEOUT3}, TIMEOUT3),
        )
        for call_args, call_kwds, expected_elapsed in cases:
            self.assertEqual(acquire(*call_args, **call_kwds), False)
            self.assertTimingAlmostEqual(acquire.elapsed, expected_elapsed)
608
609
class _TestCondition(BaseTestCase):
    """notify(), notify_all() and wait-timeout behaviour of Condition."""

    def f(self, cond, sleeping, woken, timeout=None):
        """Worker: release ``sleeping``, wait on ``cond``, release ``woken``."""
        cond.acquire()
        sleeping.release()
        cond.wait(timeout)
        woken.release()
        cond.release()

    def check_invariant(self, cond):
        # this is only supposed to succeed when there are no sleepers
        if self.TYPE == 'processes':
            try:
                sleepers = (cond._sleeping_count.get_value() -
                            cond._woken_count.get_value())
                self.assertEqual(sleepers, 0)
                self.assertEqual(cond._wait_semaphore.get_value(), 0)
            except NotImplementedError:
                pass

    def test_notify(self):
        cond = self.Condition()
        sleeping = self.Semaphore(0)
        woken = self.Semaphore(0)

        # Keep separate references so that both sleepers can be joined below.
        proc = self.Process(target=self.f, args=(cond, sleeping, woken))
        proc.daemon = True
        proc.start()

        thread = threading.Thread(target=self.f, args=(cond, sleeping, woken))
        thread.daemon = True
        thread.start()

        # wait for both children to start sleeping
        sleeping.acquire()
        sleeping.acquire()

        # check no process/thread has woken up
        time.sleep(DELTA)
        self.assertReturnsIfImplemented(0, get_value, woken)

        # wake up one process/thread
        cond.acquire()
        cond.notify()
        cond.release()

        # check one process/thread has woken up
        time.sleep(DELTA)
        self.assertReturnsIfImplemented(1, get_value, woken)

        # wake up another
        cond.acquire()
        cond.notify()
        cond.release()

        # check other has woken up
        time.sleep(DELTA)
        self.assertReturnsIfImplemented(2, get_value, woken)

        # check state is not mucked up
        self.check_invariant(cond)

        # Both sleepers have been notified, so neither join can block for long.
        thread.join()
        proc.join()

    def test_notify_all(self):
        cond = self.Condition()
        sleeping = self.Semaphore(0)
        woken = self.Semaphore(0)
        workers = []

        # start some threads/processes which will timeout
        for _ in range(3):
            p = self.Process(target=self.f,
                             args=(cond, sleeping, woken, TIMEOUT1))
            p.daemon = True
            p.start()
            workers.append(p)

            t = threading.Thread(target=self.f,
                                 args=(cond, sleeping, woken, TIMEOUT1))
            t.daemon = True
            t.start()
            workers.append(t)

        # wait for them all to sleep
        for _ in range(6):
            sleeping.acquire()

        # check they have all timed out
        for _ in range(6):
            woken.acquire()
        self.assertReturnsIfImplemented(0, get_value, woken)

        # check state is not mucked up
        self.check_invariant(cond)

        # start some more threads/processes
        for _ in range(3):
            p = self.Process(target=self.f, args=(cond, sleeping, woken))
            p.daemon = True
            p.start()
            workers.append(p)

            t = threading.Thread(target=self.f, args=(cond, sleeping, woken))
            t.daemon = True
            t.start()
            workers.append(t)

        # wait for them to all sleep
        for _ in range(6):
            sleeping.acquire()

        # check no process/thread has woken up
        time.sleep(DELTA)
        self.assertReturnsIfImplemented(0, get_value, woken)

        # wake them all up
        cond.acquire()
        cond.notify_all()
        cond.release()

        # check they have all woken
        time.sleep(DELTA)
        self.assertReturnsIfImplemented(6, get_value, woken)

        # check state is not mucked up
        self.check_invariant(cond)

        # The first batch timed out and the second was notified, so every
        # worker finishes on its own; join them all.
        for worker in workers:
            worker.join()

    def test_timeout(self):
        cond = self.Condition()
        wait = TimingWrapper(cond.wait)
        cond.acquire()
        res = wait(TIMEOUT1)
        cond.release()
        self.assertEqual(res, None)
        self.assertTimingAlmostEqual(wait.elapsed, TIMEOUT1)
740
741
class _TestEvent(BaseTestCase):
    """is_set(), wait(), set() and clear() behaviour of Event."""

    def _test_event(self, event):
        """Child side of test_event: set the event after TIMEOUT2 seconds."""
        time.sleep(TIMEOUT2)
        event.set()

    def test_event(self):
        event = self.Event()
        wait = TimingWrapper(event.wait)

        # A new event starts out cleared.
        self.assertEqual(event.is_set(), False)

        # Waiting on a cleared event returns False once the timeout expires.
        self.assertEqual(wait(0.0), False)
        self.assertTimingAlmostEqual(wait.elapsed, 0.0)
        self.assertEqual(wait(TIMEOUT1), False)
        self.assertTimingAlmostEqual(wait.elapsed, TIMEOUT1)

        event.set()

        # Once set, wait() returns True at once, with or without a timeout.
        self.assertEqual(event.is_set(), True)
        self.assertEqual(wait(), True)
        self.assertTimingAlmostEqual(wait.elapsed, 0.0)
        self.assertEqual(wait(TIMEOUT1), True)
        self.assertTimingAlmostEqual(wait.elapsed, 0.0)
        # self.assertEqual(event.is_set(), True)

        event.clear()

        #self.assertEqual(event.is_set(), False)

        # The child sets the event after a short sleep; join it afterwards so
        # that it does not outlive the test.
        p = self.Process(target=self._test_event, args=(event,))
        p.start()
        self.assertEqual(wait(), True)
        p.join()
Benjamin Petersone711caf2008-06-11 16:44:04 +0000779
780#
781#
782#
783
class _TestValue(BaseTestCase):
    """Shared Value / RawValue objects written by a child process."""

    # (typecode, initial value, value the child process stores)
    codes_values = [
        ('i', 4343, 24234),
        ('d', 3.625, -4.25),
        ('h', -232, 234),
        ('c', latin('x'), latin('y'))
        ]

    def _test(self, values):
        """Child side of test_value: store each entry's third field."""
        for shared, (_, _, final) in zip(values, self.codes_values):
            shared.value = final

    def test_value(self, raw=False):
        if self.TYPE != 'processes':
            return

        factory = self.RawValue if raw else self.Value
        values = [factory(code, initial)
                  for code, initial, _ in self.codes_values]

        for shared, (_, initial, _) in zip(values, self.codes_values):
            self.assertEqual(shared.value, initial)

        child = self.Process(target=self._test, args=(values,))
        child.start()
        child.join()

        # The child's writes must be visible here.
        for shared, (_, _, final) in zip(values, self.codes_values):
            self.assertEqual(shared.value, final)

    def test_rawvalue(self):
        self.test_value(raw=True)

    def test_getobj_getlock(self):
        if self.TYPE != 'processes':
            return

        # With the default lock and with lock=None, both accessors must work.
        for extra in ({}, {'lock': None}):
            wrapped = self.Value('i', 5, **extra)
            wrapped.get_lock()
            wrapped.get_obj()

        # An explicitly supplied lock must be the one handed back.
        own_lock = self.Lock()
        locked = self.Value('i', 5, lock=own_lock)
        returned_lock = locked.get_lock()
        locked.get_obj()
        self.assertEqual(own_lock, returned_lock)

        # lock=False gives an object without the accessors.
        unlocked = self.Value('i', 5, lock=False)
        for accessor in ('get_lock', 'get_obj'):
            self.assertFalse(hasattr(unlocked, accessor))

        self.assertRaises(AttributeError, self.Value, 'i', 5, lock='navalue')

        # So does RawValue.
        raw_value = self.RawValue('i', 5)
        for accessor in ('get_lock', 'get_obj'):
            self.assertFalse(hasattr(raw_value, accessor))
849
Benjamin Petersone711caf2008-06-11 16:44:04 +0000850
class _TestArray(BaseTestCase):
    """Shared Array / RawArray objects modified by a child process."""

    def f(self, seq):
        """Replace ``seq`` in place by its running (prefix) sums."""
        for index in range(1, len(seq)):
            seq[index] = seq[index] + seq[index - 1]

    def test_array(self, raw=False):
        if self.TYPE != 'processes':
            return

        seq = [680, 626, 934, 821, 150, 233, 548, 982, 714, 831]
        make_array = self.RawArray if raw else self.Array
        arr = make_array('i', seq)

        self.assertEqual(len(arr), len(seq))
        self.assertEqual(arr[3], seq[3])
        self.assertEqual(list(arr[2:7]), list(seq[2:7]))

        # Slice assignment from an array.array, mirrored in the plain list.
        patch = array.array('i', [1, 2, 3, 4])
        arr[4:8] = patch
        seq[4:8] = patch

        self.assertEqual(list(arr[:]), seq)

        # Expected result, computed locally on the plain list ...
        self.f(seq)

        # ... must match what a child process computes on the shared array.
        child = self.Process(target=self.f, args=(arr,))
        child.start()
        child.join()

        self.assertEqual(list(arr[:]), seq)

    def test_rawarray(self):
        self.test_array(raw=True)

    def test_getobj_getlock_obj(self):
        if self.TYPE != 'processes':
            return

        # With the default lock and with lock=None, both accessors must work.
        for extra in ({}, {'lock': None}):
            wrapped = self.Array('i', list(range(10)), **extra)
            wrapped.get_lock()
            wrapped.get_obj()

        # An explicitly supplied lock must be the one handed back.
        own_lock = self.Lock()
        locked = self.Array('i', list(range(10)), lock=own_lock)
        returned_lock = locked.get_lock()
        locked.get_obj()
        self.assertEqual(own_lock, returned_lock)

        # lock=False gives an object without the accessors.
        unlocked = self.Array('i', range(10), lock=False)
        for accessor in ('get_lock', 'get_obj'):
            self.assertFalse(hasattr(unlocked, accessor))
        self.assertRaises(AttributeError,
                          self.Array, 'i', range(10), lock='notalock')

        # So does RawArray.
        raw_array = self.RawArray('i', range(10))
        for accessor in ('get_lock', 'get_obj'):
            self.assertFalse(hasattr(raw_array, accessor))
Benjamin Petersone711caf2008-06-11 16:44:04 +0000913
914#
915#
916#
917
class _TestContainers(BaseTestCase):
    """list, dict and Namespace objects obtained through ``self``."""

    ALLOWED_TYPES = ('manager',)

    def test_list(self):
        digits = list(range(10))
        a = self.list(list(range(10)))
        self.assertEqual(a[:], digits)

        b = self.list()
        self.assertEqual(b[:], [])

        b.extend(list(range(5)))
        self.assertEqual(b[:], list(range(5)))

        self.assertEqual(b[2], 2)
        self.assertEqual(b[2:10], [2,3,4])

        b *= 2
        doubled = [0, 1, 2, 3, 4, 0, 1, 2, 3, 4]
        self.assertEqual(b[:], doubled)

        self.assertEqual(b + [5, 6], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4, 5, 6])

        # None of the operations on b may have touched a.
        self.assertEqual(a[:], digits)

        # A shared list built from other shared lists.
        nested = self.list([a, b])
        self.assertEqual(
            nested[:],
            [[0, 1, 2, 3, 4, 5, 6, 7, 8, 9], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4]]
            )

        # An append made to a after wrapping it shows up through the wrapper.
        wrapper = self.list([a])
        a.append('hello')
        self.assertEqual(wrapper[:], [[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 'hello']])

    def test_dict(self):
        shared = self.dict()
        indices = list(range(65, 70))
        for code in indices:
            shared[code] = chr(code)
        letters = [chr(code) for code in indices]
        self.assertEqual(shared.copy(), {code: chr(code) for code in indices})
        self.assertEqual(sorted(shared.keys()), indices)
        self.assertEqual(sorted(shared.values()), letters)
        self.assertEqual(sorted(shared.items()), list(zip(indices, letters)))

    def test_namespace(self):
        ns = self.Namespace()
        ns.name = 'Bob'
        ns.job = 'Builder'
        ns._hidden = 'hidden'
        self.assertEqual((ns.name, ns.job), ('Bob', 'Builder'))
        del ns.job
        # The expected text shows neither the deleted attribute nor _hidden.
        self.assertEqual(str(ns), "Namespace(name='Bob')")
        self.assertTrue(hasattr(ns, 'name'))
        self.assertFalse(hasattr(ns, 'job'))
972 self.assertTrue(not hasattr(n, 'job'))
973
974#
975#
976#
977
def sqr(x, wait=0.0):
    """Sleep for ``wait`` seconds, then return ``x`` multiplied by itself."""
    time.sleep(wait)
    squared = x * x
    return squared
class _TestPool(BaseTestCase):
    """apply / map / imap / async behaviour of the pool held in ``self.pool``."""

    def test_apply(self):
        papply = self.pool.apply
        self.assertEqual(papply(sqr, (5,)), sqr(5))
        self.assertEqual(papply(sqr, (), {'x':3}), sqr(x=3))

    def test_map(self):
        pmap = self.pool.map
        self.assertEqual(pmap(sqr, list(range(10))),
                         [sqr(i) for i in range(10)])
        self.assertEqual(pmap(sqr, list(range(100)), chunksize=20),
                         [sqr(i) for i in range(100)])

    def test_map_chunksize(self):
        # map_async() on an empty list with an explicit chunksize must
        # complete rather than stall.
        try:
            self.pool.map_async(sqr, [], chunksize=1).get(timeout=TIMEOUT1)
        except multiprocessing.TimeoutError:
            self.fail("pool.map_async with chunksize stalled on null list")

    def test_async(self):
        res = self.pool.apply_async(sqr, (7, TIMEOUT1,))
        get = TimingWrapper(res.get)
        self.assertEqual(get(), 49)
        self.assertTimingAlmostEqual(get.elapsed, TIMEOUT1)

    def test_async_timeout(self):
        # The task sleeps 0.2s longer than the get() timeout, so get() must
        # give up first.
        res = self.pool.apply_async(sqr, (6, TIMEOUT2 + 0.2))
        get = TimingWrapper(res.get)
        self.assertRaises(multiprocessing.TimeoutError, get, timeout=TIMEOUT2)
        self.assertTimingAlmostEqual(get.elapsed, TIMEOUT2)

    def test_imap(self):
        it = self.pool.imap(sqr, list(range(10)))
        self.assertEqual(list(it), [sqr(i) for i in range(10)])

        it = self.pool.imap(sqr, list(range(10)))
        for i in range(10):
            self.assertEqual(next(it), i*i)
        self.assertRaises(StopIteration, it.__next__)

        it = self.pool.imap(sqr, list(range(1000)), chunksize=100)
        for i in range(1000):
            self.assertEqual(next(it), i*i)
        self.assertRaises(StopIteration, it.__next__)

    def test_imap_unordered(self):
        expected = [sqr(i) for i in range(1000)]

        # Results may arrive in any order, so compare them sorted.
        it = self.pool.imap_unordered(sqr, list(range(1000)))
        self.assertEqual(sorted(it), expected)

        it = self.pool.imap_unordered(sqr, list(range(1000)), chunksize=53)
        self.assertEqual(sorted(it), expected)

    def test_make_pool(self):
        p = multiprocessing.Pool(3)
        try:
            self.assertEqual(3, len(p._pool))
        finally:
            # Shut the workers down even when the size check fails, so that a
            # failing assertion does not leak the pool's processes.
            p.close()
            p.join()

    def test_terminate(self):
        if self.TYPE == 'manager':
            # On Unix a forked process increfs each shared object to
            # which its parent process held a reference.  If the
            # forked process gets terminated then there is likely to
            # be a reference leak.  So to prevent
            # _TestZZZNumberOfObjects from failing we skip this test
            # when using a manager.
            return

        # Queue far more work than could finish, then check that terminate()
        # lets join() return quickly.
        result = self.pool.map_async(
            time.sleep, [0.1 for i in range(10000)], chunksize=1
            )
        self.pool.terminate()
        join = TimingWrapper(self.pool.join)
        join()
        self.assertTrue(join.elapsed < 0.2)
Jesse Noller1f0b6582010-01-27 03:36:01 +00001056
class _TestPoolWorkerLifetime(BaseTestCase):

    ALLOWED_TYPES = ('processes', )

    def test_pool_worker_lifetime(self):
        pool = multiprocessing.Pool(3, maxtasksperchild=10)
        self.assertEqual(3, len(pool._pool))
        origworkerpids = [worker.pid for worker in pool._pool]

        # Run many tasks so each worker gets replaced (hopefully)
        pending = [pool.apply_async(sqr, (i, )) for i in range(100)]

        # Fetch the results and verify we got the right answers,
        # also ensuring all the tasks have completed.
        for index, async_result in enumerate(pending):
            self.assertEqual(async_result.get(), sqr(index))

        # Refill the pool
        pool._repopulate_pool()

        # Wait until all workers are alive (at most five polls)
        for _ in range(5):
            if all(worker.is_alive() for worker in pool._pool):
                break
            time.sleep(DELTA)

        finalworkerpids = [worker.pid for worker in pool._pool]

        # All pids should be assigned.  See issue #7805.
        self.assertNotIn(None, origworkerpids)
        self.assertNotIn(None, finalworkerpids)

        # Finally, check that the worker pids have changed
        self.assertNotEqual(sorted(origworkerpids), sorted(finalworkerpids))
        pool.close()
        pool.join()
1087
Benjamin Petersone711caf2008-06-11 16:44:04 +00001088#
1089# Test that manager has expected number of shared objects left
1090#
1091
class _TestZZZNumberOfObjects(BaseTestCase):
    # Because test cases are sorted alphabetically, this one will get
    # run after all the other tests for the manager.  It tests that
    # there have been no "reference leaks" for the manager's shared
    # objects.  Note the comment in _TestPool.test_terminate().
    ALLOWED_TYPES = ('manager',)

    def test_number_of_objects(self):
        EXPECTED_NUMBER = 1                # the pool object is still alive
        multiprocessing.active_children()  # discard dead process objs
        gc.collect()                       # do garbage collection
        refs = self.manager._number_of_objects()
        # Capture the debug info right after counting so that what is
        # printed on failure corresponds to the count being checked.
        debug_info = self.manager._debug_info()
        if refs != EXPECTED_NUMBER:
            # Print it once; the snapshot above is the relevant one.
            print(debug_info)

        self.assertEqual(refs, EXPECTED_NUMBER)
1110
1111#
1112# Test of creating a customized manager class
1113#
1114
1115from multiprocessing.managers import BaseManager, BaseProxy, RemoteError
1116
class FooBar(object):
    """Plain class registered with MyManager under the names Foo and Bar."""

    def f(self):
        # Public method that succeeds.
        result = 'f()'
        return result

    def g(self):
        # Public method that always fails.
        raise ValueError()

    def _h(self):
        # Underscore-prefixed method.
        result = '_h()'
        return result
1124
def baz():
    """Generate the squares of 0 through 9 in increasing order."""
    n = 0
    while n < 10:
        yield n*n
        n += 1
1128
class IteratorProxy(BaseProxy):
    """Proxy that iterates by forwarding ``__next__`` to its referent."""

    _exposed_ = ('next', '__next__')

    def __iter__(self):
        return self

    # The class previously defined __next__ twice; the first definition
    # (forwarding to 'next') was shadowed by this one and never ran, so
    # only the effective definition is kept.
    def __next__(self):
        return self._callmethod('__next__')
1137
class MyManager(BaseManager):
    """Customized manager; its shared types are registered just below."""


# 'Foo' uses the default exposure, 'Bar' names its exposed methods
# explicitly, and 'baz' is served through IteratorProxy.
MyManager.register('Foo', callable=FooBar)
MyManager.register('Bar', callable=FooBar, exposed=('f', '_h'))
MyManager.register('baz', callable=baz, proxytype=IteratorProxy)
1144
1145
class _TestMyManager(BaseTestCase):

    ALLOWED_TYPES = ('manager',)

    def test_mymanager(self):
        manager = MyManager()
        manager.start()
        try:
            foo = manager.Foo()
            bar = manager.Bar()
            baz = manager.baz()

            candidates = ('f', 'g', '_h')
            foo_methods = [name for name in candidates if hasattr(foo, name)]
            bar_methods = [name for name in candidates if hasattr(bar, name)]

            # Foo was registered without ``exposed``; Bar lists 'f' and '_h'.
            self.assertEqual(foo_methods, ['f', 'g'])
            self.assertEqual(bar_methods, ['f', '_h'])

            self.assertEqual(foo.f(), 'f()')
            self.assertRaises(ValueError, foo.g)
            self.assertEqual(foo._callmethod('f'), 'f()')
            self.assertRaises(RemoteError, foo._callmethod, '_h')

            self.assertEqual(bar.f(), 'f()')
            self.assertEqual(bar._h(), '_h()')
            self.assertEqual(bar._callmethod('f'), 'f()')
            self.assertEqual(bar._callmethod('_h'), '_h()')

            self.assertEqual(list(baz), [i*i for i in range(10)])
        finally:
            # Stop the manager process even when an assertion above fails.
            manager.shutdown()
1177
1178#
1179# Test of connecting to a remote server and using xmlrpclib for serialization
1180#
1181
# Single queue instance handed out by get_queue().
_queue = pyqueue.Queue()


def get_queue():
    """Return the module-level queue."""
    return _queue


class QueueManager(BaseManager):
    """manager class used by server process"""


QueueManager.register('get_queue', callable=get_queue)


class QueueManager2(BaseManager):
    """manager class which specifies the same interface as QueueManager"""


# Registered without a callable: only the name is declared here.
QueueManager2.register('get_queue')
1193
1194
SERIALIZER = 'xmlrpclib'

class _TestRemoteManager(BaseTestCase):

    ALLOWED_TYPES = ('manager',)

    def _putter(self, address, authkey):
        # Runs in the child: connect to the server and put one item.
        manager = QueueManager2(
            address=address, authkey=authkey, serializer=SERIALIZER
            )
        manager.connect()
        queue = manager.get_queue()
        queue.put(('hello world', None, True, 2.25))

    def test_remote(self):
        authkey = os.urandom(32)

        manager = QueueManager(
            address=('localhost', 0), authkey=authkey, serializer=SERIALIZER
            )
        manager.start()

        p = self.Process(target=self._putter, args=(manager.address, authkey))
        p.start()

        manager2 = QueueManager2(
            address=manager.address, authkey=authkey, serializer=SERIALIZER
            )
        manager2.connect()
        queue = manager2.get_queue()

        # Note that xmlrpclib will deserialize object as a list not a tuple
        self.assertEqual(queue.get(), ['hello world', None, True, 2.25])

        # The item has arrived, so the child is done with its work; wait
        # for it here (while the server is still running) so it is not
        # left behind un-joined.
        p.join()

        # Because we are using xmlrpclib for serialization instead of
        # pickle this will cause a serialization error.
        self.assertRaises(Exception, queue.put, time.sleep)

        # Make queue finalizer run before the server is stopped
        del queue
        manager.shutdown()
1236
class _TestManagerRestart(BaseTestCase):

    def _putter(self, address, authkey):
        # Runs in the child: connect to the server and put one item.
        client = QueueManager(
            address=address, authkey=authkey, serializer=SERIALIZER)
        client.connect()
        shared_queue = client.get_queue()
        shared_queue.put('hello world')

    def test_rapid_restart(self):
        authkey = os.urandom(32)
        port = test.support.find_unused_port()
        address = ('localhost', port)

        manager = QueueManager(
            address=address, authkey=authkey, serializer=SERIALIZER)
        manager.start()

        p = self.Process(target=self._putter, args=(manager.address, authkey))
        p.start()
        queue = manager.get_queue()
        self.assertEqual(queue.get(), 'hello world')
        del queue
        manager.shutdown()

        # Start a second manager on the very same address right away.
        manager = QueueManager(
            address=address, authkey=authkey, serializer=SERIALIZER)
        manager.start()
        manager.shutdown()
Jesse Nollerc5d28a02009-03-30 16:37:36 +00001263
Benjamin Petersone711caf2008-06-11 16:44:04 +00001264#
1265#
1266#
1267
SENTINEL = latin('')

class _TestConnection(BaseTestCase):

    ALLOWED_TYPES = ('processes', 'threads')

    def _echo(self, conn):
        # Send every received message straight back until SENTINEL arrives.
        while True:
            data = conn.recv_bytes()
            if data == SENTINEL:
                break
            conn.send_bytes(data)
        conn.close()

    def test_connection(self):
        conn, child_conn = self.Pipe()

        p = self.Process(target=self._echo, args=(child_conn,))
        p.daemon = True
        p.start()

        seq = [1, 2.25, None]
        msg = latin('hello world')
        longmsg = msg * 10
        arr = array.array('i', list(range(4)))
        in_processes = (self.TYPE == 'processes')

        if in_processes:
            self.assertEqual(type(conn.fileno()), int)

        # Objects and raw bytes both make the round trip through the echo.
        self.assertEqual(conn.send(seq), None)
        self.assertEqual(conn.recv(), seq)

        self.assertEqual(conn.send_bytes(msg), None)
        self.assertEqual(conn.recv_bytes(), msg)

        if in_processes:
            nbytes = len(arr) * arr.itemsize

            # Receive into the start of a zeroed ten-item buffer.
            buffer = array.array('i', [0]*10)
            expected = list(arr) + [0] * (10 - len(arr))
            self.assertEqual(conn.send_bytes(arr), None)
            self.assertEqual(conn.recv_bytes_into(buffer), nbytes)
            self.assertEqual(list(buffer), expected)

            # Receive at an offset of three items.
            buffer = array.array('i', [0]*10)
            expected = [0] * 3 + list(arr) + [0] * (10 - 3 - len(arr))
            self.assertEqual(conn.send_bytes(arr), None)
            self.assertEqual(conn.recv_bytes_into(buffer, 3 * buffer.itemsize),
                             nbytes)
            self.assertEqual(list(buffer), expected)

            # A message longer than the buffer raises BufferTooShort,
            # carrying the full message in its args.
            buffer = bytearray(latin(' ' * 40))
            self.assertEqual(conn.send_bytes(longmsg), None)
            try:
                res = conn.recv_bytes_into(buffer)
            except multiprocessing.BufferTooShort as e:
                self.assertEqual(e.args, (longmsg,))
            else:
                self.fail('expected BufferTooShort, got %s' % res)

        poll = TimingWrapper(conn.poll)

        self.assertEqual(poll(), False)
        self.assertTimingAlmostEqual(poll.elapsed, 0)

        self.assertEqual(poll(TIMEOUT1), False)
        self.assertTimingAlmostEqual(poll.elapsed, TIMEOUT1)

        conn.send(None)

        self.assertEqual(poll(TIMEOUT1), True)
        self.assertTimingAlmostEqual(poll.elapsed, 0)

        self.assertEqual(conn.recv(), None)

        really_big_msg = latin('X') * (1024 * 1024 * 16)   # 16Mb
        conn.send_bytes(really_big_msg)
        self.assertEqual(conn.recv_bytes(), really_big_msg)

        conn.send_bytes(SENTINEL)                          # tell child to quit
        child_conn.close()

        if in_processes:
            self.assertEqual(conn.readable, True)
            self.assertEqual(conn.writable, True)
            self.assertRaises(EOFError, conn.recv)
            self.assertRaises(EOFError, conn.recv_bytes)

        p.join()

    def test_duplex_false(self):
        reader, writer = self.Pipe(duplex=False)
        self.assertEqual(writer.send(1), None)
        self.assertEqual(reader.recv(), 1)
        if self.TYPE != 'processes':
            return

        # Each end reports exactly one direction ...
        self.assertEqual(reader.readable, True)
        self.assertEqual(reader.writable, False)
        self.assertEqual(writer.readable, False)
        self.assertEqual(writer.writable, True)
        # ... and using the other direction raises IOError.
        self.assertRaises(IOError, reader.send, 2)
        self.assertRaises(IOError, writer.recv)
        self.assertRaises(IOError, writer.poll)

    def test_spawn_close(self):
        # We test that a pipe connection can be closed by parent
        # process immediately after child is spawned.  On Windows this
        # would have sometimes failed on old versions because
        # child_conn would be closed before the child got a chance to
        # duplicate it.
        conn, child_conn = self.Pipe()

        child = self.Process(target=self._echo, args=(child_conn,))
        child.start()
        child_conn.close()    # this might complete before child initializes

        greeting = latin('hello')
        conn.send_bytes(greeting)
        self.assertEqual(conn.recv_bytes(), greeting)

        conn.send_bytes(SENTINEL)
        conn.close()
        child.join()

    def test_sendbytes(self):
        if self.TYPE != 'processes':
            return

        msg = latin('abcdefghijklmnopqrstuvwxyz')
        a, b = self.Pipe()

        # (extra send_bytes arguments, bytes expected at the other end)
        accepted = (
            ((), msg),
            ((5,), msg[5:]),
            ((7, 8), msg[7:7+8]),
            ((26,), latin('')),
            ((26, 0), latin('')),
            )
        for extra, expected in accepted:
            a.send_bytes(msg, *extra)
            self.assertEqual(b.recv_bytes(), expected)

        # Offsets or sizes that fall outside the message are rejected.
        rejected = ((27,), (22, 5), (26, 1), (-1,), (4, -1))
        for extra in rejected:
            self.assertRaises(ValueError, a.send_bytes, msg, *extra)
1418
class _TestListenerClient(BaseTestCase):

    ALLOWED_TYPES = ('processes', 'threads')

    def _test(self, address):
        # Runs in the child: connect, send one greeting and hang up.
        client = self.connection.Client(address)
        client.send('hello')
        client.close()

    def test_listener_client(self):
        # One listener/client round trip per available address family.
        for family in self.connection.families:
            listener = self.connection.Listener(family=family)
            child = self.Process(target=self._test, args=(listener.address,))
            child.daemon = True
            child.start()
            accepted = listener.accept()
            self.assertEqual(accepted.recv(), 'hello')
            child.join()
            listener.close()
Benjamin Petersone711caf2008-06-11 16:44:04 +00001438#
# Test of sending connection and socket objects between processes
# (currently disabled: the test class below is wrapped in a string literal)
1440#
Benjamin Petersonb29cbbc2008-06-16 20:57:14 +00001441"""
Benjamin Petersone711caf2008-06-11 16:44:04 +00001442class _TestPicklingConnections(BaseTestCase):
1443
1444 ALLOWED_TYPES = ('processes',)
1445
1446 def _listener(self, conn, families):
1447 for fam in families:
1448 l = self.connection.Listener(family=fam)
1449 conn.send(l.address)
1450 new_conn = l.accept()
1451 conn.send(new_conn)
1452
1453 if self.TYPE == 'processes':
1454 l = socket.socket()
1455 l.bind(('localhost', 0))
1456 conn.send(l.getsockname())
1457 l.listen(1)
1458 new_conn, addr = l.accept()
1459 conn.send(new_conn)
1460
1461 conn.recv()
1462
1463 def _remote(self, conn):
1464 for (address, msg) in iter(conn.recv, None):
1465 client = self.connection.Client(address)
1466 client.send(msg.upper())
1467 client.close()
1468
1469 if self.TYPE == 'processes':
1470 address, msg = conn.recv()
1471 client = socket.socket()
1472 client.connect(address)
1473 client.sendall(msg.upper())
1474 client.close()
1475
1476 conn.close()
1477
1478 def test_pickling(self):
1479 try:
1480 multiprocessing.allow_connection_pickling()
1481 except ImportError:
1482 return
1483
1484 families = self.connection.families
1485
1486 lconn, lconn0 = self.Pipe()
1487 lp = self.Process(target=self._listener, args=(lconn0, families))
1488 lp.start()
1489 lconn0.close()
1490
1491 rconn, rconn0 = self.Pipe()
1492 rp = self.Process(target=self._remote, args=(rconn0,))
1493 rp.start()
1494 rconn0.close()
1495
1496 for fam in families:
1497 msg = ('This connection uses family %s' % fam).encode('ascii')
1498 address = lconn.recv()
1499 rconn.send((address, msg))
1500 new_conn = lconn.recv()
1501 self.assertEqual(new_conn.recv(), msg.upper())
1502
1503 rconn.send(None)
1504
1505 if self.TYPE == 'processes':
1506 msg = latin('This connection uses a normal socket')
1507 address = lconn.recv()
1508 rconn.send((address, msg))
1509 if hasattr(socket, 'fromfd'):
1510 new_conn = lconn.recv()
1511 self.assertEqual(new_conn.recv(100), msg.upper())
1512 else:
1513 # XXX On Windows with Py2.6 need to backport fromfd()
1514 discard = lconn.recv_bytes()
1515
1516 lconn.send(None)
1517
1518 rconn.close()
1519 lconn.close()
1520
1521 lp.join()
1522 rp.join()
Benjamin Petersonb29cbbc2008-06-16 20:57:14 +00001523"""
Benjamin Petersone711caf2008-06-11 16:44:04 +00001524#
1525#
1526#
1527
class _TestHeap(BaseTestCase):

    ALLOWED_TYPES = ('processes',)

    def test_heap(self):
        iterations = 5000
        maxblocks = 50
        blocks = []

        # create and destroy lots of blocks of different sizes
        for _ in range(iterations):
            size = int(random.lognormvariate(0, 1) * 1000)
            blocks.append(multiprocessing.heap.BufferWrapper(size))
            if len(blocks) > maxblocks:
                del blocks[random.randrange(maxblocks)]

        # get the heap object
        heap = multiprocessing.heap.BufferWrapper._heap

        # verify the state of the heap: gather every free and occupied
        # segment as (arena index, start, stop, length, state)
        segments = []
        for free_list in list(heap._len_to_seq.values()):
            for arena, start, stop in free_list:
                segments.append((heap._arenas.index(arena), start, stop,
                                 stop-start, 'free'))
        for arena, start, stop in heap._allocated_blocks:
            segments.append((heap._arenas.index(arena), start, stop,
                             stop-start, 'occupied'))

        segments.sort()

        # Consecutive segments must either touch, or the later one must
        # begin a different arena at offset 0.
        for current, following in zip(segments, segments[1:]):
            (arena, start, stop) = current[:3]
            (narena, nstart, nstop) = following[:3]
            self.assertTrue((arena != narena and nstart == 0) or
                            (stop == nstart))
1568
1569#
1570#
1571#
1572
try:
    # Only genuine ctypes names are imported here.  ``Value`` and ``copy``
    # are not ctypes names; asking ctypes for them made this import fail
    # unconditionally, which silently disabled every test in
    # _TestSharedCTypes.
    from ctypes import Structure, c_int, c_double
except ImportError:
    Structure = object
    c_int = c_double = None

class _Foo(Structure):
    _fields_ = [
        ('x', c_int),
        ('y', c_double)
        ]

class _TestSharedCTypes(BaseTestCase):

    ALLOWED_TYPES = ('processes',)

    def _double(self, x, y, foo, arr, string):
        # Runs in the child: double every shared object in place.
        x.value *= 2
        y.value *= 2
        foo.x *= 2
        foo.y *= 2
        string.value *= 2
        for i in range(len(arr)):
            arr[i] *= 2

    def test_sharedctypes(self, lock=False):
        if c_int is None:
            # ctypes is unavailable
            return

        # Value/Array come from the mixin (set up via get_attributes()).
        x = self.Value('i', 7, lock=lock)
        y = self.Value(c_double, 1.0/3.0, lock=lock)
        foo = self.Value(_Foo, 3, 2, lock=lock)
        arr = self.Array('d', list(range(10)), lock=lock)
        string = self.Array('c', 20, lock=lock)
        # A 'c' array holds bytes, so a bytes value is assigned.
        string.value = latin('hello')

        p = self.Process(target=self._double, args=(x, y, foo, arr, string))
        p.start()
        p.join()

        self.assertEqual(x.value, 14)
        self.assertAlmostEqual(y.value, 2.0/3.0)
        self.assertEqual(foo.x, 6)
        self.assertAlmostEqual(foo.y, 4.0)
        for i in range(10):
            self.assertAlmostEqual(arr[i], i*2)
        self.assertEqual(string.value, latin('hellohello'))

    def test_synchronize(self):
        self.test_sharedctypes(lock=True)

    def test_copy(self):
        if c_int is None:
            # ctypes is unavailable
            return

        # Imported locally under another name so that the module-level
        # ``copy`` name (the standard ``copy`` module) is left untouched.
        from multiprocessing.sharedctypes import copy as shared_copy

        foo = _Foo(2, 5.0)
        bar = shared_copy(foo)
        # Changing the original must not affect the copy.
        foo.x = 0
        foo.y = 0
        self.assertEqual(bar.x, 2)
        self.assertAlmostEqual(bar.y, 5.0)
1634
1635#
1636#
1637#
1638
class _TestFinalize(BaseTestCase):

    ALLOWED_TYPES = ('processes',)

    def _test_finalize(self, conn):
        class Foo(object):
            pass

        a = Foo()
        util.Finalize(a, conn.send, args=('a',))
        del a           # triggers callback for a

        b = Foo()
        close_b = util.Finalize(b, conn.send, args=('b',))
        close_b()       # triggers callback for b
        close_b()       # does nothing because callback has already been called
        del b           # does nothing because callback has already been called

        # The remaining objects must stay referenced until the process
        # exits, so they are collected in a list.
        survivors = []

        def watch(tag, **options):
            obj = Foo()
            survivors.append(obj)
            util.Finalize(obj, conn.send, args=(tag,), **options)

        watch('c')
        watch('d10', exitpriority=1)
        watch('d01', exitpriority=0)
        watch('d02', exitpriority=0)
        watch('d03', exitpriority=0)

        util.Finalize(None, conn.send, args=('e',), exitpriority=-10)

        util.Finalize(None, conn.send, args=('STOP',), exitpriority=-100)

        # call multiprocessing's cleanup function then exit process without
        # garbage collecting locals
        util._exit_function()
        conn.close()
        os._exit(0)

    def test_finalize(self):
        conn, child_conn = self.Pipe()

        child = self.Process(target=self._test_finalize, args=(child_conn,))
        child.start()
        child.join()

        # Read everything the child sent, up to the 'STOP' marker.
        result = list(iter(conn.recv, 'STOP'))
        self.assertEqual(result, ['a', 'b', 'd10', 'd03', 'd02', 'd01', 'e'])
1689
1690#
1691# Test that from ... import * works for each module
1692#
1693
class _TestImportStar(BaseTestCase):

    ALLOWED_TYPES = ('processes',)

    def test_import(self):
        submodules = (
            'connection', 'heap', 'managers', 'pool', 'process',
            'reduction', 'sharedctypes', 'synchronize', 'util'
            )
        modules = ('multiprocessing',) + tuple(
            'multiprocessing.' + sub for sub in submodules)

        for name in modules:
            __import__(name)
            mod = sys.modules[name]

            # Every name a module advertises in __all__ must exist on it.
            for attr in getattr(mod, '__all__', ()):
                message = '%r does not have attribute %r' % (mod, attr)
                self.assertTrue(hasattr(mod, attr), message)
1716
1717#
1718# Quick test that logging works -- does not test logging output
1719#
1720
class _TestLogging(BaseTestCase):

    ALLOWED_TYPES = ('processes',)

    def test_enable_logging(self):
        logger = multiprocessing.get_logger()
        logger.setLevel(util.SUBWARNING)
        self.assertIsNotNone(logger)
        logger.debug('this will not be printed')
        logger.info('nor will this')
        logger.setLevel(LOG_LEVEL)

    def _test_level(self, conn):
        # Runs in the child: report the logger's effective level.
        child_logger = multiprocessing.get_logger()
        conn.send(child_logger.getEffectiveLevel())

    def test_level(self):
        LEVEL1 = 32
        LEVEL2 = 37

        logger = multiprocessing.get_logger()
        root_logger = logging.getLogger()
        root_level = root_logger.level

        reader, writer = multiprocessing.Pipe(duplex=False)

        def level_seen_by_child():
            # Start a child and return the level it reports back.
            self.Process(target=self._test_level, args=(writer,)).start()
            return reader.recv()

        # A level set on the multiprocessing logger is seen by the child.
        logger.setLevel(LEVEL1)
        self.assertEqual(LEVEL1, level_seen_by_child())

        # With NOTSET on that logger the root logger's level is reported.
        logger.setLevel(logging.NOTSET)
        root_logger.setLevel(LEVEL2)
        self.assertEqual(LEVEL2, level_seen_by_child())

        # Restore the levels that were in effect before the test.
        root_logger.setLevel(root_level)
        logger.setLevel(level=LOG_LEVEL)
1758
Jesse Nollerb9a49b72009-11-21 18:09:38 +00001759
Jesse Nollerf4ae35f2009-11-24 14:22:24 +00001760# class _TestLoggingProcessName(BaseTestCase):
1761#
1762# def handle(self, record):
1763# assert record.processName == multiprocessing.current_process().name
1764# self.__handled = True
1765#
1766# def test_logging(self):
1767# handler = logging.Handler()
1768# handler.handle = self.handle
1769# self.__handled = False
1770# # Bypass getLogger() and side-effects
1771# logger = logging.getLoggerClass()(
1772# 'multiprocessing.test.TestLoggingProcessName')
1773# logger.addHandler(handler)
1774# logger.propagate = False
1775#
1776# logger.warn('foo')
1777# assert self.__handled
Jesse Nollerb9a49b72009-11-21 18:09:38 +00001778
Benjamin Petersone711caf2008-06-11 16:44:04 +00001779#
Jesse Noller6214edd2009-01-19 16:23:53 +00001780# Test to verify handle verification, see issue 3321
1781#
1782
class TestInvalidHandle(unittest.TestCase):

    def test_invalid_handles(self):
        # Nothing is checked when WIN32 is set.
        if not WIN32:
            bogus = _multiprocessing.Connection(44977608)
            with self.assertRaises(IOError):
                bogus.poll()
            with self.assertRaises(IOError):
                _multiprocessing.Connection(-1)
1791#
Benjamin Petersone711caf2008-06-11 16:44:04 +00001792# Functions used to create test cases from the base ones in this module
1793#
1794
def get_attributes(Source, names):
    """Return a dict mapping each of ``names`` to that attribute of ``Source``.

    Attributes whose type is the plain function type are wrapped in
    ``staticmethod``; everything else is stored unchanged.  A missing
    attribute raises ``AttributeError``.
    """
    function_type = type(get_attributes)

    def fetch(name):
        obj = getattr(Source, name)
        if type(obj) == function_type:
            return staticmethod(obj)
        return obj

    return {name: fetch(name) for name in names}
1803
def create_test_cases(Mixin, type):
    """Build concrete TestCase classes from the ``_Test*`` module globals.

    For every global whose name starts with ``_Test`` and whose
    ``ALLOWED_TYPES`` contains ``type``, a class deriving from that base,
    ``unittest.TestCase`` and ``Mixin`` is created.  It is named
    ``'With' + <type with its first letter upper-cased> + <base name
    without the leading underscore>`` and takes ``Mixin``'s module.

    Returns a dict mapping the new class names to the classes.
    """
    namespace = globals()
    prefix = 'With' + type[0].upper() + type[1:]
    generated = {}

    # Iterate over a snapshot of the global names.
    for name in list(namespace):
        if not name.startswith('_Test'):
            continue
        base = namespace[name]
        if type not in base.ALLOWED_TYPES:
            continue

        class Temp(base, unittest.TestCase, Mixin):
            pass

        newname = prefix + name[1:]
        Temp.__name__ = newname
        Temp.__module__ = Mixin.__module__
        generated[newname] = Temp

    return generated
1820
1821#
1822# Create test cases
1823#
1824
class ProcessesMixin(object):
    TYPE = 'processes'
    Process = multiprocessing.Process

# Copy the multiprocessing API used by the tests onto the mixin class
# (get_attributes() wraps plain functions in staticmethod).
for _attr, _value in get_attributes(multiprocessing, (
        'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
        'Condition', 'Event', 'Value', 'Array', 'RawValue',
        'RawArray', 'current_process', 'active_children', 'Pipe',
        'connection', 'JoinableQueue'
        )).items():
    setattr(ProcessesMixin, _attr, _value)
del _attr, _value

testcases_processes = create_test_cases(ProcessesMixin, type='processes')
globals().update(testcases_processes)
1837
1838
class ManagerMixin(object):
    TYPE = 'manager'
    Process = multiprocessing.Process
    # Allocated without running __init__; test_main() initialises and
    # starts this manager before the suite runs.
    manager = object.__new__(multiprocessing.managers.SyncManager)

# Copy the manager's factory attributes onto the mixin class.
for _attr, _value in get_attributes(ManagerMixin.manager, (
        'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
        'Condition', 'Event', 'Value', 'Array', 'list', 'dict',
        'Namespace', 'JoinableQueue'
        )).items():
    setattr(ManagerMixin, _attr, _value)
del _attr, _value

testcases_manager = create_test_cases(ManagerMixin, type='manager')
globals().update(testcases_manager)
1851
1852
class ThreadsMixin(object):
    TYPE = 'threads'
    Process = multiprocessing.dummy.Process

# Copy the multiprocessing.dummy API used by the tests onto the mixin
# class (get_attributes() wraps plain functions in staticmethod).
for _attr, _value in get_attributes(multiprocessing.dummy, (
        'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
        'Condition', 'Event', 'Value', 'Array', 'current_process',
        'active_children', 'Pipe', 'connection', 'dict', 'list',
        'Namespace', 'JoinableQueue'
        )).items():
    setattr(ThreadsMixin, _attr, _value)
del _attr, _value

testcases_threads = create_test_cases(ThreadsMixin, type='threads')
globals().update(testcases_threads)
1865
class OtherTest(unittest.TestCase):
    # TODO: add more tests for deliver/answer challenge.
    def test_deliver_challenge_auth_failure(self):
        class _FakeConnection(object):
            # Always answers the challenge with a bogus response.
            def recv_bytes(self, size):
                return b'something bogus'
            def send_bytes(self, data):
                pass

        with self.assertRaises(multiprocessing.AuthenticationError):
            multiprocessing.connection.deliver_challenge(
                _FakeConnection(), b'abc')

    def test_answer_challenge_auth_failure(self):
        class _FakeConnection(object):
            # First read yields the challenge marker, the second a bogus
            # reply, and any later read an empty message.
            def __init__(self):
                self.count = 0
            def recv_bytes(self, size):
                self.count += 1
                if self.count == 1:
                    return multiprocessing.connection.CHALLENGE
                if self.count == 2:
                    return b'something bogus'
                return b''
            def send_bytes(self, data):
                pass

        with self.assertRaises(multiprocessing.AuthenticationError):
            multiprocessing.connection.answer_challenge(
                _FakeConnection(), b'abc')
1894
Benjamin Petersonf47ed4a2009-04-11 20:45:40 +00001895#
1896# Test Manager.start()/Pool.__init__() initializer feature - see issue 5585
1897#
1898
def initializer(ns):
    """Add one to ``ns.test``."""
    ns.test += 1
1901
class TestInitializers(unittest.TestCase):
    def setUp(self):
        # A managed Namespace whose ``test`` counter starts at zero.
        self.mgr = multiprocessing.Manager()
        self.ns = self.mgr.Namespace()
        self.ns.test = 0

    def tearDown(self):
        self.mgr.shutdown()

    def test_manager_initializer(self):
        manager = multiprocessing.managers.SyncManager()
        # An initializer that is not callable is rejected.
        with self.assertRaises(TypeError):
            manager.start(1)
        manager.start(initializer, (self.ns,))
        self.assertEqual(self.ns.test, 1)
        manager.shutdown()

    def test_pool_initializer(self):
        # An initializer that is not callable is rejected.
        with self.assertRaises(TypeError):
            multiprocessing.Pool(initializer=1)
        pool = multiprocessing.Pool(1, initializer, (self.ns,))
        pool.close()
        pool.join()
        self.assertEqual(self.ns.test, 1)
1924
Alexandre Vassalottic57a84f2009-07-17 12:07:01 +00001925#
1926# Issue 5155, 5313, 5331: Test process in processes
1927# Verifies os.close(sys.stdin.fileno) vs. sys.stdin.close() behavior
1928#
1929
def _ThisSubProcess(q):
    """Try one non-blocking ``get`` on ``q``; an empty queue is fine."""
    try:
        q.get(block=False)
    except pyqueue.Empty:
        return
1935
def _TestProcess(q):
    """Start a child process running _ThisSubProcess and wait for it.

    The ``q`` argument is not used; the child gets a fresh queue.
    """
    child_queue = multiprocessing.Queue()
    child = multiprocessing.Process(target=_ThisSubProcess,
                                    args=(child_queue,))
    child.start()
    child.join()
1941
def _afunc(x):
    """Return ``x`` multiplied by itself."""
    product = x * x
    return product
1944
def pool_in_process():
    """Create a four-worker pool, map _afunc over a few ints, clean up."""
    pool = multiprocessing.Pool(processes=4)
    try:
        pool.map(_afunc, [1, 2, 3, 4, 5, 6, 7])
    finally:
        # Shut the workers down explicitly rather than leaving the pool
        # behind when the function returns.
        pool.close()
        pool.join()
1948
class _file_like(object):
    """Buffer writes per process and pass them to ``delegate`` on flush."""

    def __init__(self, delegate):
        self._delegate = delegate
        self._pid = None

    @property
    def cache(self):
        # The buffer belongs to the process that created it: when the pid
        # differs from the recorded one, start over with an empty buffer.
        current = os.getpid()
        # There are no race conditions since fork keeps only the running thread
        if self._pid != current:
            self._pid, self._cache = current, []
        return self._cache

    def write(self, data):
        self.cache.append(data)

    def flush(self):
        # Emit everything buffered as one string, then reset the buffer.
        pending = ''.join(self.cache)
        self._delegate.write(pending)
        self._cache = []
1969
class TestStdinBadfiledescriptor(unittest.TestCase):

    def test_queue_in_process(self):
        # A process that itself creates a queue and starts a process.
        queue = multiprocessing.Queue()
        proc = multiprocessing.Process(target=_TestProcess, args=(queue,))
        proc.start()
        proc.join()

    def test_pool_in_process(self):
        # A process that itself creates a pool.
        p = multiprocessing.Process(target=pool_in_process)
        p.start()
        p.join()

    def test_flushing(self):
        sio = io.StringIO()
        flike = _file_like(sio)
        flike.write('foo')
        flike.flush()
        # assertEqual instead of a bare ``assert`` so that the check is
        # still made under ``python -O`` and reports both values on failure.
        self.assertEqual(sio.getvalue(), 'foo')
1990
# Test cases that are not generated by create_test_cases().
testcases_other = [
    OtherTest,
    TestInvalidHandle,
    TestInitializers,
    TestStdinBadfiledescriptor,
]
Neal Norwitz5d6415e2008-08-25 01:53:32 +00001993
Benjamin Petersone711caf2008-06-11 16:44:04 +00001994#
1995#
1996#
1997
1998def test_main(run=None):
Jesse Nollerd00df3c2008-06-18 14:22:48 +00001999 if sys.platform.startswith("linux"):
2000 try:
2001 lock = multiprocessing.RLock()
2002 except OSError:
Benjamin Petersone549ead2009-03-28 21:42:05 +00002003 raise unittest.SkipTest("OSError raises on RLock creation, see issue 3111!")
Benjamin Peterson3c0dd062008-06-17 22:43:48 +00002004
Benjamin Petersone711caf2008-06-11 16:44:04 +00002005 if run is None:
2006 from test.support import run_unittest as run
2007
2008 util.get_temp_dir() # creates temp directory for use by all processes
2009
2010 multiprocessing.get_logger().setLevel(LOG_LEVEL)
2011
Benjamin Peterson41181742008-07-02 20:22:54 +00002012 ProcessesMixin.pool = multiprocessing.Pool(4)
2013 ThreadsMixin.pool = multiprocessing.dummy.Pool(4)
2014 ManagerMixin.manager.__init__()
2015 ManagerMixin.manager.start()
2016 ManagerMixin.pool = ManagerMixin.manager.Pool(4)
Benjamin Petersone711caf2008-06-11 16:44:04 +00002017
2018 testcases = (
Benjamin Peterson41181742008-07-02 20:22:54 +00002019 sorted(testcases_processes.values(), key=lambda tc:tc.__name__) +
2020 sorted(testcases_threads.values(), key=lambda tc:tc.__name__) +
Neal Norwitz5d6415e2008-08-25 01:53:32 +00002021 sorted(testcases_manager.values(), key=lambda tc:tc.__name__) +
2022 testcases_other
Benjamin Petersone711caf2008-06-11 16:44:04 +00002023 )
2024
2025 loadTestsFromTestCase = unittest.defaultTestLoader.loadTestsFromTestCase
2026 suite = unittest.TestSuite(loadTestsFromTestCase(tc) for tc in testcases)
2027 run(suite)
2028
Benjamin Peterson41181742008-07-02 20:22:54 +00002029 ThreadsMixin.pool.terminate()
2030 ProcessesMixin.pool.terminate()
2031 ManagerMixin.pool.terminate()
2032 ManagerMixin.manager.shutdown()
Benjamin Petersone711caf2008-06-11 16:44:04 +00002033
Benjamin Peterson41181742008-07-02 20:22:54 +00002034 del ProcessesMixin.pool, ThreadsMixin.pool, ManagerMixin.pool
Benjamin Petersone711caf2008-06-11 16:44:04 +00002035
def main():
    """Run the whole suite with a verbose text runner."""
    runner = unittest.TextTestRunner(verbosity=2)
    test_main(runner.run)


if __name__ == '__main__':
    main()