blob: 96df5023bc6a7196d92628708519af70ccf3dce9 [file] [log] [blame]
Benjamin Peterson90f5ba52010-03-11 22:53:45 +00001#!/usr/bin/env python3
Georg Brandl86b2fb92008-07-16 03:43:04 +00002
Benjamin Petersone711caf2008-06-11 16:44:04 +00003#
4# Unit tests for the multiprocessing package
5#
6
7import unittest
Benjamin Petersone711caf2008-06-11 16:44:04 +00008import queue as pyqueue
9import time
Alexandre Vassalottic57a84f2009-07-17 12:07:01 +000010import io
Benjamin Petersone711caf2008-06-11 16:44:04 +000011import sys
12import os
13import gc
14import signal
15import array
16import copy
17import socket
18import random
19import logging
R. David Murraya21e4ca2009-03-31 23:16:50 +000020import test.support
Benjamin Petersone711caf2008-06-11 16:44:04 +000021
Benjamin Petersone5384b02008-10-04 22:00:42 +000022
R. David Murraya21e4ca2009-03-31 23:16:50 +000023# Skip tests if _multiprocessing wasn't built.
24_multiprocessing = test.support.import_module('_multiprocessing')
25# Skip tests if sem_open implementation is broken.
26test.support.import_module('multiprocessing.synchronize')
Victor Stinner45df8202010-04-28 22:31:17 +000027# import threading after _multiprocessing to raise a more revelant error
28# message: "No module named _multiprocessing". _multiprocessing is not compiled
29# without thread support.
30import threading
Benjamin Petersone5384b02008-10-04 22:00:42 +000031
Benjamin Petersone711caf2008-06-11 16:44:04 +000032import multiprocessing.dummy
33import multiprocessing.connection
34import multiprocessing.managers
35import multiprocessing.heap
Benjamin Petersone711caf2008-06-11 16:44:04 +000036import multiprocessing.pool
Benjamin Petersone711caf2008-06-11 16:44:04 +000037
38from multiprocessing import util
39
40#
41#
42#
43
Benjamin Peterson2bc91df2008-07-13 18:45:30 +000044def latin(s):
45 return s.encode('latin')
Benjamin Petersone711caf2008-06-11 16:44:04 +000046
Benjamin Petersone711caf2008-06-11 16:44:04 +000047#
48# Constants
49#
50
51LOG_LEVEL = util.SUBWARNING
Jesse Noller1f0b6582010-01-27 03:36:01 +000052#LOG_LEVEL = logging.DEBUG
Benjamin Petersone711caf2008-06-11 16:44:04 +000053
54DELTA = 0.1
55CHECK_TIMINGS = False # making true makes tests take a lot longer
56 # and can sometimes cause some non-serious
57 # failures because some calls block a bit
58 # longer than expected
59if CHECK_TIMINGS:
60 TIMEOUT1, TIMEOUT2, TIMEOUT3 = 0.82, 0.35, 1.4
61else:
62 TIMEOUT1, TIMEOUT2, TIMEOUT3 = 0.1, 0.1, 0.1
63
64HAVE_GETVALUE = not getattr(_multiprocessing,
65 'HAVE_BROKEN_SEM_GETVALUE', False)
66
Jesse Noller6214edd2009-01-19 16:23:53 +000067WIN32 = (sys.platform == "win32")
68
Benjamin Petersone711caf2008-06-11 16:44:04 +000069#
Florent Xiclunafd1b0932010-03-28 00:25:02 +000070# Some tests require ctypes
71#
72
73try:
74 from ctypes import Structure, Value, copy, c_int, c_double
75except ImportError:
76 Structure = object
77 c_int = c_double = None
78
79#
Benjamin Petersone711caf2008-06-11 16:44:04 +000080# Creates a wrapper for a function which records the time it takes to finish
81#
82
83class TimingWrapper(object):
84
85 def __init__(self, func):
86 self.func = func
87 self.elapsed = None
88
89 def __call__(self, *args, **kwds):
90 t = time.time()
91 try:
92 return self.func(*args, **kwds)
93 finally:
94 self.elapsed = time.time() - t
95
96#
97# Base class for test cases
98#
99
100class BaseTestCase(object):
101
102 ALLOWED_TYPES = ('processes', 'manager', 'threads')
103
104 def assertTimingAlmostEqual(self, a, b):
105 if CHECK_TIMINGS:
106 self.assertAlmostEqual(a, b, 1)
107
108 def assertReturnsIfImplemented(self, value, func, *args):
109 try:
110 res = func(*args)
111 except NotImplementedError:
112 pass
113 else:
114 return self.assertEqual(value, res)
115
116#
117# Return the value of a semaphore
118#
119
120def get_value(self):
121 try:
122 return self.get_value()
123 except AttributeError:
124 try:
125 return self._Semaphore__value
126 except AttributeError:
127 try:
128 return self._value
129 except AttributeError:
130 raise NotImplementedError
131
132#
133# Testcases
134#
135
136class _TestProcess(BaseTestCase):
137
138 ALLOWED_TYPES = ('processes', 'threads')
139
140 def test_current(self):
141 if self.TYPE == 'threads':
142 return
143
144 current = self.current_process()
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000145 authkey = current.authkey
Benjamin Petersone711caf2008-06-11 16:44:04 +0000146
147 self.assertTrue(current.is_alive())
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000148 self.assertTrue(not current.daemon)
Ezio Melottie9615932010-01-24 19:26:24 +0000149 self.assertIsInstance(authkey, bytes)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000150 self.assertTrue(len(authkey) > 0)
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000151 self.assertEqual(current.ident, os.getpid())
152 self.assertEqual(current.exitcode, None)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000153
154 def _test(self, q, *args, **kwds):
155 current = self.current_process()
156 q.put(args)
157 q.put(kwds)
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000158 q.put(current.name)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000159 if self.TYPE != 'threads':
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000160 q.put(bytes(current.authkey))
Benjamin Petersone711caf2008-06-11 16:44:04 +0000161 q.put(current.pid)
162
163 def test_process(self):
164 q = self.Queue(1)
165 e = self.Event()
166 args = (q, 1, 2)
167 kwargs = {'hello':23, 'bye':2.54}
168 name = 'SomeProcess'
169 p = self.Process(
170 target=self._test, args=args, kwargs=kwargs, name=name
171 )
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000172 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000173 current = self.current_process()
174
175 if self.TYPE != 'threads':
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000176 self.assertEquals(p.authkey, current.authkey)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000177 self.assertEquals(p.is_alive(), False)
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000178 self.assertEquals(p.daemon, True)
Benjamin Peterson577473f2010-01-19 00:09:57 +0000179 self.assertNotIn(p, self.active_children())
Benjamin Petersone711caf2008-06-11 16:44:04 +0000180 self.assertTrue(type(self.active_children()) is list)
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000181 self.assertEqual(p.exitcode, None)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000182
183 p.start()
184
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000185 self.assertEquals(p.exitcode, None)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000186 self.assertEquals(p.is_alive(), True)
Benjamin Peterson577473f2010-01-19 00:09:57 +0000187 self.assertIn(p, self.active_children())
Benjamin Petersone711caf2008-06-11 16:44:04 +0000188
189 self.assertEquals(q.get(), args[1:])
190 self.assertEquals(q.get(), kwargs)
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000191 self.assertEquals(q.get(), p.name)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000192 if self.TYPE != 'threads':
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000193 self.assertEquals(q.get(), current.authkey)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000194 self.assertEquals(q.get(), p.pid)
195
196 p.join()
197
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000198 self.assertEquals(p.exitcode, 0)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000199 self.assertEquals(p.is_alive(), False)
Benjamin Peterson577473f2010-01-19 00:09:57 +0000200 self.assertNotIn(p, self.active_children())
Benjamin Petersone711caf2008-06-11 16:44:04 +0000201
202 def _test_terminate(self):
203 time.sleep(1000)
204
205 def test_terminate(self):
206 if self.TYPE == 'threads':
207 return
208
209 p = self.Process(target=self._test_terminate)
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000210 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000211 p.start()
212
213 self.assertEqual(p.is_alive(), True)
Benjamin Peterson577473f2010-01-19 00:09:57 +0000214 self.assertIn(p, self.active_children())
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000215 self.assertEqual(p.exitcode, None)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000216
217 p.terminate()
218
219 join = TimingWrapper(p.join)
220 self.assertEqual(join(), None)
221 self.assertTimingAlmostEqual(join.elapsed, 0.0)
222
223 self.assertEqual(p.is_alive(), False)
Benjamin Peterson577473f2010-01-19 00:09:57 +0000224 self.assertNotIn(p, self.active_children())
Benjamin Petersone711caf2008-06-11 16:44:04 +0000225
226 p.join()
227
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000228 # XXX sometimes get p.exitcode == 0 on Windows ...
229 #self.assertEqual(p.exitcode, -signal.SIGTERM)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000230
231 def test_cpu_count(self):
232 try:
233 cpus = multiprocessing.cpu_count()
234 except NotImplementedError:
235 cpus = 1
236 self.assertTrue(type(cpus) is int)
237 self.assertTrue(cpus >= 1)
238
239 def test_active_children(self):
240 self.assertEqual(type(self.active_children()), list)
241
242 p = self.Process(target=time.sleep, args=(DELTA,))
Benjamin Peterson577473f2010-01-19 00:09:57 +0000243 self.assertNotIn(p, self.active_children())
Benjamin Petersone711caf2008-06-11 16:44:04 +0000244
245 p.start()
Benjamin Peterson577473f2010-01-19 00:09:57 +0000246 self.assertIn(p, self.active_children())
Benjamin Petersone711caf2008-06-11 16:44:04 +0000247
248 p.join()
Benjamin Peterson577473f2010-01-19 00:09:57 +0000249 self.assertNotIn(p, self.active_children())
Benjamin Petersone711caf2008-06-11 16:44:04 +0000250
251 def _test_recursion(self, wconn, id):
252 from multiprocessing import forking
253 wconn.send(id)
254 if len(id) < 2:
255 for i in range(2):
256 p = self.Process(
257 target=self._test_recursion, args=(wconn, id+[i])
258 )
259 p.start()
260 p.join()
261
262 def test_recursion(self):
263 rconn, wconn = self.Pipe(duplex=False)
264 self._test_recursion(wconn, [])
265
266 time.sleep(DELTA)
267 result = []
268 while rconn.poll():
269 result.append(rconn.recv())
270
271 expected = [
272 [],
273 [0],
274 [0, 0],
275 [0, 1],
276 [1],
277 [1, 0],
278 [1, 1]
279 ]
280 self.assertEqual(result, expected)
281
282#
283#
284#
285
286class _UpperCaser(multiprocessing.Process):
287
288 def __init__(self):
289 multiprocessing.Process.__init__(self)
290 self.child_conn, self.parent_conn = multiprocessing.Pipe()
291
292 def run(self):
293 self.parent_conn.close()
294 for s in iter(self.child_conn.recv, None):
295 self.child_conn.send(s.upper())
296 self.child_conn.close()
297
298 def submit(self, s):
299 assert type(s) is str
300 self.parent_conn.send(s)
301 return self.parent_conn.recv()
302
303 def stop(self):
304 self.parent_conn.send(None)
305 self.parent_conn.close()
306 self.child_conn.close()
307
308class _TestSubclassingProcess(BaseTestCase):
309
310 ALLOWED_TYPES = ('processes',)
311
312 def test_subclassing(self):
313 uppercaser = _UpperCaser()
314 uppercaser.start()
315 self.assertEqual(uppercaser.submit('hello'), 'HELLO')
316 self.assertEqual(uppercaser.submit('world'), 'WORLD')
317 uppercaser.stop()
318 uppercaser.join()
319
320#
321#
322#
323
324def queue_empty(q):
325 if hasattr(q, 'empty'):
326 return q.empty()
327 else:
328 return q.qsize() == 0
329
330def queue_full(q, maxsize):
331 if hasattr(q, 'full'):
332 return q.full()
333 else:
334 return q.qsize() == maxsize
335
336
337class _TestQueue(BaseTestCase):
338
339
340 def _test_put(self, queue, child_can_start, parent_can_continue):
341 child_can_start.wait()
342 for i in range(6):
343 queue.get()
344 parent_can_continue.set()
345
346 def test_put(self):
347 MAXSIZE = 6
348 queue = self.Queue(maxsize=MAXSIZE)
349 child_can_start = self.Event()
350 parent_can_continue = self.Event()
351
352 proc = self.Process(
353 target=self._test_put,
354 args=(queue, child_can_start, parent_can_continue)
355 )
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000356 proc.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000357 proc.start()
358
359 self.assertEqual(queue_empty(queue), True)
360 self.assertEqual(queue_full(queue, MAXSIZE), False)
361
362 queue.put(1)
363 queue.put(2, True)
364 queue.put(3, True, None)
365 queue.put(4, False)
366 queue.put(5, False, None)
367 queue.put_nowait(6)
368
369 # the values may be in buffer but not yet in pipe so sleep a bit
370 time.sleep(DELTA)
371
372 self.assertEqual(queue_empty(queue), False)
373 self.assertEqual(queue_full(queue, MAXSIZE), True)
374
375 put = TimingWrapper(queue.put)
376 put_nowait = TimingWrapper(queue.put_nowait)
377
378 self.assertRaises(pyqueue.Full, put, 7, False)
379 self.assertTimingAlmostEqual(put.elapsed, 0)
380
381 self.assertRaises(pyqueue.Full, put, 7, False, None)
382 self.assertTimingAlmostEqual(put.elapsed, 0)
383
384 self.assertRaises(pyqueue.Full, put_nowait, 7)
385 self.assertTimingAlmostEqual(put_nowait.elapsed, 0)
386
387 self.assertRaises(pyqueue.Full, put, 7, True, TIMEOUT1)
388 self.assertTimingAlmostEqual(put.elapsed, TIMEOUT1)
389
390 self.assertRaises(pyqueue.Full, put, 7, False, TIMEOUT2)
391 self.assertTimingAlmostEqual(put.elapsed, 0)
392
393 self.assertRaises(pyqueue.Full, put, 7, True, timeout=TIMEOUT3)
394 self.assertTimingAlmostEqual(put.elapsed, TIMEOUT3)
395
396 child_can_start.set()
397 parent_can_continue.wait()
398
399 self.assertEqual(queue_empty(queue), True)
400 self.assertEqual(queue_full(queue, MAXSIZE), False)
401
402 proc.join()
403
404 def _test_get(self, queue, child_can_start, parent_can_continue):
405 child_can_start.wait()
Benjamin Petersonb29cbbc2008-06-16 20:57:14 +0000406 #queue.put(1)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000407 queue.put(2)
408 queue.put(3)
409 queue.put(4)
410 queue.put(5)
411 parent_can_continue.set()
412
413 def test_get(self):
414 queue = self.Queue()
415 child_can_start = self.Event()
416 parent_can_continue = self.Event()
417
418 proc = self.Process(
419 target=self._test_get,
420 args=(queue, child_can_start, parent_can_continue)
421 )
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000422 proc.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000423 proc.start()
424
425 self.assertEqual(queue_empty(queue), True)
426
427 child_can_start.set()
428 parent_can_continue.wait()
429
430 time.sleep(DELTA)
431 self.assertEqual(queue_empty(queue), False)
432
Benjamin Petersonb29cbbc2008-06-16 20:57:14 +0000433 # Hangs unexpectedly, remove for now
434 #self.assertEqual(queue.get(), 1)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000435 self.assertEqual(queue.get(True, None), 2)
436 self.assertEqual(queue.get(True), 3)
437 self.assertEqual(queue.get(timeout=1), 4)
438 self.assertEqual(queue.get_nowait(), 5)
439
440 self.assertEqual(queue_empty(queue), True)
441
442 get = TimingWrapper(queue.get)
443 get_nowait = TimingWrapper(queue.get_nowait)
444
445 self.assertRaises(pyqueue.Empty, get, False)
446 self.assertTimingAlmostEqual(get.elapsed, 0)
447
448 self.assertRaises(pyqueue.Empty, get, False, None)
449 self.assertTimingAlmostEqual(get.elapsed, 0)
450
451 self.assertRaises(pyqueue.Empty, get_nowait)
452 self.assertTimingAlmostEqual(get_nowait.elapsed, 0)
453
454 self.assertRaises(pyqueue.Empty, get, True, TIMEOUT1)
455 self.assertTimingAlmostEqual(get.elapsed, TIMEOUT1)
456
457 self.assertRaises(pyqueue.Empty, get, False, TIMEOUT2)
458 self.assertTimingAlmostEqual(get.elapsed, 0)
459
460 self.assertRaises(pyqueue.Empty, get, timeout=TIMEOUT3)
461 self.assertTimingAlmostEqual(get.elapsed, TIMEOUT3)
462
463 proc.join()
464
465 def _test_fork(self, queue):
466 for i in range(10, 20):
467 queue.put(i)
468 # note that at this point the items may only be buffered, so the
469 # process cannot shutdown until the feeder thread has finished
470 # pushing items onto the pipe.
471
472 def test_fork(self):
473 # Old versions of Queue would fail to create a new feeder
474 # thread for a forked process if the original process had its
475 # own feeder thread. This test checks that this no longer
476 # happens.
477
478 queue = self.Queue()
479
480 # put items on queue so that main process starts a feeder thread
481 for i in range(10):
482 queue.put(i)
483
484 # wait to make sure thread starts before we fork a new process
485 time.sleep(DELTA)
486
487 # fork process
488 p = self.Process(target=self._test_fork, args=(queue,))
489 p.start()
490
491 # check that all expected items are in the queue
492 for i in range(20):
493 self.assertEqual(queue.get(), i)
494 self.assertRaises(pyqueue.Empty, queue.get, False)
495
496 p.join()
497
498 def test_qsize(self):
499 q = self.Queue()
500 try:
501 self.assertEqual(q.qsize(), 0)
502 except NotImplementedError:
503 return
504 q.put(1)
505 self.assertEqual(q.qsize(), 1)
506 q.put(5)
507 self.assertEqual(q.qsize(), 2)
508 q.get()
509 self.assertEqual(q.qsize(), 1)
510 q.get()
511 self.assertEqual(q.qsize(), 0)
512
513 def _test_task_done(self, q):
514 for obj in iter(q.get, None):
515 time.sleep(DELTA)
516 q.task_done()
517
518 def test_task_done(self):
519 queue = self.JoinableQueue()
520
521 if sys.version_info < (2, 5) and not hasattr(queue, 'task_done'):
Florent Xiclunafd1b0932010-03-28 00:25:02 +0000522 self.skipTest("requires 'queue.task_done()' method")
Benjamin Petersone711caf2008-06-11 16:44:04 +0000523
524 workers = [self.Process(target=self._test_task_done, args=(queue,))
525 for i in range(4)]
526
527 for p in workers:
528 p.start()
529
530 for i in range(10):
531 queue.put(i)
532
533 queue.join()
534
535 for p in workers:
536 queue.put(None)
537
538 for p in workers:
539 p.join()
540
541#
542#
543#
544
545class _TestLock(BaseTestCase):
546
547 def test_lock(self):
548 lock = self.Lock()
549 self.assertEqual(lock.acquire(), True)
550 self.assertEqual(lock.acquire(False), False)
551 self.assertEqual(lock.release(), None)
552 self.assertRaises((ValueError, threading.ThreadError), lock.release)
553
554 def test_rlock(self):
555 lock = self.RLock()
556 self.assertEqual(lock.acquire(), True)
557 self.assertEqual(lock.acquire(), True)
558 self.assertEqual(lock.acquire(), True)
559 self.assertEqual(lock.release(), None)
560 self.assertEqual(lock.release(), None)
561 self.assertEqual(lock.release(), None)
562 self.assertRaises((AssertionError, RuntimeError), lock.release)
563
Jesse Nollerf8d00852009-03-31 03:25:07 +0000564 def test_lock_context(self):
565 with self.Lock():
566 pass
567
Benjamin Petersone711caf2008-06-11 16:44:04 +0000568
569class _TestSemaphore(BaseTestCase):
570
571 def _test_semaphore(self, sem):
572 self.assertReturnsIfImplemented(2, get_value, sem)
573 self.assertEqual(sem.acquire(), True)
574 self.assertReturnsIfImplemented(1, get_value, sem)
575 self.assertEqual(sem.acquire(), True)
576 self.assertReturnsIfImplemented(0, get_value, sem)
577 self.assertEqual(sem.acquire(False), False)
578 self.assertReturnsIfImplemented(0, get_value, sem)
579 self.assertEqual(sem.release(), None)
580 self.assertReturnsIfImplemented(1, get_value, sem)
581 self.assertEqual(sem.release(), None)
582 self.assertReturnsIfImplemented(2, get_value, sem)
583
584 def test_semaphore(self):
585 sem = self.Semaphore(2)
586 self._test_semaphore(sem)
587 self.assertEqual(sem.release(), None)
588 self.assertReturnsIfImplemented(3, get_value, sem)
589 self.assertEqual(sem.release(), None)
590 self.assertReturnsIfImplemented(4, get_value, sem)
591
592 def test_bounded_semaphore(self):
593 sem = self.BoundedSemaphore(2)
594 self._test_semaphore(sem)
595 # Currently fails on OS/X
596 #if HAVE_GETVALUE:
597 # self.assertRaises(ValueError, sem.release)
598 # self.assertReturnsIfImplemented(2, get_value, sem)
599
600 def test_timeout(self):
601 if self.TYPE != 'processes':
602 return
603
604 sem = self.Semaphore(0)
605 acquire = TimingWrapper(sem.acquire)
606
607 self.assertEqual(acquire(False), False)
608 self.assertTimingAlmostEqual(acquire.elapsed, 0.0)
609
610 self.assertEqual(acquire(False, None), False)
611 self.assertTimingAlmostEqual(acquire.elapsed, 0.0)
612
613 self.assertEqual(acquire(False, TIMEOUT1), False)
614 self.assertTimingAlmostEqual(acquire.elapsed, 0)
615
616 self.assertEqual(acquire(True, TIMEOUT2), False)
617 self.assertTimingAlmostEqual(acquire.elapsed, TIMEOUT2)
618
619 self.assertEqual(acquire(timeout=TIMEOUT3), False)
620 self.assertTimingAlmostEqual(acquire.elapsed, TIMEOUT3)
621
622
623class _TestCondition(BaseTestCase):
624
625 def f(self, cond, sleeping, woken, timeout=None):
626 cond.acquire()
627 sleeping.release()
628 cond.wait(timeout)
629 woken.release()
630 cond.release()
631
632 def check_invariant(self, cond):
633 # this is only supposed to succeed when there are no sleepers
634 if self.TYPE == 'processes':
635 try:
636 sleepers = (cond._sleeping_count.get_value() -
637 cond._woken_count.get_value())
638 self.assertEqual(sleepers, 0)
639 self.assertEqual(cond._wait_semaphore.get_value(), 0)
640 except NotImplementedError:
641 pass
642
643 def test_notify(self):
644 cond = self.Condition()
645 sleeping = self.Semaphore(0)
646 woken = self.Semaphore(0)
647
648 p = self.Process(target=self.f, args=(cond, sleeping, woken))
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000649 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000650 p.start()
651
652 p = threading.Thread(target=self.f, args=(cond, sleeping, woken))
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000653 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000654 p.start()
655
656 # wait for both children to start sleeping
657 sleeping.acquire()
658 sleeping.acquire()
659
660 # check no process/thread has woken up
661 time.sleep(DELTA)
662 self.assertReturnsIfImplemented(0, get_value, woken)
663
664 # wake up one process/thread
665 cond.acquire()
666 cond.notify()
667 cond.release()
668
669 # check one process/thread has woken up
670 time.sleep(DELTA)
671 self.assertReturnsIfImplemented(1, get_value, woken)
672
673 # wake up another
674 cond.acquire()
675 cond.notify()
676 cond.release()
677
678 # check other has woken up
679 time.sleep(DELTA)
680 self.assertReturnsIfImplemented(2, get_value, woken)
681
682 # check state is not mucked up
683 self.check_invariant(cond)
684 p.join()
685
686 def test_notify_all(self):
687 cond = self.Condition()
688 sleeping = self.Semaphore(0)
689 woken = self.Semaphore(0)
690
691 # start some threads/processes which will timeout
692 for i in range(3):
693 p = self.Process(target=self.f,
694 args=(cond, sleeping, woken, TIMEOUT1))
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000695 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000696 p.start()
697
698 t = threading.Thread(target=self.f,
699 args=(cond, sleeping, woken, TIMEOUT1))
Benjamin Peterson72753702008-08-18 18:09:21 +0000700 t.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000701 t.start()
702
703 # wait for them all to sleep
704 for i in range(6):
705 sleeping.acquire()
706
707 # check they have all timed out
708 for i in range(6):
709 woken.acquire()
710 self.assertReturnsIfImplemented(0, get_value, woken)
711
712 # check state is not mucked up
713 self.check_invariant(cond)
714
715 # start some more threads/processes
716 for i in range(3):
717 p = self.Process(target=self.f, args=(cond, sleeping, woken))
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000718 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000719 p.start()
720
721 t = threading.Thread(target=self.f, args=(cond, sleeping, woken))
Benjamin Peterson72753702008-08-18 18:09:21 +0000722 t.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000723 t.start()
724
725 # wait for them to all sleep
726 for i in range(6):
727 sleeping.acquire()
728
729 # check no process/thread has woken up
730 time.sleep(DELTA)
731 self.assertReturnsIfImplemented(0, get_value, woken)
732
733 # wake them all up
734 cond.acquire()
735 cond.notify_all()
736 cond.release()
737
738 # check they have all woken
739 time.sleep(DELTA)
740 self.assertReturnsIfImplemented(6, get_value, woken)
741
742 # check state is not mucked up
743 self.check_invariant(cond)
744
745 def test_timeout(self):
746 cond = self.Condition()
747 wait = TimingWrapper(cond.wait)
748 cond.acquire()
749 res = wait(TIMEOUT1)
750 cond.release()
751 self.assertEqual(res, None)
752 self.assertTimingAlmostEqual(wait.elapsed, TIMEOUT1)
753
754
755class _TestEvent(BaseTestCase):
756
757 def _test_event(self, event):
758 time.sleep(TIMEOUT2)
759 event.set()
760
761 def test_event(self):
762 event = self.Event()
763 wait = TimingWrapper(event.wait)
764
765 # Removed temporaily, due to API shear, this does not
766 # work with threading._Event objects. is_set == isSet
Benjamin Peterson965ce872009-04-05 21:24:58 +0000767 self.assertEqual(event.is_set(), False)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000768
Benjamin Peterson965ce872009-04-05 21:24:58 +0000769 # Removed, threading.Event.wait() will return the value of the __flag
770 # instead of None. API Shear with the semaphore backed mp.Event
771 self.assertEqual(wait(0.0), False)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000772 self.assertTimingAlmostEqual(wait.elapsed, 0.0)
Benjamin Peterson965ce872009-04-05 21:24:58 +0000773 self.assertEqual(wait(TIMEOUT1), False)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000774 self.assertTimingAlmostEqual(wait.elapsed, TIMEOUT1)
775
776 event.set()
777
778 # See note above on the API differences
Benjamin Peterson965ce872009-04-05 21:24:58 +0000779 self.assertEqual(event.is_set(), True)
780 self.assertEqual(wait(), True)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000781 self.assertTimingAlmostEqual(wait.elapsed, 0.0)
Benjamin Peterson965ce872009-04-05 21:24:58 +0000782 self.assertEqual(wait(TIMEOUT1), True)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000783 self.assertTimingAlmostEqual(wait.elapsed, 0.0)
784 # self.assertEqual(event.is_set(), True)
785
786 event.clear()
787
788 #self.assertEqual(event.is_set(), False)
789
790 self.Process(target=self._test_event, args=(event,)).start()
Benjamin Peterson965ce872009-04-05 21:24:58 +0000791 self.assertEqual(wait(), True)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000792
793#
794#
795#
796
797class _TestValue(BaseTestCase):
798
Florent Xiclunafd1b0932010-03-28 00:25:02 +0000799 ALLOWED_TYPES = ('processes',)
800
Benjamin Petersone711caf2008-06-11 16:44:04 +0000801 codes_values = [
802 ('i', 4343, 24234),
803 ('d', 3.625, -4.25),
804 ('h', -232, 234),
805 ('c', latin('x'), latin('y'))
806 ]
807
808 def _test(self, values):
809 for sv, cv in zip(values, self.codes_values):
810 sv.value = cv[2]
811
812
Florent Xiclunafd1b0932010-03-28 00:25:02 +0000813 @unittest.skipIf(c_int is None, "requires _ctypes")
Benjamin Petersone711caf2008-06-11 16:44:04 +0000814 def test_value(self, raw=False):
Benjamin Petersone711caf2008-06-11 16:44:04 +0000815 if raw:
816 values = [self.RawValue(code, value)
817 for code, value, _ in self.codes_values]
818 else:
819 values = [self.Value(code, value)
820 for code, value, _ in self.codes_values]
821
822 for sv, cv in zip(values, self.codes_values):
823 self.assertEqual(sv.value, cv[1])
824
825 proc = self.Process(target=self._test, args=(values,))
826 proc.start()
827 proc.join()
828
829 for sv, cv in zip(values, self.codes_values):
830 self.assertEqual(sv.value, cv[2])
831
Florent Xiclunafd1b0932010-03-28 00:25:02 +0000832 @unittest.skipIf(c_int is None, "requires _ctypes")
Benjamin Petersone711caf2008-06-11 16:44:04 +0000833 def test_rawvalue(self):
834 self.test_value(raw=True)
835
Florent Xiclunafd1b0932010-03-28 00:25:02 +0000836 @unittest.skipIf(c_int is None, "requires _ctypes")
Benjamin Petersone711caf2008-06-11 16:44:04 +0000837 def test_getobj_getlock(self):
Benjamin Petersone711caf2008-06-11 16:44:04 +0000838 val1 = self.Value('i', 5)
839 lock1 = val1.get_lock()
840 obj1 = val1.get_obj()
841
842 val2 = self.Value('i', 5, lock=None)
843 lock2 = val2.get_lock()
844 obj2 = val2.get_obj()
845
846 lock = self.Lock()
847 val3 = self.Value('i', 5, lock=lock)
848 lock3 = val3.get_lock()
849 obj3 = val3.get_obj()
850 self.assertEqual(lock, lock3)
851
Jesse Nollerb0516a62009-01-18 03:11:38 +0000852 arr4 = self.Value('i', 5, lock=False)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000853 self.assertFalse(hasattr(arr4, 'get_lock'))
854 self.assertFalse(hasattr(arr4, 'get_obj'))
855
Jesse Nollerb0516a62009-01-18 03:11:38 +0000856 self.assertRaises(AttributeError, self.Value, 'i', 5, lock='navalue')
857
858 arr5 = self.RawValue('i', 5)
859 self.assertFalse(hasattr(arr5, 'get_lock'))
860 self.assertFalse(hasattr(arr5, 'get_obj'))
861
Benjamin Petersone711caf2008-06-11 16:44:04 +0000862
863class _TestArray(BaseTestCase):
864
Florent Xiclunafd1b0932010-03-28 00:25:02 +0000865 ALLOWED_TYPES = ('processes',)
866
Benjamin Petersone711caf2008-06-11 16:44:04 +0000867 def f(self, seq):
868 for i in range(1, len(seq)):
869 seq[i] += seq[i-1]
870
Florent Xiclunafd1b0932010-03-28 00:25:02 +0000871 @unittest.skipIf(c_int is None, "requires _ctypes")
Benjamin Petersone711caf2008-06-11 16:44:04 +0000872 def test_array(self, raw=False):
Benjamin Petersone711caf2008-06-11 16:44:04 +0000873 seq = [680, 626, 934, 821, 150, 233, 548, 982, 714, 831]
874 if raw:
875 arr = self.RawArray('i', seq)
876 else:
877 arr = self.Array('i', seq)
878
879 self.assertEqual(len(arr), len(seq))
880 self.assertEqual(arr[3], seq[3])
881 self.assertEqual(list(arr[2:7]), list(seq[2:7]))
882
883 arr[4:8] = seq[4:8] = array.array('i', [1, 2, 3, 4])
884
885 self.assertEqual(list(arr[:]), seq)
886
887 self.f(seq)
888
889 p = self.Process(target=self.f, args=(arr,))
890 p.start()
891 p.join()
892
893 self.assertEqual(list(arr[:]), seq)
894
Florent Xiclunafd1b0932010-03-28 00:25:02 +0000895 @unittest.skipIf(c_int is None, "requires _ctypes")
Benjamin Petersone711caf2008-06-11 16:44:04 +0000896 def test_rawarray(self):
897 self.test_array(raw=True)
898
Florent Xiclunafd1b0932010-03-28 00:25:02 +0000899 @unittest.skipIf(c_int is None, "requires _ctypes")
Benjamin Petersone711caf2008-06-11 16:44:04 +0000900 def test_getobj_getlock_obj(self):
Benjamin Petersone711caf2008-06-11 16:44:04 +0000901 arr1 = self.Array('i', list(range(10)))
902 lock1 = arr1.get_lock()
903 obj1 = arr1.get_obj()
904
905 arr2 = self.Array('i', list(range(10)), lock=None)
906 lock2 = arr2.get_lock()
907 obj2 = arr2.get_obj()
908
909 lock = self.Lock()
910 arr3 = self.Array('i', list(range(10)), lock=lock)
911 lock3 = arr3.get_lock()
912 obj3 = arr3.get_obj()
913 self.assertEqual(lock, lock3)
914
Jesse Nollerb0516a62009-01-18 03:11:38 +0000915 arr4 = self.Array('i', range(10), lock=False)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000916 self.assertFalse(hasattr(arr4, 'get_lock'))
917 self.assertFalse(hasattr(arr4, 'get_obj'))
Jesse Nollerb0516a62009-01-18 03:11:38 +0000918 self.assertRaises(AttributeError,
919 self.Array, 'i', range(10), lock='notalock')
920
921 arr5 = self.RawArray('i', range(10))
922 self.assertFalse(hasattr(arr5, 'get_lock'))
923 self.assertFalse(hasattr(arr5, 'get_obj'))
Benjamin Petersone711caf2008-06-11 16:44:04 +0000924
925#
926#
927#
928
929class _TestContainers(BaseTestCase):
930
931 ALLOWED_TYPES = ('manager',)
932
933 def test_list(self):
934 a = self.list(list(range(10)))
935 self.assertEqual(a[:], list(range(10)))
936
937 b = self.list()
938 self.assertEqual(b[:], [])
939
940 b.extend(list(range(5)))
941 self.assertEqual(b[:], list(range(5)))
942
943 self.assertEqual(b[2], 2)
944 self.assertEqual(b[2:10], [2,3,4])
945
946 b *= 2
947 self.assertEqual(b[:], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4])
948
949 self.assertEqual(b + [5, 6], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4, 5, 6])
950
951 self.assertEqual(a[:], list(range(10)))
952
953 d = [a, b]
954 e = self.list(d)
955 self.assertEqual(
956 e[:],
957 [[0, 1, 2, 3, 4, 5, 6, 7, 8, 9], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4]]
958 )
959
960 f = self.list([a])
961 a.append('hello')
962 self.assertEqual(f[:], [[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 'hello']])
963
964 def test_dict(self):
965 d = self.dict()
966 indices = list(range(65, 70))
967 for i in indices:
968 d[i] = chr(i)
969 self.assertEqual(d.copy(), dict((i, chr(i)) for i in indices))
970 self.assertEqual(sorted(d.keys()), indices)
971 self.assertEqual(sorted(d.values()), [chr(i) for i in indices])
972 self.assertEqual(sorted(d.items()), [(i, chr(i)) for i in indices])
973
974 def test_namespace(self):
975 n = self.Namespace()
976 n.name = 'Bob'
977 n.job = 'Builder'
978 n._hidden = 'hidden'
979 self.assertEqual((n.name, n.job), ('Bob', 'Builder'))
980 del n.job
981 self.assertEqual(str(n), "Namespace(name='Bob')")
982 self.assertTrue(hasattr(n, 'name'))
983 self.assertTrue(not hasattr(n, 'job'))
984
985#
986#
987#
988
989def sqr(x, wait=0.0):
990 time.sleep(wait)
991 return x*x
Benjamin Petersone711caf2008-06-11 16:44:04 +0000992class _TestPool(BaseTestCase):
993
994 def test_apply(self):
995 papply = self.pool.apply
996 self.assertEqual(papply(sqr, (5,)), sqr(5))
997 self.assertEqual(papply(sqr, (), {'x':3}), sqr(x=3))
998
999 def test_map(self):
1000 pmap = self.pool.map
1001 self.assertEqual(pmap(sqr, list(range(10))), list(map(sqr, list(range(10)))))
1002 self.assertEqual(pmap(sqr, list(range(100)), chunksize=20),
1003 list(map(sqr, list(range(100)))))
1004
Alexandre Vassalottie52e3782009-07-17 09:18:18 +00001005 def test_map_chunksize(self):
1006 try:
1007 self.pool.map_async(sqr, [], chunksize=1).get(timeout=TIMEOUT1)
1008 except multiprocessing.TimeoutError:
1009 self.fail("pool.map_async with chunksize stalled on null list")
1010
Benjamin Petersone711caf2008-06-11 16:44:04 +00001011 def test_async(self):
1012 res = self.pool.apply_async(sqr, (7, TIMEOUT1,))
1013 get = TimingWrapper(res.get)
1014 self.assertEqual(get(), 49)
1015 self.assertTimingAlmostEqual(get.elapsed, TIMEOUT1)
1016
1017 def test_async_timeout(self):
1018 res = self.pool.apply_async(sqr, (6, TIMEOUT2 + 0.2))
1019 get = TimingWrapper(res.get)
1020 self.assertRaises(multiprocessing.TimeoutError, get, timeout=TIMEOUT2)
1021 self.assertTimingAlmostEqual(get.elapsed, TIMEOUT2)
1022
1023 def test_imap(self):
1024 it = self.pool.imap(sqr, list(range(10)))
1025 self.assertEqual(list(it), list(map(sqr, list(range(10)))))
1026
1027 it = self.pool.imap(sqr, list(range(10)))
1028 for i in range(10):
1029 self.assertEqual(next(it), i*i)
1030 self.assertRaises(StopIteration, it.__next__)
1031
1032 it = self.pool.imap(sqr, list(range(1000)), chunksize=100)
1033 for i in range(1000):
1034 self.assertEqual(next(it), i*i)
1035 self.assertRaises(StopIteration, it.__next__)
1036
1037 def test_imap_unordered(self):
1038 it = self.pool.imap_unordered(sqr, list(range(1000)))
1039 self.assertEqual(sorted(it), list(map(sqr, list(range(1000)))))
1040
1041 it = self.pool.imap_unordered(sqr, list(range(1000)), chunksize=53)
1042 self.assertEqual(sorted(it), list(map(sqr, list(range(1000)))))
1043
1044 def test_make_pool(self):
1045 p = multiprocessing.Pool(3)
1046 self.assertEqual(3, len(p._pool))
1047 p.close()
1048 p.join()
1049
1050 def test_terminate(self):
1051 if self.TYPE == 'manager':
1052 # On Unix a forked process increfs each shared object to
1053 # which its parent process held a reference. If the
1054 # forked process gets terminated then there is likely to
1055 # be a reference leak. So to prevent
1056 # _TestZZZNumberOfObjects from failing we skip this test
1057 # when using a manager.
1058 return
1059
1060 result = self.pool.map_async(
1061 time.sleep, [0.1 for i in range(10000)], chunksize=1
1062 )
1063 self.pool.terminate()
1064 join = TimingWrapper(self.pool.join)
1065 join()
1066 self.assertTrue(join.elapsed < 0.2)
Jesse Noller1f0b6582010-01-27 03:36:01 +00001067
1068class _TestPoolWorkerLifetime(BaseTestCase):
1069
1070 ALLOWED_TYPES = ('processes', )
1071 def test_pool_worker_lifetime(self):
1072 p = multiprocessing.Pool(3, maxtasksperchild=10)
1073 self.assertEqual(3, len(p._pool))
1074 origworkerpids = [w.pid for w in p._pool]
1075 # Run many tasks so each worker gets replaced (hopefully)
1076 results = []
1077 for i in range(100):
1078 results.append(p.apply_async(sqr, (i, )))
1079 # Fetch the results and verify we got the right answers,
1080 # also ensuring all the tasks have completed.
1081 for (j, res) in enumerate(results):
1082 self.assertEqual(res.get(), sqr(j))
1083 # Refill the pool
1084 p._repopulate_pool()
Florent Xiclunafb190f62010-03-04 16:10:10 +00001085 # Wait until all workers are alive
1086 countdown = 5
1087 while countdown and not all(w.is_alive() for w in p._pool):
1088 countdown -= 1
1089 time.sleep(DELTA)
Jesse Noller1f0b6582010-01-27 03:36:01 +00001090 finalworkerpids = [w.pid for w in p._pool]
Florent Xiclunafb190f62010-03-04 16:10:10 +00001091 # All pids should be assigned. See issue #7805.
1092 self.assertNotIn(None, origworkerpids)
1093 self.assertNotIn(None, finalworkerpids)
1094 # Finally, check that the worker pids have changed
Jesse Noller1f0b6582010-01-27 03:36:01 +00001095 self.assertNotEqual(sorted(origworkerpids), sorted(finalworkerpids))
1096 p.close()
1097 p.join()
1098
Benjamin Petersone711caf2008-06-11 16:44:04 +00001099#
1100# Test that manager has expected number of shared objects left
1101#
1102
1103class _TestZZZNumberOfObjects(BaseTestCase):
1104 # Because test cases are sorted alphabetically, this one will get
1105 # run after all the other tests for the manager. It tests that
1106 # there have been no "reference leaks" for the manager's shared
1107 # objects. Note the comment in _TestPool.test_terminate().
1108 ALLOWED_TYPES = ('manager',)
1109
1110 def test_number_of_objects(self):
1111 EXPECTED_NUMBER = 1 # the pool object is still alive
1112 multiprocessing.active_children() # discard dead process objs
1113 gc.collect() # do garbage collection
1114 refs = self.manager._number_of_objects()
Jesse Noller63b3a972009-01-21 02:15:48 +00001115 debug_info = self.manager._debug_info()
Benjamin Petersone711caf2008-06-11 16:44:04 +00001116 if refs != EXPECTED_NUMBER:
Georg Brandl3dbca812008-07-23 16:10:53 +00001117 print(self.manager._debug_info())
Jesse Noller63b3a972009-01-21 02:15:48 +00001118 print(debug_info)
Benjamin Petersone711caf2008-06-11 16:44:04 +00001119
1120 self.assertEqual(refs, EXPECTED_NUMBER)
1121
1122#
1123# Test of creating a customized manager class
1124#
1125
1126from multiprocessing.managers import BaseManager, BaseProxy, RemoteError
1127
1128class FooBar(object):
1129 def f(self):
1130 return 'f()'
1131 def g(self):
1132 raise ValueError
1133 def _h(self):
1134 return '_h()'
1135
1136def baz():
1137 for i in range(10):
1138 yield i*i
1139
1140class IteratorProxy(BaseProxy):
1141 _exposed_ = ('next', '__next__')
1142 def __iter__(self):
1143 return self
1144 def __next__(self):
1145 return self._callmethod('next')
1146 def __next__(self):
1147 return self._callmethod('__next__')
1148
1149class MyManager(BaseManager):
1150 pass
1151
1152MyManager.register('Foo', callable=FooBar)
1153MyManager.register('Bar', callable=FooBar, exposed=('f', '_h'))
1154MyManager.register('baz', callable=baz, proxytype=IteratorProxy)
1155
1156
1157class _TestMyManager(BaseTestCase):
1158
1159 ALLOWED_TYPES = ('manager',)
1160
1161 def test_mymanager(self):
1162 manager = MyManager()
1163 manager.start()
1164
1165 foo = manager.Foo()
1166 bar = manager.Bar()
1167 baz = manager.baz()
1168
1169 foo_methods = [name for name in ('f', 'g', '_h') if hasattr(foo, name)]
1170 bar_methods = [name for name in ('f', 'g', '_h') if hasattr(bar, name)]
1171
1172 self.assertEqual(foo_methods, ['f', 'g'])
1173 self.assertEqual(bar_methods, ['f', '_h'])
1174
1175 self.assertEqual(foo.f(), 'f()')
1176 self.assertRaises(ValueError, foo.g)
1177 self.assertEqual(foo._callmethod('f'), 'f()')
1178 self.assertRaises(RemoteError, foo._callmethod, '_h')
1179
1180 self.assertEqual(bar.f(), 'f()')
1181 self.assertEqual(bar._h(), '_h()')
1182 self.assertEqual(bar._callmethod('f'), 'f()')
1183 self.assertEqual(bar._callmethod('_h'), '_h()')
1184
1185 self.assertEqual(list(baz), [i*i for i in range(10)])
1186
1187 manager.shutdown()
1188
1189#
1190# Test of connecting to a remote server and using xmlrpclib for serialization
1191#
1192
1193_queue = pyqueue.Queue()
1194def get_queue():
1195 return _queue
1196
1197class QueueManager(BaseManager):
1198 '''manager class used by server process'''
1199QueueManager.register('get_queue', callable=get_queue)
1200
1201class QueueManager2(BaseManager):
1202 '''manager class which specifies the same interface as QueueManager'''
1203QueueManager2.register('get_queue')
1204
1205
1206SERIALIZER = 'xmlrpclib'
1207
1208class _TestRemoteManager(BaseTestCase):
1209
1210 ALLOWED_TYPES = ('manager',)
1211
1212 def _putter(self, address, authkey):
1213 manager = QueueManager2(
1214 address=address, authkey=authkey, serializer=SERIALIZER
1215 )
1216 manager.connect()
1217 queue = manager.get_queue()
1218 queue.put(('hello world', None, True, 2.25))
1219
1220 def test_remote(self):
1221 authkey = os.urandom(32)
1222
1223 manager = QueueManager(
1224 address=('localhost', 0), authkey=authkey, serializer=SERIALIZER
1225 )
1226 manager.start()
1227
1228 p = self.Process(target=self._putter, args=(manager.address, authkey))
1229 p.start()
1230
1231 manager2 = QueueManager2(
1232 address=manager.address, authkey=authkey, serializer=SERIALIZER
1233 )
1234 manager2.connect()
1235 queue = manager2.get_queue()
1236
1237 # Note that xmlrpclib will deserialize object as a list not a tuple
1238 self.assertEqual(queue.get(), ['hello world', None, True, 2.25])
1239
1240 # Because we are using xmlrpclib for serialization instead of
1241 # pickle this will cause a serialization error.
1242 self.assertRaises(Exception, queue.put, time.sleep)
1243
1244 # Make queue finalizer run before the server is stopped
1245 del queue
1246 manager.shutdown()
1247
Jesse Nollerc5d28a02009-03-30 16:37:36 +00001248class _TestManagerRestart(BaseTestCase):
1249
1250 def _putter(self, address, authkey):
1251 manager = QueueManager(
1252 address=address, authkey=authkey, serializer=SERIALIZER)
1253 manager.connect()
1254 queue = manager.get_queue()
1255 queue.put('hello world')
1256
1257 def test_rapid_restart(self):
1258 authkey = os.urandom(32)
1259 manager = QueueManager(
Antoine Pitrou043bad02010-04-30 23:20:15 +00001260 address=('localhost', 0), authkey=authkey, serializer=SERIALIZER)
1261 addr = manager.get_server().address
Jesse Nollerc5d28a02009-03-30 16:37:36 +00001262 manager.start()
1263
1264 p = self.Process(target=self._putter, args=(manager.address, authkey))
1265 p.start()
1266 queue = manager.get_queue()
1267 self.assertEqual(queue.get(), 'hello world')
Jesse Noller35d1f002009-03-30 22:59:27 +00001268 del queue
Jesse Nollerc5d28a02009-03-30 16:37:36 +00001269 manager.shutdown()
1270 manager = QueueManager(
Antoine Pitrou043bad02010-04-30 23:20:15 +00001271 address=addr, authkey=authkey, serializer=SERIALIZER)
Jesse Nollerc5d28a02009-03-30 16:37:36 +00001272 manager.start()
Jesse Noller35d1f002009-03-30 22:59:27 +00001273 manager.shutdown()
Jesse Nollerc5d28a02009-03-30 16:37:36 +00001274
Benjamin Petersone711caf2008-06-11 16:44:04 +00001275#
1276#
1277#
1278
1279SENTINEL = latin('')
1280
1281class _TestConnection(BaseTestCase):
1282
1283 ALLOWED_TYPES = ('processes', 'threads')
1284
1285 def _echo(self, conn):
1286 for msg in iter(conn.recv_bytes, SENTINEL):
1287 conn.send_bytes(msg)
1288 conn.close()
1289
1290 def test_connection(self):
1291 conn, child_conn = self.Pipe()
1292
1293 p = self.Process(target=self._echo, args=(child_conn,))
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +00001294 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +00001295 p.start()
1296
1297 seq = [1, 2.25, None]
1298 msg = latin('hello world')
1299 longmsg = msg * 10
1300 arr = array.array('i', list(range(4)))
1301
1302 if self.TYPE == 'processes':
1303 self.assertEqual(type(conn.fileno()), int)
1304
1305 self.assertEqual(conn.send(seq), None)
1306 self.assertEqual(conn.recv(), seq)
1307
1308 self.assertEqual(conn.send_bytes(msg), None)
1309 self.assertEqual(conn.recv_bytes(), msg)
1310
1311 if self.TYPE == 'processes':
1312 buffer = array.array('i', [0]*10)
1313 expected = list(arr) + [0] * (10 - len(arr))
1314 self.assertEqual(conn.send_bytes(arr), None)
1315 self.assertEqual(conn.recv_bytes_into(buffer),
1316 len(arr) * buffer.itemsize)
1317 self.assertEqual(list(buffer), expected)
1318
1319 buffer = array.array('i', [0]*10)
1320 expected = [0] * 3 + list(arr) + [0] * (10 - 3 - len(arr))
1321 self.assertEqual(conn.send_bytes(arr), None)
1322 self.assertEqual(conn.recv_bytes_into(buffer, 3 * buffer.itemsize),
1323 len(arr) * buffer.itemsize)
1324 self.assertEqual(list(buffer), expected)
1325
1326 buffer = bytearray(latin(' ' * 40))
1327 self.assertEqual(conn.send_bytes(longmsg), None)
1328 try:
1329 res = conn.recv_bytes_into(buffer)
1330 except multiprocessing.BufferTooShort as e:
1331 self.assertEqual(e.args, (longmsg,))
1332 else:
1333 self.fail('expected BufferTooShort, got %s' % res)
1334
1335 poll = TimingWrapper(conn.poll)
1336
1337 self.assertEqual(poll(), False)
1338 self.assertTimingAlmostEqual(poll.elapsed, 0)
1339
1340 self.assertEqual(poll(TIMEOUT1), False)
1341 self.assertTimingAlmostEqual(poll.elapsed, TIMEOUT1)
1342
1343 conn.send(None)
1344
1345 self.assertEqual(poll(TIMEOUT1), True)
1346 self.assertTimingAlmostEqual(poll.elapsed, 0)
1347
1348 self.assertEqual(conn.recv(), None)
1349
1350 really_big_msg = latin('X') * (1024 * 1024 * 16) # 16Mb
1351 conn.send_bytes(really_big_msg)
1352 self.assertEqual(conn.recv_bytes(), really_big_msg)
1353
1354 conn.send_bytes(SENTINEL) # tell child to quit
1355 child_conn.close()
1356
1357 if self.TYPE == 'processes':
1358 self.assertEqual(conn.readable, True)
1359 self.assertEqual(conn.writable, True)
1360 self.assertRaises(EOFError, conn.recv)
1361 self.assertRaises(EOFError, conn.recv_bytes)
1362
1363 p.join()
1364
1365 def test_duplex_false(self):
1366 reader, writer = self.Pipe(duplex=False)
1367 self.assertEqual(writer.send(1), None)
1368 self.assertEqual(reader.recv(), 1)
1369 if self.TYPE == 'processes':
1370 self.assertEqual(reader.readable, True)
1371 self.assertEqual(reader.writable, False)
1372 self.assertEqual(writer.readable, False)
1373 self.assertEqual(writer.writable, True)
1374 self.assertRaises(IOError, reader.send, 2)
1375 self.assertRaises(IOError, writer.recv)
1376 self.assertRaises(IOError, writer.poll)
1377
1378 def test_spawn_close(self):
1379 # We test that a pipe connection can be closed by parent
1380 # process immediately after child is spawned. On Windows this
1381 # would have sometimes failed on old versions because
1382 # child_conn would be closed before the child got a chance to
1383 # duplicate it.
1384 conn, child_conn = self.Pipe()
1385
1386 p = self.Process(target=self._echo, args=(child_conn,))
1387 p.start()
1388 child_conn.close() # this might complete before child initializes
1389
1390 msg = latin('hello')
1391 conn.send_bytes(msg)
1392 self.assertEqual(conn.recv_bytes(), msg)
1393
1394 conn.send_bytes(SENTINEL)
1395 conn.close()
1396 p.join()
1397
1398 def test_sendbytes(self):
1399 if self.TYPE != 'processes':
1400 return
1401
1402 msg = latin('abcdefghijklmnopqrstuvwxyz')
1403 a, b = self.Pipe()
1404
1405 a.send_bytes(msg)
1406 self.assertEqual(b.recv_bytes(), msg)
1407
1408 a.send_bytes(msg, 5)
1409 self.assertEqual(b.recv_bytes(), msg[5:])
1410
1411 a.send_bytes(msg, 7, 8)
1412 self.assertEqual(b.recv_bytes(), msg[7:7+8])
1413
1414 a.send_bytes(msg, 26)
1415 self.assertEqual(b.recv_bytes(), latin(''))
1416
1417 a.send_bytes(msg, 26, 0)
1418 self.assertEqual(b.recv_bytes(), latin(''))
1419
1420 self.assertRaises(ValueError, a.send_bytes, msg, 27)
1421
1422 self.assertRaises(ValueError, a.send_bytes, msg, 22, 5)
1423
1424 self.assertRaises(ValueError, a.send_bytes, msg, 26, 1)
1425
1426 self.assertRaises(ValueError, a.send_bytes, msg, -1)
1427
1428 self.assertRaises(ValueError, a.send_bytes, msg, 4, -1)
1429
Benjamin Petersone711caf2008-06-11 16:44:04 +00001430class _TestListenerClient(BaseTestCase):
1431
1432 ALLOWED_TYPES = ('processes', 'threads')
1433
1434 def _test(self, address):
1435 conn = self.connection.Client(address)
1436 conn.send('hello')
1437 conn.close()
1438
1439 def test_listener_client(self):
1440 for family in self.connection.families:
1441 l = self.connection.Listener(family=family)
1442 p = self.Process(target=self._test, args=(l.address,))
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +00001443 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +00001444 p.start()
1445 conn = l.accept()
1446 self.assertEqual(conn.recv(), 'hello')
1447 p.join()
1448 l.close()
Benjamin Petersone711caf2008-06-11 16:44:04 +00001449#
1450# Test of sending connection and socket objects between processes
1451#
Benjamin Petersonb29cbbc2008-06-16 20:57:14 +00001452"""
Benjamin Petersone711caf2008-06-11 16:44:04 +00001453class _TestPicklingConnections(BaseTestCase):
1454
1455 ALLOWED_TYPES = ('processes',)
1456
1457 def _listener(self, conn, families):
1458 for fam in families:
1459 l = self.connection.Listener(family=fam)
1460 conn.send(l.address)
1461 new_conn = l.accept()
1462 conn.send(new_conn)
1463
1464 if self.TYPE == 'processes':
1465 l = socket.socket()
1466 l.bind(('localhost', 0))
1467 conn.send(l.getsockname())
1468 l.listen(1)
1469 new_conn, addr = l.accept()
1470 conn.send(new_conn)
1471
1472 conn.recv()
1473
1474 def _remote(self, conn):
1475 for (address, msg) in iter(conn.recv, None):
1476 client = self.connection.Client(address)
1477 client.send(msg.upper())
1478 client.close()
1479
1480 if self.TYPE == 'processes':
1481 address, msg = conn.recv()
1482 client = socket.socket()
1483 client.connect(address)
1484 client.sendall(msg.upper())
1485 client.close()
1486
1487 conn.close()
1488
1489 def test_pickling(self):
1490 try:
1491 multiprocessing.allow_connection_pickling()
1492 except ImportError:
1493 return
1494
1495 families = self.connection.families
1496
1497 lconn, lconn0 = self.Pipe()
1498 lp = self.Process(target=self._listener, args=(lconn0, families))
1499 lp.start()
1500 lconn0.close()
1501
1502 rconn, rconn0 = self.Pipe()
1503 rp = self.Process(target=self._remote, args=(rconn0,))
1504 rp.start()
1505 rconn0.close()
1506
1507 for fam in families:
1508 msg = ('This connection uses family %s' % fam).encode('ascii')
1509 address = lconn.recv()
1510 rconn.send((address, msg))
1511 new_conn = lconn.recv()
1512 self.assertEqual(new_conn.recv(), msg.upper())
1513
1514 rconn.send(None)
1515
1516 if self.TYPE == 'processes':
1517 msg = latin('This connection uses a normal socket')
1518 address = lconn.recv()
1519 rconn.send((address, msg))
1520 if hasattr(socket, 'fromfd'):
1521 new_conn = lconn.recv()
1522 self.assertEqual(new_conn.recv(100), msg.upper())
1523 else:
1524 # XXX On Windows with Py2.6 need to backport fromfd()
1525 discard = lconn.recv_bytes()
1526
1527 lconn.send(None)
1528
1529 rconn.close()
1530 lconn.close()
1531
1532 lp.join()
1533 rp.join()
Benjamin Petersonb29cbbc2008-06-16 20:57:14 +00001534"""
Benjamin Petersone711caf2008-06-11 16:44:04 +00001535#
1536#
1537#
1538
1539class _TestHeap(BaseTestCase):
1540
1541 ALLOWED_TYPES = ('processes',)
1542
1543 def test_heap(self):
1544 iterations = 5000
1545 maxblocks = 50
1546 blocks = []
1547
1548 # create and destroy lots of blocks of different sizes
1549 for i in range(iterations):
1550 size = int(random.lognormvariate(0, 1) * 1000)
1551 b = multiprocessing.heap.BufferWrapper(size)
1552 blocks.append(b)
1553 if len(blocks) > maxblocks:
1554 i = random.randrange(maxblocks)
1555 del blocks[i]
1556
1557 # get the heap object
1558 heap = multiprocessing.heap.BufferWrapper._heap
1559
1560 # verify the state of the heap
1561 all = []
1562 occupied = 0
1563 for L in list(heap._len_to_seq.values()):
1564 for arena, start, stop in L:
1565 all.append((heap._arenas.index(arena), start, stop,
1566 stop-start, 'free'))
1567 for arena, start, stop in heap._allocated_blocks:
1568 all.append((heap._arenas.index(arena), start, stop,
1569 stop-start, 'occupied'))
1570 occupied += (stop-start)
1571
1572 all.sort()
1573
1574 for i in range(len(all)-1):
1575 (arena, start, stop) = all[i][:3]
1576 (narena, nstart, nstop) = all[i+1][:3]
1577 self.assertTrue((arena != narena and nstart == 0) or
1578 (stop == nstart))
1579
1580#
1581#
1582#
1583
Benjamin Petersone711caf2008-06-11 16:44:04 +00001584class _Foo(Structure):
1585 _fields_ = [
1586 ('x', c_int),
1587 ('y', c_double)
1588 ]
1589
1590class _TestSharedCTypes(BaseTestCase):
1591
1592 ALLOWED_TYPES = ('processes',)
1593
1594 def _double(self, x, y, foo, arr, string):
1595 x.value *= 2
1596 y.value *= 2
1597 foo.x *= 2
1598 foo.y *= 2
1599 string.value *= 2
1600 for i in range(len(arr)):
1601 arr[i] *= 2
1602
Florent Xiclunafd1b0932010-03-28 00:25:02 +00001603 @unittest.skipIf(c_int is None, "requires _ctypes")
Benjamin Petersone711caf2008-06-11 16:44:04 +00001604 def test_sharedctypes(self, lock=False):
Benjamin Petersone711caf2008-06-11 16:44:04 +00001605 x = Value('i', 7, lock=lock)
Georg Brandl89fad142010-03-14 10:23:39 +00001606 y = Value(c_double, 1.0/3.0, lock=lock)
Benjamin Petersone711caf2008-06-11 16:44:04 +00001607 foo = Value(_Foo, 3, 2, lock=lock)
Georg Brandl89fad142010-03-14 10:23:39 +00001608 arr = self.Array('d', list(range(10)), lock=lock)
1609 string = self.Array('c', 20, lock=lock)
Benjamin Petersone711caf2008-06-11 16:44:04 +00001610 string.value = 'hello'
1611
1612 p = self.Process(target=self._double, args=(x, y, foo, arr, string))
1613 p.start()
1614 p.join()
1615
1616 self.assertEqual(x.value, 14)
1617 self.assertAlmostEqual(y.value, 2.0/3.0)
1618 self.assertEqual(foo.x, 6)
1619 self.assertAlmostEqual(foo.y, 4.0)
1620 for i in range(10):
1621 self.assertAlmostEqual(arr[i], i*2)
1622 self.assertEqual(string.value, latin('hellohello'))
1623
1624 def test_synchronize(self):
1625 self.test_sharedctypes(lock=True)
1626
Florent Xiclunafd1b0932010-03-28 00:25:02 +00001627 @unittest.skipIf(c_int is None, "requires _ctypes")
Benjamin Petersone711caf2008-06-11 16:44:04 +00001628 def test_copy(self):
Benjamin Petersone711caf2008-06-11 16:44:04 +00001629 foo = _Foo(2, 5.0)
1630 bar = copy(foo)
1631 foo.x = 0
1632 foo.y = 0
1633 self.assertEqual(bar.x, 2)
1634 self.assertAlmostEqual(bar.y, 5.0)
1635
1636#
1637#
1638#
1639
1640class _TestFinalize(BaseTestCase):
1641
1642 ALLOWED_TYPES = ('processes',)
1643
1644 def _test_finalize(self, conn):
1645 class Foo(object):
1646 pass
1647
1648 a = Foo()
1649 util.Finalize(a, conn.send, args=('a',))
1650 del a # triggers callback for a
1651
1652 b = Foo()
1653 close_b = util.Finalize(b, conn.send, args=('b',))
1654 close_b() # triggers callback for b
1655 close_b() # does nothing because callback has already been called
1656 del b # does nothing because callback has already been called
1657
1658 c = Foo()
1659 util.Finalize(c, conn.send, args=('c',))
1660
1661 d10 = Foo()
1662 util.Finalize(d10, conn.send, args=('d10',), exitpriority=1)
1663
1664 d01 = Foo()
1665 util.Finalize(d01, conn.send, args=('d01',), exitpriority=0)
1666 d02 = Foo()
1667 util.Finalize(d02, conn.send, args=('d02',), exitpriority=0)
1668 d03 = Foo()
1669 util.Finalize(d03, conn.send, args=('d03',), exitpriority=0)
1670
1671 util.Finalize(None, conn.send, args=('e',), exitpriority=-10)
1672
1673 util.Finalize(None, conn.send, args=('STOP',), exitpriority=-100)
1674
1675 # call mutliprocessing's cleanup function then exit process without
1676 # garbage collecting locals
1677 util._exit_function()
1678 conn.close()
1679 os._exit(0)
1680
1681 def test_finalize(self):
1682 conn, child_conn = self.Pipe()
1683
1684 p = self.Process(target=self._test_finalize, args=(child_conn,))
1685 p.start()
1686 p.join()
1687
1688 result = [obj for obj in iter(conn.recv, 'STOP')]
1689 self.assertEqual(result, ['a', 'b', 'd10', 'd03', 'd02', 'd01', 'e'])
1690
1691#
1692# Test that from ... import * works for each module
1693#
1694
1695class _TestImportStar(BaseTestCase):
1696
1697 ALLOWED_TYPES = ('processes',)
1698
1699 def test_import(self):
Florent Xiclunafd1b0932010-03-28 00:25:02 +00001700 modules = [
Benjamin Petersone711caf2008-06-11 16:44:04 +00001701 'multiprocessing', 'multiprocessing.connection',
1702 'multiprocessing.heap', 'multiprocessing.managers',
1703 'multiprocessing.pool', 'multiprocessing.process',
Florent Xiclunafd1b0932010-03-28 00:25:02 +00001704 'multiprocessing.reduction',
Benjamin Petersone711caf2008-06-11 16:44:04 +00001705 'multiprocessing.synchronize', 'multiprocessing.util'
Florent Xiclunafd1b0932010-03-28 00:25:02 +00001706 ]
1707
1708 if c_int is not None:
1709 # This module requires _ctypes
1710 modules.append('multiprocessing.sharedctypes')
Benjamin Petersone711caf2008-06-11 16:44:04 +00001711
1712 for name in modules:
1713 __import__(name)
1714 mod = sys.modules[name]
1715
1716 for attr in getattr(mod, '__all__', ()):
1717 self.assertTrue(
1718 hasattr(mod, attr),
1719 '%r does not have attribute %r' % (mod, attr)
1720 )
1721
1722#
1723# Quick test that logging works -- does not test logging output
1724#
1725
1726class _TestLogging(BaseTestCase):
1727
1728 ALLOWED_TYPES = ('processes',)
1729
1730 def test_enable_logging(self):
1731 logger = multiprocessing.get_logger()
1732 logger.setLevel(util.SUBWARNING)
1733 self.assertTrue(logger is not None)
1734 logger.debug('this will not be printed')
1735 logger.info('nor will this')
1736 logger.setLevel(LOG_LEVEL)
1737
1738 def _test_level(self, conn):
1739 logger = multiprocessing.get_logger()
1740 conn.send(logger.getEffectiveLevel())
1741
1742 def test_level(self):
1743 LEVEL1 = 32
1744 LEVEL2 = 37
1745
1746 logger = multiprocessing.get_logger()
1747 root_logger = logging.getLogger()
1748 root_level = root_logger.level
1749
1750 reader, writer = multiprocessing.Pipe(duplex=False)
1751
1752 logger.setLevel(LEVEL1)
1753 self.Process(target=self._test_level, args=(writer,)).start()
1754 self.assertEqual(LEVEL1, reader.recv())
1755
1756 logger.setLevel(logging.NOTSET)
1757 root_logger.setLevel(LEVEL2)
1758 self.Process(target=self._test_level, args=(writer,)).start()
1759 self.assertEqual(LEVEL2, reader.recv())
1760
1761 root_logger.setLevel(root_level)
1762 logger.setLevel(level=LOG_LEVEL)
1763
Jesse Nollerb9a49b72009-11-21 18:09:38 +00001764
Jesse Nollerf4ae35f2009-11-24 14:22:24 +00001765# class _TestLoggingProcessName(BaseTestCase):
1766#
1767# def handle(self, record):
1768# assert record.processName == multiprocessing.current_process().name
1769# self.__handled = True
1770#
1771# def test_logging(self):
1772# handler = logging.Handler()
1773# handler.handle = self.handle
1774# self.__handled = False
1775# # Bypass getLogger() and side-effects
1776# logger = logging.getLoggerClass()(
1777# 'multiprocessing.test.TestLoggingProcessName')
1778# logger.addHandler(handler)
1779# logger.propagate = False
1780#
1781# logger.warn('foo')
1782# assert self.__handled
Jesse Nollerb9a49b72009-11-21 18:09:38 +00001783
Benjamin Petersone711caf2008-06-11 16:44:04 +00001784#
Jesse Noller6214edd2009-01-19 16:23:53 +00001785# Test to verify handle verification, see issue 3321
1786#
1787
1788class TestInvalidHandle(unittest.TestCase):
1789
Florent Xiclunafd1b0932010-03-28 00:25:02 +00001790 @unittest.skipIf(WIN32, "skipped on Windows")
Jesse Noller6214edd2009-01-19 16:23:53 +00001791 def test_invalid_handles(self):
Jesse Noller6214edd2009-01-19 16:23:53 +00001792 conn = _multiprocessing.Connection(44977608)
1793 self.assertRaises(IOError, conn.poll)
1794 self.assertRaises(IOError, _multiprocessing.Connection, -1)
Florent Xiclunafd1b0932010-03-28 00:25:02 +00001795
Jesse Noller6214edd2009-01-19 16:23:53 +00001796#
Benjamin Petersone711caf2008-06-11 16:44:04 +00001797# Functions used to create test cases from the base ones in this module
1798#
1799
1800def get_attributes(Source, names):
1801 d = {}
1802 for name in names:
1803 obj = getattr(Source, name)
1804 if type(obj) == type(get_attributes):
1805 obj = staticmethod(obj)
1806 d[name] = obj
1807 return d
1808
1809def create_test_cases(Mixin, type):
1810 result = {}
1811 glob = globals()
Florent Xiclunafd1b0932010-03-28 00:25:02 +00001812 Type = type.capitalize()
Benjamin Petersone711caf2008-06-11 16:44:04 +00001813
1814 for name in list(glob.keys()):
1815 if name.startswith('_Test'):
1816 base = glob[name]
1817 if type in base.ALLOWED_TYPES:
1818 newname = 'With' + Type + name[1:]
1819 class Temp(base, unittest.TestCase, Mixin):
1820 pass
1821 result[newname] = Temp
1822 Temp.__name__ = newname
1823 Temp.__module__ = Mixin.__module__
1824 return result
1825
1826#
1827# Create test cases
1828#
1829
1830class ProcessesMixin(object):
1831 TYPE = 'processes'
1832 Process = multiprocessing.Process
1833 locals().update(get_attributes(multiprocessing, (
1834 'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
1835 'Condition', 'Event', 'Value', 'Array', 'RawValue',
1836 'RawArray', 'current_process', 'active_children', 'Pipe',
1837 'connection', 'JoinableQueue'
1838 )))
1839
1840testcases_processes = create_test_cases(ProcessesMixin, type='processes')
1841globals().update(testcases_processes)
1842
1843
1844class ManagerMixin(object):
1845 TYPE = 'manager'
1846 Process = multiprocessing.Process
1847 manager = object.__new__(multiprocessing.managers.SyncManager)
1848 locals().update(get_attributes(manager, (
1849 'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
1850 'Condition', 'Event', 'Value', 'Array', 'list', 'dict',
1851 'Namespace', 'JoinableQueue'
1852 )))
1853
1854testcases_manager = create_test_cases(ManagerMixin, type='manager')
1855globals().update(testcases_manager)
1856
1857
1858class ThreadsMixin(object):
1859 TYPE = 'threads'
1860 Process = multiprocessing.dummy.Process
1861 locals().update(get_attributes(multiprocessing.dummy, (
1862 'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
1863 'Condition', 'Event', 'Value', 'Array', 'current_process',
1864 'active_children', 'Pipe', 'connection', 'dict', 'list',
1865 'Namespace', 'JoinableQueue'
1866 )))
1867
1868testcases_threads = create_test_cases(ThreadsMixin, type='threads')
1869globals().update(testcases_threads)
1870
Neal Norwitz5d6415e2008-08-25 01:53:32 +00001871class OtherTest(unittest.TestCase):
1872 # TODO: add more tests for deliver/answer challenge.
1873 def test_deliver_challenge_auth_failure(self):
1874 class _FakeConnection(object):
1875 def recv_bytes(self, size):
Neal Norwitzec105ad2008-08-25 03:05:54 +00001876 return b'something bogus'
Neal Norwitz5d6415e2008-08-25 01:53:32 +00001877 def send_bytes(self, data):
1878 pass
1879 self.assertRaises(multiprocessing.AuthenticationError,
1880 multiprocessing.connection.deliver_challenge,
1881 _FakeConnection(), b'abc')
1882
1883 def test_answer_challenge_auth_failure(self):
1884 class _FakeConnection(object):
1885 def __init__(self):
1886 self.count = 0
1887 def recv_bytes(self, size):
1888 self.count += 1
1889 if self.count == 1:
1890 return multiprocessing.connection.CHALLENGE
1891 elif self.count == 2:
Neal Norwitzec105ad2008-08-25 03:05:54 +00001892 return b'something bogus'
1893 return b''
Neal Norwitz5d6415e2008-08-25 01:53:32 +00001894 def send_bytes(self, data):
1895 pass
1896 self.assertRaises(multiprocessing.AuthenticationError,
1897 multiprocessing.connection.answer_challenge,
1898 _FakeConnection(), b'abc')
1899
Benjamin Petersonf47ed4a2009-04-11 20:45:40 +00001900#
1901# Test Manager.start()/Pool.__init__() initializer feature - see issue 5585
1902#
1903
1904def initializer(ns):
1905 ns.test += 1
1906
1907class TestInitializers(unittest.TestCase):
1908 def setUp(self):
1909 self.mgr = multiprocessing.Manager()
1910 self.ns = self.mgr.Namespace()
1911 self.ns.test = 0
1912
1913 def tearDown(self):
1914 self.mgr.shutdown()
1915
1916 def test_manager_initializer(self):
1917 m = multiprocessing.managers.SyncManager()
1918 self.assertRaises(TypeError, m.start, 1)
1919 m.start(initializer, (self.ns,))
1920 self.assertEqual(self.ns.test, 1)
1921 m.shutdown()
1922
1923 def test_pool_initializer(self):
1924 self.assertRaises(TypeError, multiprocessing.Pool, initializer=1)
1925 p = multiprocessing.Pool(1, initializer, (self.ns,))
1926 p.close()
1927 p.join()
1928 self.assertEqual(self.ns.test, 1)
1929
Alexandre Vassalottic57a84f2009-07-17 12:07:01 +00001930#
1931# Issue 5155, 5313, 5331: Test process in processes
1932# Verifies os.close(sys.stdin.fileno) vs. sys.stdin.close() behavior
1933#
1934
1935def _ThisSubProcess(q):
1936 try:
1937 item = q.get(block=False)
1938 except pyqueue.Empty:
1939 pass
1940
1941def _TestProcess(q):
1942 queue = multiprocessing.Queue()
1943 subProc = multiprocessing.Process(target=_ThisSubProcess, args=(queue,))
1944 subProc.start()
1945 subProc.join()
1946
1947def _afunc(x):
1948 return x*x
1949
1950def pool_in_process():
1951 pool = multiprocessing.Pool(processes=4)
1952 x = pool.map(_afunc, [1, 2, 3, 4, 5, 6, 7])
1953
1954class _file_like(object):
1955 def __init__(self, delegate):
1956 self._delegate = delegate
1957 self._pid = None
1958
1959 @property
1960 def cache(self):
1961 pid = os.getpid()
1962 # There are no race conditions since fork keeps only the running thread
1963 if pid != self._pid:
1964 self._pid = pid
1965 self._cache = []
1966 return self._cache
1967
1968 def write(self, data):
1969 self.cache.append(data)
1970
1971 def flush(self):
1972 self._delegate.write(''.join(self.cache))
1973 self._cache = []
1974
1975class TestStdinBadfiledescriptor(unittest.TestCase):
1976
1977 def test_queue_in_process(self):
1978 queue = multiprocessing.Queue()
1979 proc = multiprocessing.Process(target=_TestProcess, args=(queue,))
1980 proc.start()
1981 proc.join()
1982
1983 def test_pool_in_process(self):
1984 p = multiprocessing.Process(target=pool_in_process)
1985 p.start()
1986 p.join()
1987
1988 def test_flushing(self):
1989 sio = io.StringIO()
1990 flike = _file_like(sio)
1991 flike.write('foo')
1992 proc = multiprocessing.Process(target=lambda: flike.flush())
1993 flike.flush()
1994 assert sio.getvalue() == 'foo'
1995
1996testcases_other = [OtherTest, TestInvalidHandle, TestInitializers,
1997 TestStdinBadfiledescriptor]
Neal Norwitz5d6415e2008-08-25 01:53:32 +00001998
Benjamin Petersone711caf2008-06-11 16:44:04 +00001999#
2000#
2001#
2002
2003def test_main(run=None):
Jesse Nollerd00df3c2008-06-18 14:22:48 +00002004 if sys.platform.startswith("linux"):
2005 try:
2006 lock = multiprocessing.RLock()
2007 except OSError:
Benjamin Petersone549ead2009-03-28 21:42:05 +00002008 raise unittest.SkipTest("OSError raises on RLock creation, see issue 3111!")
Benjamin Peterson3c0dd062008-06-17 22:43:48 +00002009
Benjamin Petersone711caf2008-06-11 16:44:04 +00002010 if run is None:
2011 from test.support import run_unittest as run
2012
2013 util.get_temp_dir() # creates temp directory for use by all processes
2014
2015 multiprocessing.get_logger().setLevel(LOG_LEVEL)
2016
Benjamin Peterson41181742008-07-02 20:22:54 +00002017 ProcessesMixin.pool = multiprocessing.Pool(4)
2018 ThreadsMixin.pool = multiprocessing.dummy.Pool(4)
2019 ManagerMixin.manager.__init__()
2020 ManagerMixin.manager.start()
2021 ManagerMixin.pool = ManagerMixin.manager.Pool(4)
Benjamin Petersone711caf2008-06-11 16:44:04 +00002022
2023 testcases = (
Benjamin Peterson41181742008-07-02 20:22:54 +00002024 sorted(testcases_processes.values(), key=lambda tc:tc.__name__) +
2025 sorted(testcases_threads.values(), key=lambda tc:tc.__name__) +
Neal Norwitz5d6415e2008-08-25 01:53:32 +00002026 sorted(testcases_manager.values(), key=lambda tc:tc.__name__) +
2027 testcases_other
Benjamin Petersone711caf2008-06-11 16:44:04 +00002028 )
2029
2030 loadTestsFromTestCase = unittest.defaultTestLoader.loadTestsFromTestCase
2031 suite = unittest.TestSuite(loadTestsFromTestCase(tc) for tc in testcases)
2032 run(suite)
2033
Benjamin Peterson41181742008-07-02 20:22:54 +00002034 ThreadsMixin.pool.terminate()
2035 ProcessesMixin.pool.terminate()
2036 ManagerMixin.pool.terminate()
2037 ManagerMixin.manager.shutdown()
Benjamin Petersone711caf2008-06-11 16:44:04 +00002038
Benjamin Peterson41181742008-07-02 20:22:54 +00002039 del ProcessesMixin.pool, ThreadsMixin.pool, ManagerMixin.pool
Benjamin Petersone711caf2008-06-11 16:44:04 +00002040
2041def main():
2042 test_main(unittest.TextTestRunner(verbosity=2).run)
2043
2044if __name__ == '__main__':
2045 main()