blob: f1ccea658e6a8e09fb0608aee9ca3a159c2760c8 [file] [log] [blame]
Benjamin Peterson90f5ba52010-03-11 22:53:45 +00001#!/usr/bin/env python3
Georg Brandl86b2fb92008-07-16 03:43:04 +00002
Benjamin Petersone711caf2008-06-11 16:44:04 +00003#
4# Unit tests for the multiprocessing package
5#
6
7import unittest
Benjamin Petersone711caf2008-06-11 16:44:04 +00008import queue as pyqueue
9import time
Alexandre Vassalottic57a84f2009-07-17 12:07:01 +000010import io
Benjamin Petersone711caf2008-06-11 16:44:04 +000011import sys
12import os
13import gc
14import signal
15import array
Benjamin Petersone711caf2008-06-11 16:44:04 +000016import socket
17import random
18import logging
R. David Murraya21e4ca2009-03-31 23:16:50 +000019import test.support
Benjamin Petersone711caf2008-06-11 16:44:04 +000020
Benjamin Petersone5384b02008-10-04 22:00:42 +000021
R. David Murraya21e4ca2009-03-31 23:16:50 +000022# Skip tests if _multiprocessing wasn't built.
23_multiprocessing = test.support.import_module('_multiprocessing')
24# Skip tests if sem_open implementation is broken.
25test.support.import_module('multiprocessing.synchronize')
# import threading after _multiprocessing to raise a more relevant error
# message: "No module named _multiprocessing". _multiprocessing is not compiled
# without thread support.
29import threading
Benjamin Petersone5384b02008-10-04 22:00:42 +000030
Benjamin Petersone711caf2008-06-11 16:44:04 +000031import multiprocessing.dummy
32import multiprocessing.connection
33import multiprocessing.managers
34import multiprocessing.heap
Benjamin Petersone711caf2008-06-11 16:44:04 +000035import multiprocessing.pool
Benjamin Petersone711caf2008-06-11 16:44:04 +000036
37from multiprocessing import util
38
Brian Curtinafa88b52010-10-07 01:12:19 +000039try:
40 from multiprocessing.sharedctypes import Value, copy
41 HAS_SHAREDCTYPES = True
42except ImportError:
43 HAS_SHAREDCTYPES = False
44
Benjamin Petersone711caf2008-06-11 16:44:04 +000045#
46#
47#
48
def latin(s):
    """Return the string *s* encoded to bytes with the 'latin' codec."""
    return s.encode('latin')
Benjamin Petersone711caf2008-06-11 16:44:04 +000051
Benjamin Petersone711caf2008-06-11 16:44:04 +000052#
53# Constants
54#
55
LOG_LEVEL = util.SUBWARNING
#LOG_LEVEL = logging.DEBUG

DELTA = 0.1

# Making CHECK_TIMINGS true makes the tests take a lot longer and can
# sometimes cause some non-serious failures because some calls block a bit
# longer than expected.
CHECK_TIMINGS = False
if CHECK_TIMINGS:
    TIMEOUT1, TIMEOUT2, TIMEOUT3 = 0.82, 0.35, 1.4
else:
    TIMEOUT1, TIMEOUT2, TIMEOUT3 = 0.1, 0.1, 0.1

# True unless the extension module flags its semaphore get_value as broken.
HAVE_GETVALUE = not getattr(_multiprocessing,
                            'HAVE_BROKEN_SEM_GETVALUE', False)

WIN32 = (sys.platform == "win32")
73
Benjamin Petersone711caf2008-06-11 16:44:04 +000074#
Florent Xiclunafd1b0932010-03-28 00:25:02 +000075# Some tests require ctypes
76#
77
78try:
Florent Xiclunaaa171062010-08-14 15:56:42 +000079 from ctypes import Structure, c_int, c_double
Florent Xiclunafd1b0932010-03-28 00:25:02 +000080except ImportError:
81 Structure = object
82 c_int = c_double = None
83
84#
Benjamin Petersone711caf2008-06-11 16:44:04 +000085# Creates a wrapper for a function which records the time it takes to finish
86#
87
class TimingWrapper(object):
    """Callable wrapper that records how long each call to *func* takes.

    ``elapsed`` is ``None`` until the first call.  After every call,
    whether it returns or raises, ``elapsed`` holds that call's duration
    in seconds.
    """

    def __init__(self, func):
        self.func = func
        self.elapsed = None

    def __call__(self, *args, **kwds):
        # Prefer a monotonic clock so that adjustments of the system clock
        # cannot distort the measured duration (or make it negative); fall
        # back to time.time() where time.monotonic() is not available.
        clock = getattr(time, 'monotonic', time.time)
        start = clock()
        try:
            return self.func(*args, **kwds)
        finally:
            # Runs on both the return and the exception path.
            self.elapsed = clock() - start
100
101#
102# Base class for test cases
103#
104
class BaseTestCase(object):
    """Mixin with assertion helpers shared by the test-case classes.

    The helpers call ``self.assertEqual`` and ``self.assertAlmostEqual``,
    which are not defined here, so this class has to be combined with one
    that provides them (presumably ``unittest.TestCase`` -- confirm where
    the concrete test classes are assembled).
    """

    # Subclasses override this with a narrower tuple; presumably it names
    # the back ends for which a test class should be instantiated.
    ALLOWED_TYPES = ('processes', 'manager', 'threads')

    def assertTimingAlmostEqual(self, a, b):
        # Timing checks are only enforced when CHECK_TIMINGS is true;
        # otherwise this is a no-op.
        if CHECK_TIMINGS:
            self.assertAlmostEqual(a, b, 1)

    def assertReturnsIfImplemented(self, value, func, *args):
        # Assert that func(*args) == value, unless func raises
        # NotImplementedError, in which case the check is skipped.
        try:
            res = func(*args)
        except NotImplementedError:
            pass
        else:
            return self.assertEqual(value, res)
120
121#
122# Return the value of a semaphore
123#
124
def get_value(self):
    """Return the current value of the semaphore-like object *self*.

    Tries, in order, calling ``self.get_value()``, then reading the
    ``_Semaphore__value`` attribute, then the ``_value`` attribute.  An
    AttributeError from any attempt moves on to the next one.

    Raises NotImplementedError if none of the three is available.
    """
    try:
        return self.get_value()
    except AttributeError:
        pass
    for attr in ('_Semaphore__value', '_value'):
        try:
            return getattr(self, attr)
        except AttributeError:
            continue
    raise NotImplementedError
136
137#
138# Testcases
139#
140
class _TestProcess(BaseTestCase):
    # Tests for process objects.  The attributes TYPE, Process, Queue, Pipe,
    # current_process and active_children are read from self but are not
    # defined in this class; the concrete test class must supply them.

    ALLOWED_TYPES = ('processes', 'threads')

    def test_current(self):
        # Check the attributes of the current process object; not run for
        # the 'threads' type.
        if self.TYPE == 'threads':
            return

        current = self.current_process()
        authkey = current.authkey

        self.assertTrue(current.is_alive())
        self.assertFalse(current.daemon)
        self.assertIsInstance(authkey, bytes)
        self.assertTrue(len(authkey) > 0)
        self.assertEqual(current.ident, os.getpid())
        self.assertEqual(current.exitcode, None)

    def _test(self, q, *args, **kwds):
        # Child side of test_process: report the received arguments, then
        # the child's name and (except for threads) its authkey and pid.
        current = self.current_process()
        q.put(args)
        q.put(kwds)
        q.put(current.name)
        if self.TYPE != 'threads':
            q.put(bytes(current.authkey))
            q.put(current.pid)

    def test_process(self):
        q = self.Queue(1)
        args = (q, 1, 2)
        kwargs = {'hello': 23, 'bye': 2.54}
        name = 'SomeProcess'
        p = self.Process(
            target=self._test, args=args, kwargs=kwargs, name=name
            )
        p.daemon = True
        current = self.current_process()

        # state before the child is started
        if self.TYPE != 'threads':
            self.assertEqual(p.authkey, current.authkey)
        self.assertEqual(p.is_alive(), False)
        self.assertEqual(p.daemon, True)
        self.assertNotIn(p, self.active_children())
        self.assertEqual(type(self.active_children()), list)
        self.assertEqual(p.exitcode, None)

        p.start()

        # state while the child is running
        self.assertEqual(p.exitcode, None)
        self.assertEqual(p.is_alive(), True)
        self.assertIn(p, self.active_children())

        # values reported by the child, in the order _test() puts them
        self.assertEqual(q.get(), args[1:])
        self.assertEqual(q.get(), kwargs)
        self.assertEqual(q.get(), p.name)
        if self.TYPE != 'threads':
            self.assertEqual(q.get(), current.authkey)
            self.assertEqual(q.get(), p.pid)

        p.join()

        # state after the child has finished
        self.assertEqual(p.exitcode, 0)
        self.assertEqual(p.is_alive(), False)
        self.assertNotIn(p, self.active_children())

    def _test_terminate(self):
        # Child side of test_terminate: sleep until terminated.
        time.sleep(1000)

    def test_terminate(self):
        if self.TYPE == 'threads':
            return

        p = self.Process(target=self._test_terminate)
        p.daemon = True
        p.start()

        self.assertEqual(p.is_alive(), True)
        self.assertIn(p, self.active_children())
        self.assertEqual(p.exitcode, None)

        p.terminate()

        join = TimingWrapper(p.join)
        self.assertEqual(join(), None)
        self.assertTimingAlmostEqual(join.elapsed, 0.0)

        self.assertEqual(p.is_alive(), False)
        self.assertNotIn(p, self.active_children())

        p.join()

        # XXX sometimes get p.exitcode == 0 on Windows ...
        #self.assertEqual(p.exitcode, -signal.SIGTERM)

    def test_cpu_count(self):
        try:
            cpus = multiprocessing.cpu_count()
        except NotImplementedError:
            cpus = 1
        self.assertIs(type(cpus), int)
        self.assertGreaterEqual(cpus, 1)

    def test_active_children(self):
        self.assertEqual(type(self.active_children()), list)

        p = self.Process(target=time.sleep, args=(DELTA,))
        self.assertNotIn(p, self.active_children())

        p.start()
        self.assertIn(p, self.active_children())

        p.join()
        self.assertNotIn(p, self.active_children())

    def _test_recursion(self, wconn, id):
        # Send `id` down the pipe and, while it has fewer than two elements,
        # spawn two children (one after the other) that do the same with
        # id+[0] and id+[1].
        wconn.send(id)
        if len(id) < 2:
            for i in range(2):
                p = self.Process(
                    target=self._test_recursion, args=(wconn, id+[i])
                    )
                p.start()
                p.join()

    def test_recursion(self):
        rconn, wconn = self.Pipe(duplex=False)
        self._test_recursion(wconn, [])

        time.sleep(DELTA)
        result = []
        while rconn.poll():
            result.append(rconn.recv())

        # Each child is joined before the next one starts, so the ids
        # arrive in depth-first order.
        expected = [
            [],
              [0],
                [0, 0],
                [0, 1],
              [1],
                [1, 0],
                [1, 1]
            ]
        self.assertEqual(result, expected)
286
287#
288#
289#
290
291class _UpperCaser(multiprocessing.Process):
292
293 def __init__(self):
294 multiprocessing.Process.__init__(self)
295 self.child_conn, self.parent_conn = multiprocessing.Pipe()
296
297 def run(self):
298 self.parent_conn.close()
299 for s in iter(self.child_conn.recv, None):
300 self.child_conn.send(s.upper())
301 self.child_conn.close()
302
303 def submit(self, s):
304 assert type(s) is str
305 self.parent_conn.send(s)
306 return self.parent_conn.recv()
307
308 def stop(self):
309 self.parent_conn.send(None)
310 self.parent_conn.close()
311 self.child_conn.close()
312
class _TestSubclassingProcess(BaseTestCase):
    # Exercises a user-defined Process subclass (_UpperCaser).

    ALLOWED_TYPES = ('processes',)

    def test_subclassing(self):
        uppercaser = _UpperCaser()
        uppercaser.start()
        self.assertEqual(uppercaser.submit('hello'), 'HELLO')
        self.assertEqual(uppercaser.submit('world'), 'WORLD')
        uppercaser.stop()
        uppercaser.join()
324
325#
326#
327#
328
def queue_empty(q):
    """Return whether queue *q* is empty.

    Uses ``q.empty()`` when the queue has that method and falls back to
    comparing ``q.qsize()`` with zero otherwise.
    """
    if hasattr(q, 'empty'):
        return q.empty()
    else:
        return q.qsize() == 0
334
def queue_full(q, maxsize):
    """Return whether queue *q* is full.

    Uses ``q.full()`` when the queue has that method; otherwise the queue
    counts as full when ``q.qsize()`` equals *maxsize*.
    """
    if hasattr(q, 'full'):
        return q.full()
    else:
        return q.qsize() == maxsize
340
341
class _TestQueue(BaseTestCase):
    # Tests for queue objects.  Queue, JoinableQueue, Event and Process are
    # read from self but not defined in this class.

    def _test_put(self, queue, child_can_start, parent_can_continue):
        # Child side of test_put: once allowed to start, drain the six items
        # the parent queued, then let the parent continue.
        child_can_start.wait()
        for i in range(6):
            queue.get()
        parent_can_continue.set()

    def test_put(self):
        MAXSIZE = 6
        queue = self.Queue(maxsize=MAXSIZE)
        child_can_start = self.Event()
        parent_can_continue = self.Event()

        proc = self.Process(
            target=self._test_put,
            args=(queue, child_can_start, parent_can_continue)
            )
        proc.daemon = True
        proc.start()

        self.assertEqual(queue_empty(queue), True)
        self.assertEqual(queue_full(queue, MAXSIZE), False)

        # fill the queue using each calling convention of put()
        queue.put(1)
        queue.put(2, True)
        queue.put(3, True, None)
        queue.put(4, False)
        queue.put(5, False, None)
        queue.put_nowait(6)

        # the values may be in buffer but not yet in pipe so sleep a bit
        time.sleep(DELTA)

        self.assertEqual(queue_empty(queue), False)
        self.assertEqual(queue_full(queue, MAXSIZE), True)

        put = TimingWrapper(queue.put)
        put_nowait = TimingWrapper(queue.put_nowait)

        # every further put must raise Full; only the blocking variants are
        # expected to take their timeout before doing so
        self.assertRaises(pyqueue.Full, put, 7, False)
        self.assertTimingAlmostEqual(put.elapsed, 0)

        self.assertRaises(pyqueue.Full, put, 7, False, None)
        self.assertTimingAlmostEqual(put.elapsed, 0)

        self.assertRaises(pyqueue.Full, put_nowait, 7)
        self.assertTimingAlmostEqual(put_nowait.elapsed, 0)

        self.assertRaises(pyqueue.Full, put, 7, True, TIMEOUT1)
        self.assertTimingAlmostEqual(put.elapsed, TIMEOUT1)

        self.assertRaises(pyqueue.Full, put, 7, False, TIMEOUT2)
        self.assertTimingAlmostEqual(put.elapsed, 0)

        self.assertRaises(pyqueue.Full, put, 7, True, timeout=TIMEOUT3)
        self.assertTimingAlmostEqual(put.elapsed, TIMEOUT3)

        child_can_start.set()
        parent_can_continue.wait()

        self.assertEqual(queue_empty(queue), True)
        self.assertEqual(queue_full(queue, MAXSIZE), False)

        proc.join()

    def _test_get(self, queue, child_can_start, parent_can_continue):
        # Child side of test_get: once allowed to start, queue the items the
        # parent will read, then let the parent continue.
        child_can_start.wait()
        #queue.put(1)
        queue.put(2)
        queue.put(3)
        queue.put(4)
        queue.put(5)
        parent_can_continue.set()

    def test_get(self):
        queue = self.Queue()
        child_can_start = self.Event()
        parent_can_continue = self.Event()

        proc = self.Process(
            target=self._test_get,
            args=(queue, child_can_start, parent_can_continue)
            )
        proc.daemon = True
        proc.start()

        self.assertEqual(queue_empty(queue), True)

        child_can_start.set()
        parent_can_continue.wait()

        time.sleep(DELTA)
        self.assertEqual(queue_empty(queue), False)

        # Hangs unexpectedly, remove for now
        #self.assertEqual(queue.get(), 1)
        self.assertEqual(queue.get(True, None), 2)
        self.assertEqual(queue.get(True), 3)
        self.assertEqual(queue.get(timeout=1), 4)
        self.assertEqual(queue.get_nowait(), 5)

        self.assertEqual(queue_empty(queue), True)

        get = TimingWrapper(queue.get)
        get_nowait = TimingWrapper(queue.get_nowait)

        # every further get must raise Empty; only the blocking variants are
        # expected to take their timeout before doing so
        self.assertRaises(pyqueue.Empty, get, False)
        self.assertTimingAlmostEqual(get.elapsed, 0)

        self.assertRaises(pyqueue.Empty, get, False, None)
        self.assertTimingAlmostEqual(get.elapsed, 0)

        self.assertRaises(pyqueue.Empty, get_nowait)
        self.assertTimingAlmostEqual(get_nowait.elapsed, 0)

        self.assertRaises(pyqueue.Empty, get, True, TIMEOUT1)
        self.assertTimingAlmostEqual(get.elapsed, TIMEOUT1)

        self.assertRaises(pyqueue.Empty, get, False, TIMEOUT2)
        self.assertTimingAlmostEqual(get.elapsed, 0)

        self.assertRaises(pyqueue.Empty, get, timeout=TIMEOUT3)
        self.assertTimingAlmostEqual(get.elapsed, TIMEOUT3)

        proc.join()

    def _test_fork(self, queue):
        for i in range(10, 20):
            queue.put(i)
        # note that at this point the items may only be buffered, so the
        # process cannot shutdown until the feeder thread has finished
        # pushing items onto the pipe.

    def test_fork(self):
        # Old versions of Queue would fail to create a new feeder
        # thread for a forked process if the original process had its
        # own feeder thread.  This test checks that this no longer
        # happens.

        queue = self.Queue()

        # put items on queue so that main process starts a feeder thread
        for i in range(10):
            queue.put(i)

        # wait to make sure thread starts before we fork a new process
        time.sleep(DELTA)

        # fork process
        p = self.Process(target=self._test_fork, args=(queue,))
        p.start()

        # check that all expected items are in the queue
        for i in range(20):
            self.assertEqual(queue.get(), i)
        self.assertRaises(pyqueue.Empty, queue.get, False)

        p.join()

    def test_qsize(self):
        q = self.Queue()
        try:
            self.assertEqual(q.qsize(), 0)
        except NotImplementedError:
            # qsize() is not available here; nothing to test
            return
        q.put(1)
        self.assertEqual(q.qsize(), 1)
        q.put(5)
        self.assertEqual(q.qsize(), 2)
        q.get()
        self.assertEqual(q.qsize(), 1)
        q.get()
        self.assertEqual(q.qsize(), 0)

    def _test_task_done(self, q):
        # Worker for test_task_done: acknowledge each item until the None
        # sentinel is received.
        for obj in iter(q.get, None):
            time.sleep(DELTA)
            q.task_done()

    def test_task_done(self):
        queue = self.JoinableQueue()

        # The former "sys.version_info < (2, 5)" guard could never be true
        # on Python 3, which made this skip unreachable.
        if not hasattr(queue, 'task_done'):
            self.skipTest("requires 'queue.task_done()' method")

        workers = [self.Process(target=self._test_task_done, args=(queue,))
                   for i in range(4)]

        for p in workers:
            p.start()

        for i in range(10):
            queue.put(i)

        queue.join()

        # one sentinel per worker
        for p in workers:
            queue.put(None)

        for p in workers:
            p.join()
545
546#
547#
548#
549
class _TestLock(BaseTestCase):
    # Tests for Lock and RLock objects, which are read from self.

    def test_lock(self):
        lock = self.Lock()
        self.assertEqual(lock.acquire(), True)
        # a second, non-blocking acquire must fail
        self.assertEqual(lock.acquire(False), False)
        self.assertEqual(lock.release(), None)
        # releasing a lock that is not held is an error
        self.assertRaises((ValueError, threading.ThreadError), lock.release)

    def test_rlock(self):
        lock = self.RLock()
        # three nested acquires need three releases
        self.assertEqual(lock.acquire(), True)
        self.assertEqual(lock.acquire(), True)
        self.assertEqual(lock.acquire(), True)
        self.assertEqual(lock.release(), None)
        self.assertEqual(lock.release(), None)
        self.assertEqual(lock.release(), None)
        # one release too many is an error
        self.assertRaises((AssertionError, RuntimeError), lock.release)

    def test_lock_context(self):
        with self.Lock():
            pass
572
Benjamin Petersone711caf2008-06-11 16:44:04 +0000573
class _TestSemaphore(BaseTestCase):
    # Tests for Semaphore and BoundedSemaphore objects, read from self.

    def _test_semaphore(self, sem):
        # Shared checks for a semaphore created with an initial value of 2:
        # take it down to 0, verify a non-blocking acquire fails, and bring
        # it back up to 2.
        self.assertReturnsIfImplemented(2, get_value, sem)
        self.assertEqual(sem.acquire(), True)
        self.assertReturnsIfImplemented(1, get_value, sem)
        self.assertEqual(sem.acquire(), True)
        self.assertReturnsIfImplemented(0, get_value, sem)
        self.assertEqual(sem.acquire(False), False)
        self.assertReturnsIfImplemented(0, get_value, sem)
        self.assertEqual(sem.release(), None)
        self.assertReturnsIfImplemented(1, get_value, sem)
        self.assertEqual(sem.release(), None)
        self.assertReturnsIfImplemented(2, get_value, sem)

    def test_semaphore(self):
        sem = self.Semaphore(2)
        self._test_semaphore(sem)
        # an unbounded semaphore may be released past its initial value
        self.assertEqual(sem.release(), None)
        self.assertReturnsIfImplemented(3, get_value, sem)
        self.assertEqual(sem.release(), None)
        self.assertReturnsIfImplemented(4, get_value, sem)

    def test_bounded_semaphore(self):
        sem = self.BoundedSemaphore(2)
        self._test_semaphore(sem)
        # Currently fails on OS/X
        #if HAVE_GETVALUE:
        #    self.assertRaises(ValueError, sem.release)
        #    self.assertReturnsIfImplemented(2, get_value, sem)

    def test_timeout(self):
        if self.TYPE != 'processes':
            return

        sem = self.Semaphore(0)
        acquire = TimingWrapper(sem.acquire)

        # non-blocking acquires are expected to return immediately, even
        # when a timeout is given
        self.assertEqual(acquire(False), False)
        self.assertTimingAlmostEqual(acquire.elapsed, 0.0)

        self.assertEqual(acquire(False, None), False)
        self.assertTimingAlmostEqual(acquire.elapsed, 0.0)

        self.assertEqual(acquire(False, TIMEOUT1), False)
        self.assertTimingAlmostEqual(acquire.elapsed, 0)

        # blocking acquires are expected to give up after the timeout
        self.assertEqual(acquire(True, TIMEOUT2), False)
        self.assertTimingAlmostEqual(acquire.elapsed, TIMEOUT2)

        self.assertEqual(acquire(timeout=TIMEOUT3), False)
        self.assertTimingAlmostEqual(acquire.elapsed, TIMEOUT3)
626
627
class _TestCondition(BaseTestCase):
    # Tests for Condition objects.  Condition, Semaphore and Process are
    # read from self; each scenario also runs plain threading.Thread
    # waiters alongside the self.Process ones.

    def f(self, cond, sleeping, woken, timeout=None):
        # Waiter: signal `sleeping` just before waiting on the condition
        # (optionally with a timeout) and signal `woken` afterwards.
        cond.acquire()
        sleeping.release()
        cond.wait(timeout)
        woken.release()
        cond.release()

    def check_invariant(self, cond):
        # this is only supposed to succeed when there are no sleepers
        if self.TYPE == 'processes':
            try:
                sleepers = (cond._sleeping_count.get_value() -
                            cond._woken_count.get_value())
                self.assertEqual(sleepers, 0)
                self.assertEqual(cond._wait_semaphore.get_value(), 0)
            except NotImplementedError:
                pass

    def test_notify(self):
        cond = self.Condition()
        sleeping = self.Semaphore(0)
        woken = self.Semaphore(0)

        p = self.Process(target=self.f, args=(cond, sleeping, woken))
        p.daemon = True
        p.start()

        t = threading.Thread(target=self.f, args=(cond, sleeping, woken))
        t.daemon = True
        t.start()

        # wait for both children to start sleeping
        sleeping.acquire()
        sleeping.acquire()

        # check no process/thread has woken up
        time.sleep(DELTA)
        self.assertReturnsIfImplemented(0, get_value, woken)

        # wake up one process/thread
        cond.acquire()
        cond.notify()
        cond.release()

        # check one process/thread has woken up
        time.sleep(DELTA)
        self.assertReturnsIfImplemented(1, get_value, woken)

        # wake up another
        cond.acquire()
        cond.notify()
        cond.release()

        # check other has woken up
        time.sleep(DELTA)
        self.assertReturnsIfImplemented(2, get_value, woken)

        # check state is not mucked up
        self.check_invariant(cond)

        # Both waiters have been notified, so wait for each of them.  (The
        # same name used to be reused for both, so only the thread was
        # joined.)
        p.join()
        t.join()

    def test_notify_all(self):
        cond = self.Condition()
        sleeping = self.Semaphore(0)
        woken = self.Semaphore(0)

        # start some threads/processes which will timeout
        for i in range(3):
            p = self.Process(target=self.f,
                             args=(cond, sleeping, woken, TIMEOUT1))
            p.daemon = True
            p.start()

            t = threading.Thread(target=self.f,
                                 args=(cond, sleeping, woken, TIMEOUT1))
            t.daemon = True
            t.start()

        # wait for them all to sleep
        for i in range(6):
            sleeping.acquire()

        # check they have all timed out
        for i in range(6):
            woken.acquire()
        self.assertReturnsIfImplemented(0, get_value, woken)

        # check state is not mucked up
        self.check_invariant(cond)

        # start some more threads/processes
        for i in range(3):
            p = self.Process(target=self.f, args=(cond, sleeping, woken))
            p.daemon = True
            p.start()

            t = threading.Thread(target=self.f, args=(cond, sleeping, woken))
            t.daemon = True
            t.start()

        # wait for them to all sleep
        for i in range(6):
            sleeping.acquire()

        # check no process/thread has woken up
        time.sleep(DELTA)
        self.assertReturnsIfImplemented(0, get_value, woken)

        # wake them all up
        cond.acquire()
        cond.notify_all()
        cond.release()

        # check they have all woken
        time.sleep(DELTA)
        self.assertReturnsIfImplemented(6, get_value, woken)

        # check state is not mucked up
        self.check_invariant(cond)

    def test_timeout(self):
        cond = self.Condition()
        wait = TimingWrapper(cond.wait)
        cond.acquire()
        try:
            res = wait(TIMEOUT1)
        finally:
            # release even if wait() raises
            cond.release()
        self.assertEqual(res, False)
        self.assertTimingAlmostEqual(wait.elapsed, TIMEOUT1)
758
759
class _TestEvent(BaseTestCase):
    # Tests for Event objects.  Event and Process are read from self.

    def _test_event(self, event):
        # Child side of test_event: set the event after a short delay.
        time.sleep(TIMEOUT2)
        event.set()

    def test_event(self):
        event = self.Event()
        wait = TimingWrapper(event.wait)

        # NOTE: earlier comments here said the is_set()/wait() assertions
        # had been removed because of API differences between threading
        # events and the semaphore-backed multiprocessing Event (is_set vs
        # isSet, wait() returning the flag rather than None).  The
        # assertions below are active.
        self.assertEqual(event.is_set(), False)

        self.assertEqual(wait(0.0), False)
        self.assertTimingAlmostEqual(wait.elapsed, 0.0)
        self.assertEqual(wait(TIMEOUT1), False)
        self.assertTimingAlmostEqual(wait.elapsed, TIMEOUT1)

        event.set()

        self.assertEqual(event.is_set(), True)
        self.assertEqual(wait(), True)
        self.assertTimingAlmostEqual(wait.elapsed, 0.0)
        self.assertEqual(wait(TIMEOUT1), True)
        self.assertTimingAlmostEqual(wait.elapsed, 0.0)
        # self.assertEqual(event.is_set(), True)

        event.clear()

        #self.assertEqual(event.is_set(), False)

        p = self.Process(target=self._test_event, args=(event,))
        p.start()
        self.assertEqual(wait(), True)
        # the child returns right after setting the event; do not leave it
        # running past the end of the test
        p.join()
Benjamin Petersone711caf2008-06-11 16:44:04 +0000797
798#
799#
800#
801
@unittest.skipUnless(HAS_SHAREDCTYPES,
                     "requires multiprocessing.sharedctypes")
class _TestValue(BaseTestCase):
    # Tests for shared Value/RawValue objects.  Value, RawValue, Lock and
    # Process are read from self.

    ALLOWED_TYPES = ('processes',)

    # (typecode, initial value, value assigned by the child)
    codes_values = [
        ('i', 4343, 24234),
        ('d', 3.625, -4.25),
        ('h', -232, 234),
        ('c', latin('x'), latin('y'))
        ]

    def _test(self, values):
        # Child side of test_value: overwrite each shared value with the
        # third element of its codes_values entry.
        for sv, cv in zip(values, self.codes_values):
            sv.value = cv[2]

    def test_value(self, raw=False):
        if raw:
            values = [self.RawValue(code, value)
                      for code, value, _ in self.codes_values]
        else:
            values = [self.Value(code, value)
                      for code, value, _ in self.codes_values]

        # initial values are visible in the parent
        for sv, cv in zip(values, self.codes_values):
            self.assertEqual(sv.value, cv[1])

        proc = self.Process(target=self._test, args=(values,))
        proc.start()
        proc.join()

        # the child's assignments are visible in the parent
        for sv, cv in zip(values, self.codes_values):
            self.assertEqual(sv.value, cv[2])

    def test_rawvalue(self):
        self.test_value(raw=True)

    def test_getobj_getlock(self):
        # default lock: both accessors must be callable
        val1 = self.Value('i', 5)
        val1.get_lock()
        val1.get_obj()

        # lock=None: both accessors must be callable
        val2 = self.Value('i', 5, lock=None)
        val2.get_lock()
        val2.get_obj()

        # an explicit lock is handed back by get_lock()
        lock = self.Lock()
        val3 = self.Value('i', 5, lock=lock)
        lock3 = val3.get_lock()
        val3.get_obj()
        self.assertEqual(lock, lock3)

        # lock=False: the result has neither accessor
        arr4 = self.Value('i', 5, lock=False)
        self.assertFalse(hasattr(arr4, 'get_lock'))
        self.assertFalse(hasattr(arr4, 'get_obj'))

        # anything that is not a lock is rejected
        self.assertRaises(AttributeError, self.Value, 'i', 5, lock='navalue')

        # raw values have neither accessor
        arr5 = self.RawValue('i', 5)
        self.assertFalse(hasattr(arr5, 'get_lock'))
        self.assertFalse(hasattr(arr5, 'get_obj'))
865
Benjamin Petersone711caf2008-06-11 16:44:04 +0000866
class _TestArray(BaseTestCase):
    # Tests for shared Array/RawArray objects.  Array, RawArray, Lock and
    # Process are read from self.

    ALLOWED_TYPES = ('processes',)

    def f(self, seq):
        # Replace seq, in place, by its running sums.
        for i in range(1, len(seq)):
            seq[i] += seq[i-1]

    @unittest.skipIf(c_int is None, "requires _ctypes")
    def test_array(self, raw=False):
        seq = [680, 626, 934, 821, 150, 233, 548, 982, 714, 831]
        if raw:
            arr = self.RawArray('i', seq)
        else:
            arr = self.Array('i', seq)

        self.assertEqual(len(arr), len(seq))
        self.assertEqual(arr[3], seq[3])
        self.assertEqual(list(arr[2:7]), list(seq[2:7]))

        # slice assignment to the shared array and to the reference list
        arr[4:8] = seq[4:8] = array.array('i', [1, 2, 3, 4])

        self.assertEqual(list(arr[:]), seq)

        # apply f() locally to the list and in a child to the shared array;
        # both must end up with the same contents
        self.f(seq)

        p = self.Process(target=self.f, args=(arr,))
        p.start()
        p.join()

        self.assertEqual(list(arr[:]), seq)

    @unittest.skipIf(c_int is None, "requires _ctypes")
    def test_rawarray(self):
        self.test_array(raw=True)

    @unittest.skipIf(c_int is None, "requires _ctypes")
    def test_getobj_getlock_obj(self):
        # default lock: both accessors must be callable
        arr1 = self.Array('i', list(range(10)))
        arr1.get_lock()
        arr1.get_obj()

        # lock=None: both accessors must be callable
        arr2 = self.Array('i', list(range(10)), lock=None)
        arr2.get_lock()
        arr2.get_obj()

        # an explicit lock is handed back by get_lock()
        lock = self.Lock()
        arr3 = self.Array('i', list(range(10)), lock=lock)
        lock3 = arr3.get_lock()
        arr3.get_obj()
        self.assertEqual(lock, lock3)

        # lock=False: the result has neither accessor
        arr4 = self.Array('i', range(10), lock=False)
        self.assertFalse(hasattr(arr4, 'get_lock'))
        self.assertFalse(hasattr(arr4, 'get_obj'))
        # anything that is not a lock is rejected
        self.assertRaises(AttributeError,
                          self.Array, 'i', range(10), lock='notalock')

        # raw arrays have neither accessor
        arr5 = self.RawArray('i', range(10))
        self.assertFalse(hasattr(arr5, 'get_lock'))
        self.assertFalse(hasattr(arr5, 'get_obj'))
Benjamin Petersone711caf2008-06-11 16:44:04 +0000928
929#
930#
931#
932
class _TestContainers(BaseTestCase):
    # Tests for the list, dict and Namespace factories read from self.

    ALLOWED_TYPES = ('manager',)

    def test_list(self):
        a = self.list(list(range(10)))
        self.assertEqual(a[:], list(range(10)))

        b = self.list()
        self.assertEqual(b[:], [])

        b.extend(list(range(5)))
        self.assertEqual(b[:], list(range(5)))

        self.assertEqual(b[2], 2)
        self.assertEqual(b[2:10], [2, 3, 4])

        b *= 2
        self.assertEqual(b[:], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4])

        self.assertEqual(b + [5, 6], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4, 5, 6])

        # a is unaffected by the operations on b
        self.assertEqual(a[:], list(range(10)))

        # a container built from other containers
        d = [a, b]
        e = self.list(d)
        self.assertEqual(
            e[:],
            [[0, 1, 2, 3, 4, 5, 6, 7, 8, 9], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4]]
            )

        # a change made to a afterwards shows up through f
        f = self.list([a])
        a.append('hello')
        self.assertEqual(f[:], [[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 'hello']])

    def test_dict(self):
        d = self.dict()
        indices = list(range(65, 70))
        for i in indices:
            d[i] = chr(i)
        self.assertEqual(d.copy(), {i: chr(i) for i in indices})
        self.assertEqual(sorted(d.keys()), indices)
        self.assertEqual(sorted(d.values()), [chr(i) for i in indices])
        self.assertEqual(sorted(d.items()), [(i, chr(i)) for i in indices])

    def test_namespace(self):
        n = self.Namespace()
        n.name = 'Bob'
        n.job = 'Builder'
        n._hidden = 'hidden'
        self.assertEqual((n.name, n.job), ('Bob', 'Builder'))
        del n.job
        # the expected str() shows neither the deleted attribute nor the
        # underscore-prefixed one
        self.assertEqual(str(n), "Namespace(name='Bob')")
        self.assertTrue(hasattr(n, 'name'))
        self.assertFalse(hasattr(n, 'job'))
988
989#
990#
991#
992
def sqr(x, wait=0.0):
    """Sleep for *wait* seconds, then return *x* multiplied by itself."""
    time.sleep(wait)
    squared = x * x
    return squared
class _TestPool(BaseTestCase):
    # Tests for the pool available as ``self.pool``.

    def test_apply(self):
        # apply() must forward positional and keyword arguments.
        papply = self.pool.apply
        self.assertEqual(papply(sqr, (5,)), sqr(5))
        self.assertEqual(papply(sqr, (), {'x': 3}), sqr(x=3))

    def test_map(self):
        pmap = self.pool.map
        self.assertEqual(pmap(sqr, list(range(10))), list(map(sqr, list(range(10)))))
        self.assertEqual(pmap(sqr, list(range(100)), chunksize=20),
                         list(map(sqr, list(range(100)))))

    def test_map_chunksize(self):
        # An empty iterable with an explicit chunksize must not stall.
        try:
            self.pool.map_async(sqr, [], chunksize=1).get(timeout=TIMEOUT1)
        except multiprocessing.TimeoutError:
            self.fail("pool.map_async with chunksize stalled on null list")

    def test_async(self):
        # get() must block for roughly as long as the task sleeps.
        res = self.pool.apply_async(sqr, (7, TIMEOUT1,))
        get = TimingWrapper(res.get)
        self.assertEqual(get(), 49)
        self.assertTimingAlmostEqual(get.elapsed, TIMEOUT1)

    def test_async_timeout(self):
        # The task sleeps longer than the timeout given to get().
        res = self.pool.apply_async(sqr, (6, TIMEOUT2 + 0.2))
        get = TimingWrapper(res.get)
        self.assertRaises(multiprocessing.TimeoutError, get, timeout=TIMEOUT2)
        self.assertTimingAlmostEqual(get.elapsed, TIMEOUT2)

    def test_imap(self):
        it = self.pool.imap(sqr, list(range(10)))
        self.assertEqual(list(it), list(map(sqr, list(range(10)))))

        it = self.pool.imap(sqr, list(range(10)))
        for i in range(10):
            self.assertEqual(next(it), i*i)
        self.assertRaises(StopIteration, it.__next__)

        it = self.pool.imap(sqr, list(range(1000)), chunksize=100)
        for i in range(1000):
            self.assertEqual(next(it), i*i)
        self.assertRaises(StopIteration, it.__next__)

    def test_imap_unordered(self):
        # Results may arrive in any order, so compare after sorting.
        it = self.pool.imap_unordered(sqr, list(range(1000)))
        self.assertEqual(sorted(it), list(map(sqr, list(range(1000)))))

        it = self.pool.imap_unordered(sqr, list(range(1000)), chunksize=53)
        self.assertEqual(sorted(it), list(map(sqr, list(range(1000)))))

    def test_make_pool(self):
        p = multiprocessing.Pool(3)
        try:
            self.assertEqual(3, len(p._pool))
        finally:
            # Release the workers even when the assertion above fails.
            p.close()
            p.join()

    def test_terminate(self):
        if self.TYPE == 'manager':
            # On Unix a forked process increfs each shared object to
            # which its parent process held a reference.  If the
            # forked process gets terminated then there is likely to
            # be a reference leak.  So to prevent
            # _TestZZZNumberOfObjects from failing we skip this test
            # when using a manager.  Report it as a skip rather than
            # letting it count as a pass.
            self.skipTest('test not appropriate for %s' % self.TYPE)

        # Queue far more work than can finish, then terminate at once.
        self.pool.map_async(time.sleep, [0.1] * 10000, chunksize=1)
        self.pool.terminate()
        join = TimingWrapper(self.pool.join)
        join()
        # join() must return promptly instead of waiting for the tasks.
        self.assertLess(join.elapsed, 0.2)
Jesse Noller1f0b6582010-01-27 03:36:01 +00001071
class _TestPoolWorkerLifetime(BaseTestCase):
    # Checks that a pool created with maxtasksperchild replaces its workers.

    ALLOWED_TYPES = ('processes', )

    def test_pool_worker_lifetime(self):
        pool = multiprocessing.Pool(3, maxtasksperchild=10)
        self.assertEqual(3, len(pool._pool))
        origworkerpids = [worker.pid for worker in pool._pool]
        # Run many tasks so each worker gets replaced (hopefully)
        pending = [pool.apply_async(sqr, (i, )) for i in range(100)]
        # Fetch the results and verify we got the right answers,
        # also ensuring all the tasks have completed.
        for arg, async_result in enumerate(pending):
            self.assertEqual(async_result.get(), sqr(arg))
        # Refill the pool
        pool._repopulate_pool()
        # Wait until all workers are alive, polling at most five times
        for _ in range(5):
            if all(worker.is_alive() for worker in pool._pool):
                break
            time.sleep(DELTA)
        finalworkerpids = [worker.pid for worker in pool._pool]
        # All pids should be assigned.  See issue #7805.
        self.assertNotIn(None, origworkerpids)
        self.assertNotIn(None, finalworkerpids)
        # Finally, check that the worker pids have changed
        self.assertNotEqual(sorted(origworkerpids), sorted(finalworkerpids))
        pool.close()
        pool.join()
1102
Benjamin Petersone711caf2008-06-11 16:44:04 +00001103#
1104# Test that manager has expected number of shared objects left
1105#
1106
class _TestZZZNumberOfObjects(BaseTestCase):
    # Because test cases are sorted alphabetically, this one will get
    # run after all the other tests for the manager.  It tests that
    # there have been no "reference leaks" for the manager's shared
    # objects.  Note the comment in _TestPool.test_terminate().
    ALLOWED_TYPES = ('manager',)

    def test_number_of_objects(self):
        EXPECTED_NUMBER = 1                # the pool object is still alive
        multiprocessing.active_children()  # discard dead process objs
        gc.collect()                       # do garbage collection
        refs = self.manager._number_of_objects()
        # Snapshot the debug info right after counting so that what gets
        # reported corresponds to the count being checked.
        debug_info = self.manager._debug_info()
        if refs != EXPECTED_NUMBER:
            # Print the snapshot exactly once; querying the manager a
            # second time would only duplicate the report.
            print(debug_info)

        self.assertEqual(refs, EXPECTED_NUMBER)
1125
1126#
1127# Test of creating a customized manager class
1128#
1129
1130from multiprocessing.managers import BaseManager, BaseProxy, RemoteError
1131
class FooBar(object):
    """Simple class registered with MyManager as both 'Foo' and 'Bar'."""

    def f(self):
        """Return the fixed string 'f()'."""
        return 'f()'

    def g(self):
        """Always raise ValueError."""
        raise ValueError

    def _h(self):
        """Return the fixed string '_h()'."""
        return '_h()'
1139
def baz():
    """Yield the squares of 0 through 9 in ascending order."""
    for number in range(10):
        square = number * number
        yield square
1143
class IteratorProxy(BaseProxy):
    """Proxy that behaves as an iterator by forwarding ``__next__``."""

    # Only __next__ is exposed on the referent.
    _exposed_ = ('__next__',)

    def __iter__(self):
        # The proxy is its own iterator.
        return self

    def __next__(self):
        # Fetch the next item by invoking __next__ through _callmethod().
        item = self._callmethod('__next__')
        return item
1150
class MyManager(BaseManager):
    # Customized manager used by _TestMyManager; everything it offers is
    # added through the register() calls that follow.
    pass

# 'Foo': a FooBar registered without an explicit ``exposed`` tuple.
MyManager.register('Foo', callable=FooBar)
# 'Bar': a FooBar whose proxy exposes exactly f and _h.
MyManager.register('Bar', callable=FooBar, exposed=('f', '_h'))
# 'baz': the baz() generator, accessed through IteratorProxy.
MyManager.register('baz', callable=baz, proxytype=IteratorProxy)
1157
1158
class _TestMyManager(BaseTestCase):
    # Checks the proxies produced by the customized MyManager.

    ALLOWED_TYPES = ('manager',)

    def test_mymanager(self):
        manager = MyManager()
        manager.start()

        foo = manager.Foo()
        bar = manager.Bar()
        squares = manager.baz()

        # Work out which of the candidate methods each proxy offers.
        candidates = ('f', 'g', '_h')
        foo_methods = list(filter(lambda name: hasattr(foo, name), candidates))
        bar_methods = list(filter(lambda name: hasattr(bar, name), candidates))

        self.assertEqual(foo_methods, ['f', 'g'])
        self.assertEqual(bar_methods, ['f', '_h'])

        # 'Foo': f and g are callable, _h is refused by the manager.
        self.assertEqual(foo.f(), 'f()')
        with self.assertRaises(ValueError):
            foo.g()
        self.assertEqual(foo._callmethod('f'), 'f()')
        with self.assertRaises(RemoteError):
            foo._callmethod('_h')

        # 'Bar': f and _h work both directly and through _callmethod().
        self.assertEqual(bar.f(), 'f()')
        self.assertEqual(bar._h(), '_h()')
        self.assertEqual(bar._callmethod('f'), 'f()')
        self.assertEqual(bar._callmethod('_h'), '_h()')

        # The iterator proxy must yield the squares of 0..9.
        self.assertEqual(list(squares), [n*n for n in range(10)])

        manager.shutdown()
1190
1191#
1192# Test of connecting to a remote server and using xmlrpclib for serialization
1193#
1194
# Single module-level queue returned by get_queue().
_queue = pyqueue.Queue()
def get_queue():
    """Return the module-level queue object."""
    return _queue
1198
class QueueManager(BaseManager):
    '''Manager class used by the server process.'''
# Registered with a callable, so this manager can produce the queue itself.
QueueManager.register('get_queue', callable=get_queue)
1202
class QueueManager2(BaseManager):
    '''Manager class which specifies the same interface as QueueManager.'''
# Registered without a callable: only the name 'get_queue' is declared here.
QueueManager2.register('get_queue')
1206
1207
1208SERIALIZER = 'xmlrpclib'
1209
class _TestRemoteManager(BaseTestCase):
    # Connects to a manager server from a second manager object and from
    # a child process, using the serializer named by SERIALIZER.

    ALLOWED_TYPES = ('manager',)

    def _putter(self, address, authkey):
        # Runs in the child: connect to the server and put one item.
        manager = QueueManager2(
            address=address, authkey=authkey, serializer=SERIALIZER
            )
        manager.connect()
        queue = manager.get_queue()
        queue.put(('hello world', None, True, 2.25))

    def test_remote(self):
        authkey = os.urandom(32)

        manager = QueueManager(
            address=('localhost', 0), authkey=authkey, serializer=SERIALIZER
            )
        manager.start()

        p = self.Process(target=self._putter, args=(manager.address, authkey))
        p.start()

        manager2 = QueueManager2(
            address=manager.address, authkey=authkey, serializer=SERIALIZER
            )
        manager2.connect()
        queue = manager2.get_queue()

        # Note that xmlrpclib will deserialize object as a list not a tuple
        self.assertEqual(queue.get(), ['hello world', None, True, 2.25])

        # Because we are using xmlrpclib for serialization instead of
        # pickle this will cause a serialization error.
        self.assertRaises(Exception, queue.put, time.sleep)

        # The item has been received, so the putter has nothing left to
        # do; reap it while the server it is connected to is still up.
        p.join()

        # Make queue finalizer run before the server is stopped
        del queue
        manager.shutdown()
1249
class _TestManagerRestart(BaseTestCase):
    # Checks that a manager can be started again on an address that was
    # in use by another manager just before.

    def _putter(self, address, authkey):
        # Runs in the child: connect to the server and put one item.
        manager = QueueManager(
            address=address, authkey=authkey, serializer=SERIALIZER)
        manager.connect()
        queue = manager.get_queue()
        queue.put('hello world')

    def test_rapid_restart(self):
        authkey = os.urandom(32)
        manager = QueueManager(
            address=('localhost', 0), authkey=authkey, serializer=SERIALIZER)
        srvr = manager.get_server()
        addr = srvr.address
        # Close the connection.Listener socket which gets opened as a part
        # of manager.get_server(). It's not needed for the test.
        srvr.listener.close()
        manager.start()

        p = self.Process(target=self._putter, args=(manager.address, authkey))
        p.start()
        queue = manager.get_queue()
        self.assertEqual(queue.get(), 'hello world')
        # The item has arrived, so the putter is finished; reap it
        # before the server it is connected to goes away.
        p.join()
        del queue
        manager.shutdown()
        # Start a second manager on the address recorded above.
        manager = QueueManager(
            address=addr, authkey=authkey, serializer=SERIALIZER)
        manager.start()
        manager.shutdown()
Jesse Nollerc5d28a02009-03-30 16:37:36 +00001280
Benjamin Petersone711caf2008-06-11 16:44:04 +00001281#
1282#
1283#
1284
1285SENTINEL = latin('')
1286
class _TestConnection(BaseTestCase):
    # Tests for the connection objects returned by Pipe().

    ALLOWED_TYPES = ('processes', 'threads')

    def _echo(self, conn):
        # Send every received byte message straight back until the
        # sentinel arrives, then close the connection.
        for msg in iter(conn.recv_bytes, SENTINEL):
            conn.send_bytes(msg)
        conn.close()

    def test_connection(self):
        conn, child_conn = self.Pipe()

        p = self.Process(target=self._echo, args=(child_conn,))
        p.daemon = True
        p.start()

        seq = [1, 2.25, None]
        msg = latin('hello world')
        longmsg = msg * 10
        arr = array.array('i', list(range(4)))

        if self.TYPE == 'processes':
            self.assertEqual(type(conn.fileno()), int)

        # Objects and raw bytes must both survive the round trip.
        self.assertEqual(conn.send(seq), None)
        self.assertEqual(conn.recv(), seq)

        self.assertEqual(conn.send_bytes(msg), None)
        self.assertEqual(conn.recv_bytes(), msg)

        if self.TYPE == 'processes':
            # Receive into the start of a zeroed buffer.
            buffer = array.array('i', [0]*10)
            expected = list(arr) + [0] * (10 - len(arr))
            self.assertEqual(conn.send_bytes(arr), None)
            self.assertEqual(conn.recv_bytes_into(buffer),
                             len(arr) * buffer.itemsize)
            self.assertEqual(list(buffer), expected)

            # Receive at an offset of three items.
            buffer = array.array('i', [0]*10)
            expected = [0] * 3 + list(arr) + [0] * (10 - 3 - len(arr))
            self.assertEqual(conn.send_bytes(arr), None)
            self.assertEqual(conn.recv_bytes_into(buffer, 3 * buffer.itemsize),
                             len(arr) * buffer.itemsize)
            self.assertEqual(list(buffer), expected)

            # A buffer shorter than the message: BufferTooShort must
            # carry the complete message as its argument.
            buffer = bytearray(latin(' ' * 40))
            self.assertEqual(conn.send_bytes(longmsg), None)
            try:
                res = conn.recv_bytes_into(buffer)
            except multiprocessing.BufferTooShort as e:
                self.assertEqual(e.args, (longmsg,))
            else:
                self.fail('expected BufferTooShort, got %s' % res)

        poll = TimingWrapper(conn.poll)

        # Nothing pending: poll() returns False at once ...
        self.assertEqual(poll(), False)
        self.assertTimingAlmostEqual(poll.elapsed, 0)

        # ... and only after the timeout when one is given.
        self.assertEqual(poll(TIMEOUT1), False)
        self.assertTimingAlmostEqual(poll.elapsed, TIMEOUT1)

        conn.send(None)

        # The echo is expected back, so poll() returns True without waiting.
        self.assertEqual(poll(TIMEOUT1), True)
        self.assertTimingAlmostEqual(poll.elapsed, 0)

        self.assertEqual(conn.recv(), None)

        really_big_msg = latin('X') * (1024 * 1024 * 16)   # 16 MiB
        conn.send_bytes(really_big_msg)
        self.assertEqual(conn.recv_bytes(), really_big_msg)

        conn.send_bytes(SENTINEL)                          # tell child to quit
        child_conn.close()

        if self.TYPE == 'processes':
            self.assertEqual(conn.readable, True)
            self.assertEqual(conn.writable, True)
            self.assertRaises(EOFError, conn.recv)
            self.assertRaises(EOFError, conn.recv_bytes)

        p.join()

    def test_duplex_false(self):
        reader, writer = self.Pipe(duplex=False)
        self.assertEqual(writer.send(1), None)
        self.assertEqual(reader.recv(), 1)
        if self.TYPE == 'processes':
            self.assertEqual(reader.readable, True)
            self.assertEqual(reader.writable, False)
            self.assertEqual(writer.readable, False)
            self.assertEqual(writer.writable, True)
            # Using an end in the wrong direction must fail.
            self.assertRaises(IOError, reader.send, 2)
            self.assertRaises(IOError, writer.recv)
            self.assertRaises(IOError, writer.poll)

    def test_spawn_close(self):
        # We test that a pipe connection can be closed by parent
        # process immediately after child is spawned.  On Windows this
        # would have sometimes failed on old versions because
        # child_conn would be closed before the child got a chance to
        # duplicate it.
        conn, child_conn = self.Pipe()

        p = self.Process(target=self._echo, args=(child_conn,))
        p.start()
        child_conn.close()    # this might complete before child initializes

        msg = latin('hello')
        conn.send_bytes(msg)
        self.assertEqual(conn.recv_bytes(), msg)

        conn.send_bytes(SENTINEL)
        conn.close()
        p.join()

    def test_sendbytes(self):
        if self.TYPE != 'processes':
            # Report a skip instead of silently counting as a pass.
            self.skipTest('test not appropriate for %s' % self.TYPE)

        msg = latin('abcdefghijklmnopqrstuvwxyz')
        a, b = self.Pipe()

        a.send_bytes(msg)
        self.assertEqual(b.recv_bytes(), msg)

        # Offset only.
        a.send_bytes(msg, 5)
        self.assertEqual(b.recv_bytes(), msg[5:])

        # Offset and size.
        a.send_bytes(msg, 7, 8)
        self.assertEqual(b.recv_bytes(), msg[7:7+8])

        # An offset equal to the length sends an empty message.
        a.send_bytes(msg, 26)
        self.assertEqual(b.recv_bytes(), latin(''))

        a.send_bytes(msg, 26, 0)
        self.assertEqual(b.recv_bytes(), latin(''))

        # Offsets or sizes outside the message, or negative, are rejected.
        self.assertRaises(ValueError, a.send_bytes, msg, 27)

        self.assertRaises(ValueError, a.send_bytes, msg, 22, 5)

        self.assertRaises(ValueError, a.send_bytes, msg, 26, 1)

        self.assertRaises(ValueError, a.send_bytes, msg, -1)

        self.assertRaises(ValueError, a.send_bytes, msg, 4, -1)
1435
class _TestListenerClient(BaseTestCase):
    # Round trip between a Listener and a Client for every address family
    # listed in self.connection.families.

    ALLOWED_TYPES = ('processes', 'threads')

    def _test(self, address):
        # Runs in the child: connect, send one greeting, close.
        client = self.connection.Client(address)
        client.send('hello')
        client.close()

    def test_listener_client(self):
        for family in self.connection.families:
            listener = self.connection.Listener(family=family)
            child = self.Process(target=self._test, args=(listener.address,))
            child.daemon = True
            child.start()
            accepted = listener.accept()
            self.assertEqual(accepted.recv(), 'hello')
            child.join()
            listener.close()
Benjamin Petersone711caf2008-06-11 16:44:04 +00001455#
1456# Test of sending connection and socket objects between processes
1457#
Benjamin Petersonb29cbbc2008-06-16 20:57:14 +00001458"""
Benjamin Petersone711caf2008-06-11 16:44:04 +00001459class _TestPicklingConnections(BaseTestCase):
1460
1461 ALLOWED_TYPES = ('processes',)
1462
1463 def _listener(self, conn, families):
1464 for fam in families:
1465 l = self.connection.Listener(family=fam)
1466 conn.send(l.address)
1467 new_conn = l.accept()
1468 conn.send(new_conn)
1469
1470 if self.TYPE == 'processes':
1471 l = socket.socket()
1472 l.bind(('localhost', 0))
1473 conn.send(l.getsockname())
1474 l.listen(1)
1475 new_conn, addr = l.accept()
1476 conn.send(new_conn)
1477
1478 conn.recv()
1479
1480 def _remote(self, conn):
1481 for (address, msg) in iter(conn.recv, None):
1482 client = self.connection.Client(address)
1483 client.send(msg.upper())
1484 client.close()
1485
1486 if self.TYPE == 'processes':
1487 address, msg = conn.recv()
1488 client = socket.socket()
1489 client.connect(address)
1490 client.sendall(msg.upper())
1491 client.close()
1492
1493 conn.close()
1494
1495 def test_pickling(self):
1496 try:
1497 multiprocessing.allow_connection_pickling()
1498 except ImportError:
1499 return
1500
1501 families = self.connection.families
1502
1503 lconn, lconn0 = self.Pipe()
1504 lp = self.Process(target=self._listener, args=(lconn0, families))
1505 lp.start()
1506 lconn0.close()
1507
1508 rconn, rconn0 = self.Pipe()
1509 rp = self.Process(target=self._remote, args=(rconn0,))
1510 rp.start()
1511 rconn0.close()
1512
1513 for fam in families:
1514 msg = ('This connection uses family %s' % fam).encode('ascii')
1515 address = lconn.recv()
1516 rconn.send((address, msg))
1517 new_conn = lconn.recv()
1518 self.assertEqual(new_conn.recv(), msg.upper())
1519
1520 rconn.send(None)
1521
1522 if self.TYPE == 'processes':
1523 msg = latin('This connection uses a normal socket')
1524 address = lconn.recv()
1525 rconn.send((address, msg))
1526 if hasattr(socket, 'fromfd'):
1527 new_conn = lconn.recv()
1528 self.assertEqual(new_conn.recv(100), msg.upper())
1529 else:
1530 # XXX On Windows with Py2.6 need to backport fromfd()
1531 discard = lconn.recv_bytes()
1532
1533 lconn.send(None)
1534
1535 rconn.close()
1536 lconn.close()
1537
1538 lp.join()
1539 rp.join()
Benjamin Petersonb29cbbc2008-06-16 20:57:14 +00001540"""
Benjamin Petersone711caf2008-06-11 16:44:04 +00001541#
1542#
1543#
1544
class _TestHeap(BaseTestCase):
    # Stress test for multiprocessing.heap: after many allocations and
    # releases, the free and allocated segments of the heap must line up.

    ALLOWED_TYPES = ('processes',)

    def test_heap(self):
        iterations = 5000
        maxblocks = 50
        blocks = []

        # create and destroy lots of blocks of different sizes
        for _ in range(iterations):
            size = int(random.lognormvariate(0, 1) * 1000)
            blocks.append(multiprocessing.heap.BufferWrapper(size))
            if len(blocks) > maxblocks:
                # Drop a randomly chosen block to release its memory.
                del blocks[random.randrange(maxblocks)]

        # get the heap object
        heap = multiprocessing.heap.BufferWrapper._heap

        # Collect every free and every allocated segment as
        # (arena index, start, stop, length, state).
        segments = []
        for free_list in list(heap._len_to_seq.values()):
            for arena, start, stop in free_list:
                segments.append((heap._arenas.index(arena), start, stop,
                                 stop-start, 'free'))
        for arena, start, stop in heap._allocated_blocks:
            segments.append((heap._arenas.index(arena), start, stop,
                             stop-start, 'occupied'))

        segments.sort()

        # Each segment must end where the next one begins, unless the
        # next one is the first segment (offset 0) of a different arena.
        for current, following in zip(segments, segments[1:]):
            arena, stop = current[0], current[2]
            narena, nstart = following[0], following[1]
            self.assertTrue(
                (arena != narena and nstart == 0) or (stop == nstart),
                'heap segments %r and %r do not line up' % (current, following)
                )
1585
1586#
1587#
1588#
1589
class _Foo(Structure):
    # Structure with an int field ``x`` and a double field ``y``, used by
    # _TestSharedCTypes as the type of a shared Value and as the object
    # passed to copy().
    _fields_ = [
        ('x', c_int),
        ('y', c_double)
        ]
1595
@unittest.skipUnless(HAS_SHAREDCTYPES,
                     "requires multiprocessing.sharedctypes")
class _TestSharedCTypes(BaseTestCase):
    # Shared ctypes objects must carry changes made in a child process
    # back to the parent, with and without a lock.

    ALLOWED_TYPES = ('processes',)

    def _double(self, x, y, foo, arr, string):
        # Runs in the child: double every value it was handed.
        x.value *= 2
        y.value *= 2
        foo.x *= 2
        foo.y *= 2
        string.value *= 2
        for index in range(len(arr)):
            arr[index] *= 2

    def test_sharedctypes(self, lock=False):
        x = Value('i', 7, lock=lock)
        y = Value(c_double, 1.0/3.0, lock=lock)
        foo = Value(_Foo, 3, 2, lock=lock)
        arr = self.Array('d', list(range(10)), lock=lock)
        string = self.Array('c', 20, lock=lock)
        string.value = latin('hello')

        shared = (x, y, foo, arr, string)
        worker = self.Process(target=self._double, args=shared)
        worker.start()
        worker.join()

        # Every value must now be twice what it started as.
        self.assertEqual(x.value, 14)
        self.assertAlmostEqual(y.value, 2.0/3.0)
        self.assertEqual(foo.x, 6)
        self.assertAlmostEqual(foo.y, 4.0)
        for index in range(10):
            self.assertAlmostEqual(arr[index], index*2)
        self.assertEqual(string.value, latin('hellohello'))

    def test_synchronize(self):
        # Same scenario, but with lock=True.
        self.test_sharedctypes(lock=True)

    def test_copy(self):
        original = _Foo(2, 5.0)
        duplicate = copy(original)
        # Changing the original afterwards must not affect the copy.
        original.x = 0
        original.y = 0
        self.assertEqual(duplicate.x, 2)
        self.assertAlmostEqual(duplicate.y, 5.0)
1641
1642#
1643#
1644#
1645
class _TestFinalize(BaseTestCase):
    # Checks when, and in which order, util.Finalize callbacks run.  The
    # child registers several finalizers that each send a tag through a
    # pipe; the parent compares the received tags with the expected order.

    ALLOWED_TYPES = ('processes',)

    def _test_finalize(self, conn):
        # Runs in the child.  Statement order and object lifetimes matter
        # here: objects that are not deleted stay referenced by locals
        # until the process exits.
        class Foo(object):
            pass

        a = Foo()
        util.Finalize(a, conn.send, args=('a',))
        del a           # triggers callback for a

        b = Foo()
        close_b = util.Finalize(b, conn.send, args=('b',))
        close_b()       # triggers callback for b
        close_b()       # does nothing because callback has already been called
        del b           # does nothing because callback has already been called

        # No exitpriority is given for c; test_finalize() expects no 'c'.
        c = Foo()
        util.Finalize(c, conn.send, args=('c',))

        d10 = Foo()
        util.Finalize(d10, conn.send, args=('d10',), exitpriority=1)

        # Three finalizers sharing exitpriority 0; test_finalize() expects
        # them in the reverse of the order in which they are created here.
        d01 = Foo()
        util.Finalize(d01, conn.send, args=('d01',), exitpriority=0)
        d02 = Foo()
        util.Finalize(d02, conn.send, args=('d02',), exitpriority=0)
        d03 = Foo()
        util.Finalize(d03, conn.send, args=('d03',), exitpriority=0)

        util.Finalize(None, conn.send, args=('e',), exitpriority=-10)

        # 'STOP' is the sentinel the parent reads up to.
        util.Finalize(None, conn.send, args=('STOP',), exitpriority=-100)

        # call multiprocessing's cleanup function then exit process without
        # garbage collecting locals
        util._exit_function()
        conn.close()
        os._exit(0)

    def test_finalize(self):
        conn, child_conn = self.Pipe()

        p = self.Process(target=self._test_finalize, args=(child_conn,))
        p.start()
        p.join()

        # Read everything the child sent, up to the 'STOP' sentinel.
        result = [obj for obj in iter(conn.recv, 'STOP')]
        self.assertEqual(result, ['a', 'b', 'd10', 'd03', 'd02', 'd01', 'e'])
1696
1697#
1698# Test that from ... import * works for each module
1699#
1700
class _TestImportStar(BaseTestCase):
    # Every name listed in a module's __all__ must exist in that module.

    ALLOWED_TYPES = ('processes',)

    def test_import(self):
        modules = [
            'multiprocessing', 'multiprocessing.connection',
            'multiprocessing.heap', 'multiprocessing.managers',
            'multiprocessing.pool', 'multiprocessing.process',
            'multiprocessing.reduction',
            'multiprocessing.synchronize', 'multiprocessing.util'
            ]

        if c_int is not None:
            # This module requires _ctypes
            modules.append('multiprocessing.sharedctypes')

        for module_name in modules:
            __import__(module_name)
            module = sys.modules[module_name]

            # A module without __all__ contributes nothing to check.
            exported = getattr(module, '__all__', ())
            for attr in exported:
                message = '%r does not have attribute %r' % (module, attr)
                self.assertTrue(hasattr(module, attr), message)
1727
1728#
1729# Quick test that logging works -- does not test logging output
1730#
1731
class _TestLogging(BaseTestCase):
    # Quick checks that multiprocessing's logger works; the log output
    # itself is not examined.

    ALLOWED_TYPES = ('processes',)

    def test_enable_logging(self):
        logger = multiprocessing.get_logger()
        # Check the logger before it is first used.
        self.assertTrue(logger is not None)
        logger.setLevel(util.SUBWARNING)
        logger.debug('this will not be printed')
        logger.info('nor will this')
        logger.setLevel(LOG_LEVEL)

    def _test_level(self, conn):
        # Runs in the child: report the logger's effective level.
        logger = multiprocessing.get_logger()
        conn.send(logger.getEffectiveLevel())

    def _child_effective_level(self, reader, writer):
        # Start a child that reports its effective log level through
        # *writer*, return the value read from *reader*, and reap the
        # child so that no unjoined process is left behind.
        child = self.Process(target=self._test_level, args=(writer,))
        child.start()
        try:
            return reader.recv()
        finally:
            child.join()

    def test_level(self):
        LEVEL1 = 32
        LEVEL2 = 37

        logger = multiprocessing.get_logger()
        root_logger = logging.getLogger()
        root_level = root_logger.level

        reader, writer = multiprocessing.Pipe(duplex=False)
        try:
            # A level set on multiprocessing's logger must be seen by
            # the child.
            logger.setLevel(LEVEL1)
            self.assertEqual(
                LEVEL1, self._child_effective_level(reader, writer))

            # With that logger reset to NOTSET, the child must report
            # the root logger's level instead.
            logger.setLevel(logging.NOTSET)
            root_logger.setLevel(LEVEL2)
            self.assertEqual(
                LEVEL2, self._child_effective_level(reader, writer))
        finally:
            # Restore the levels and release the pipe even when one of
            # the assertions fails, so later tests are not affected.
            root_logger.setLevel(root_level)
            logger.setLevel(level=LOG_LEVEL)
            reader.close()
            writer.close()
1769
Jesse Nollerb9a49b72009-11-21 18:09:38 +00001770
Jesse Nollerf4ae35f2009-11-24 14:22:24 +00001771# class _TestLoggingProcessName(BaseTestCase):
1772#
1773# def handle(self, record):
1774# assert record.processName == multiprocessing.current_process().name
1775# self.__handled = True
1776#
1777# def test_logging(self):
1778# handler = logging.Handler()
1779# handler.handle = self.handle
1780# self.__handled = False
1781# # Bypass getLogger() and side-effects
1782# logger = logging.getLoggerClass()(
1783# 'multiprocessing.test.TestLoggingProcessName')
1784# logger.addHandler(handler)
1785# logger.propagate = False
1786#
1787# logger.warn('foo')
1788# assert self.__handled
Jesse Nollerb9a49b72009-11-21 18:09:38 +00001789
Benjamin Petersone711caf2008-06-11 16:44:04 +00001790#
Jesse Noller6214edd2009-01-19 16:23:53 +00001791# Test to verify handle verification, see issue 3321
1792#
1793
class TestInvalidHandle(unittest.TestCase):
    # Handle verification, see issue 3321.

    @unittest.skipIf(WIN32, "skipped on Windows")
    def test_invalid_handles(self):
        # A connection built on an arbitrary handle number must fail
        # when polled.
        bogus = _multiprocessing.Connection(44977608)
        with self.assertRaises(IOError):
            bogus.poll()
        # A negative handle must be refused at construction time.
        with self.assertRaises(IOError):
            _multiprocessing.Connection(-1)
Florent Xiclunafd1b0932010-03-28 00:25:02 +00001801
Jesse Noller6214edd2009-01-19 16:23:53 +00001802#
Benjamin Petersone711caf2008-06-11 16:44:04 +00001803# Functions used to create test cases from the base ones in this module
1804#
1805
def get_attributes(Source, names):
    """Return a dict mapping each of *names* to that attribute of *Source*.

    Attributes that are plain Python functions are wrapped in
    ``staticmethod``; every other attribute is stored unchanged.
    """
    function_type = type(get_attributes)
    attributes = {}
    for name in names:
        value = getattr(Source, name)
        is_plain_function = type(value) is function_type
        attributes[name] = staticmethod(value) if is_plain_function else value
    return attributes
1814
def create_test_cases(Mixin, type):
    """Build concrete test classes for one flavour of test.

    Every module global whose name starts with ``_Test`` and whose
    ``ALLOWED_TYPES`` contains *type* is combined with
    ``unittest.TestCase`` and *Mixin*.  The new class is named
    ``'With' + type.capitalize() + <name without its leading underscore>``
    and takes its ``__module__`` from *Mixin*.  Returns a dict mapping the
    new names to the new classes.
    """
    generated = {}
    prefix = 'With' + type.capitalize()

    # Work on a snapshot of the globals so the scan sees a fixed set.
    for name, base in list(globals().items()):
        if not name.startswith('_Test'):
            continue
        if type not in base.ALLOWED_TYPES:
            continue
        newname = prefix + name[1:]
        class Temp(base, unittest.TestCase, Mixin):
            pass
        Temp.__name__ = newname
        Temp.__module__ = Mixin.__module__
        generated[newname] = Temp
    return generated
1831
1832#
1833# Create test cases
1834#
1835
class ProcessesMixin(object):
    # Mixin for the 'processes' flavour: tests reach the real
    # multiprocessing primitives through ``self``.
    TYPE = 'processes'
    Process = multiprocessing.Process
    # Copy the listed attributes of the multiprocessing package into the
    # class body.  This relies on locals() being the class namespace
    # while the class body executes.
    locals().update(get_attributes(multiprocessing, (
        'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
        'Condition', 'Event', 'Value', 'Array', 'RawValue',
        'RawArray', 'current_process', 'active_children', 'Pipe',
        'connection', 'JoinableQueue'
        )))

# Generate the 'processes' test classes and publish them as module globals.
testcases_processes = create_test_cases(ProcessesMixin, type='processes')
globals().update(testcases_processes)
1848
1849
class ManagerMixin(object):
    # Mixin for the 'manager' flavour: tests reach the primitives through
    # attributes taken from a SyncManager instance.
    TYPE = 'manager'
    Process = multiprocessing.Process
    # Allocate the SyncManager with object.__new__, i.e. without running
    # its __init__.
    manager = object.__new__(multiprocessing.managers.SyncManager)
    # Copy the listed attributes of that manager into the class body.
    # This relies on locals() being the class namespace while the class
    # body executes.
    locals().update(get_attributes(manager, (
        'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
        'Condition', 'Event', 'Value', 'Array', 'list', 'dict',
        'Namespace', 'JoinableQueue'
        )))

# Generate the 'manager' test classes and publish them as module globals.
testcases_manager = create_test_cases(ManagerMixin, type='manager')
globals().update(testcases_manager)
1862
1863
class ThreadsMixin(object):
    # Mixin for the 'threads' flavour: tests reach the primitives through
    # multiprocessing.dummy.
    TYPE = 'threads'
    Process = multiprocessing.dummy.Process
    # Copy the listed attributes of multiprocessing.dummy into the class
    # body.  This relies on locals() being the class namespace while the
    # class body executes.
    locals().update(get_attributes(multiprocessing.dummy, (
        'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
        'Condition', 'Event', 'Value', 'Array', 'current_process',
        'active_children', 'Pipe', 'connection', 'dict', 'list',
        'Namespace', 'JoinableQueue'
        )))

# Generate the 'threads' test classes and publish them as module globals.
testcases_threads = create_test_cases(ThreadsMixin, type='threads')
globals().update(testcases_threads)
1876
class OtherTest(unittest.TestCase):
    # Authentication failures in the connection challenge handshake.
    # TODO: add more tests for deliver/answer challenge.
    def test_deliver_challenge_auth_failure(self):
        # The peer answers the challenge with junk.
        class _FakeConnection(object):
            def recv_bytes(self, size):
                return b'something bogus'
            def send_bytes(self, data):
                pass
        self.assertRaises(multiprocessing.AuthenticationError,
                          multiprocessing.connection.deliver_challenge,
                          _FakeConnection(), b'abc')

    def test_answer_challenge_auth_failure(self):
        # The challenge prefix is published as CHALLENGE by this version
        # of multiprocessing.connection; newer releases are understood to
        # expose it only as _CHALLENGE, so accept either name.
        connection_module = multiprocessing.connection
        challenge = getattr(connection_module, 'CHALLENGE', None)
        if challenge is None:
            challenge = connection_module._CHALLENGE

        # The peer sends the challenge prefix first, then junk.
        class _FakeConnection(object):
            def __init__(self):
                self.count = 0
            def recv_bytes(self, size):
                self.count += 1
                if self.count == 1:
                    return challenge
                elif self.count == 2:
                    return b'something bogus'
                return b''
            def send_bytes(self, data):
                pass
        self.assertRaises(multiprocessing.AuthenticationError,
                          multiprocessing.connection.answer_challenge,
                          _FakeConnection(), b'abc')
1905
Benjamin Petersonf47ed4a2009-04-11 20:45:40 +00001906#
1907# Test Manager.start()/Pool.__init__() initializer feature - see issue 5585
1908#
1909
def initializer(ns):
    """Add one to the ``test`` attribute of *ns*."""
    # Used as the initializer callable by TestInitializers.
    ns.test += 1
1912
class TestInitializers(unittest.TestCase):
    # Manager.start() and Pool() must run the initializer callable they
    # are given - see issue 5585.

    def setUp(self):
        # A shared namespace in which initializer() can leave its mark.
        self.mgr = multiprocessing.Manager()
        namespace = self.mgr.Namespace()
        namespace.test = 0
        self.ns = namespace

    def tearDown(self):
        self.mgr.shutdown()

    def test_manager_initializer(self):
        manager = multiprocessing.managers.SyncManager()
        # Something that is not callable must be rejected.
        with self.assertRaises(TypeError):
            manager.start(1)
        manager.start(initializer, (self.ns,))
        self.assertEqual(self.ns.test, 1)
        manager.shutdown()

    def test_pool_initializer(self):
        # Something that is not callable must be rejected.
        with self.assertRaises(TypeError):
            multiprocessing.Pool(initializer=1)
        pool = multiprocessing.Pool(1, initializer, (self.ns,))
        pool.close()
        pool.join()
        self.assertEqual(self.ns.test, 1)
1935
Alexandre Vassalottic57a84f2009-07-17 12:07:01 +00001936#
1937# Issue 5155, 5313, 5331: Test process in processes
# Verifies os.close(sys.stdin.fileno()) vs. sys.stdin.close() behavior
1939#
1940
def _ThisSubProcess(q):
    """Try one non-blocking ``get`` on *q*; an empty queue is not an error."""
    try:
        q.get(block=False)
    except pyqueue.Empty:
        # Nothing queued: the fetched value is irrelevant, only that the
        # call works.
        pass
1946
def _TestProcess(q):
    """Start a nested process running ``_ThisSubProcess`` and wait for it.

    The nested process receives a freshly created queue; the *q* argument
    itself is not used.
    """
    inner_queue = multiprocessing.Queue()
    child = multiprocessing.Process(
        target=_ThisSubProcess,
        args=(inner_queue,),
    )
    child.start()
    child.join()
1952
def _afunc(x):
    """Return *x* multiplied by itself."""
    squared = x * x
    return squared
1955
def pool_in_process():
    """Create a four-worker pool, map ``_afunc`` over a few ints, clean up.

    Used as a process target, so the pool is created inside a child
    process.  The mapped results are not needed; only successful
    completion matters.
    """
    pool = multiprocessing.Pool(processes=4)
    try:
        pool.map(_afunc, [1, 2, 3, 4, 5, 6, 7])
    finally:
        # Release the workers explicitly instead of relying on
        # interpreter-exit finalizers.
        pool.close()
        pool.join()
1959
class _file_like(object):
    """Minimal buffering writer that forwards to *delegate* on flush.

    The pending chunks are kept per process id: when the current pid
    differs from the one that last touched the buffer, the buffer is
    replaced by an empty one.
    """

    def __init__(self, delegate):
        self._pid = None
        self._delegate = delegate

    @property
    def cache(self):
        # There are no race conditions since fork keeps only the running
        # thread.
        current_pid = os.getpid()
        if self._pid != current_pid:
            self._pid = current_pid
            self._cache = []
        return self._cache

    def write(self, data):
        self.cache.append(data)

    def flush(self):
        pending = self.cache
        self._delegate.write(''.join(pending))
        self._cache = []
1980
class TestStdinBadfiledescriptor(unittest.TestCase):
    """Tests that use multiprocessing from inside a child process
    (issues 5155, 5313, 5331), plus a check of the ``_file_like`` buffer.
    """

    def test_queue_in_process(self):
        # The child creates its own queue and nested process.
        queue = multiprocessing.Queue()
        proc = multiprocessing.Process(target=_TestProcess, args=(queue,))
        proc.start()
        proc.join()

    def test_pool_in_process(self):
        # The child creates and uses a worker pool.
        p = multiprocessing.Process(target=pool_in_process)
        p.start()
        p.join()

    def test_flushing(self):
        # Data written before flush() must reach the delegate exactly once.
        # The original also built a Process object that was never started;
        # it had no effect on the check and has been dropped.
        sio = io.StringIO()
        flike = _file_like(sio)
        flike.write('foo')
        flike.flush()
        # assertEqual (unlike a bare assert) still runs under ``python -O``
        # and reports both values on failure.
        self.assertEqual(sio.getvalue(), 'foo')
2001
# Stand-alone test classes that are not generated from the mixins.
testcases_other = [
    OtherTest,
    TestInvalidHandle,
    TestInitializers,
    TestStdinBadfiledescriptor,
]
Neal Norwitz5d6415e2008-08-25 01:53:32 +00002004
Benjamin Petersone711caf2008-06-11 16:44:04 +00002005#
2006#
2007#
2008
def test_main(run=None):
    """Assemble the whole multiprocessing suite and hand it to *run*.

    run -- callable that accepts a ``unittest.TestSuite``; when omitted,
           ``test.support.run_unittest`` is used.

    Raises ``unittest.SkipTest`` on Linux when creating a
    ``multiprocessing.RLock`` fails with ``OSError`` (see issue 3111).

    The shared pools and the manager used by the mixin classes are created
    before the run and torn down afterwards, even if *run* raises.
    """
    if sys.platform.startswith("linux"):
        try:
            # Only probing whether a lock can be created; the object itself
            # is not needed.
            multiprocessing.RLock()
        except OSError:
            raise unittest.SkipTest("OSError raises on RLock creation, see issue 3111!")

    if run is None:
        from test.support import run_unittest as run

    util.get_temp_dir()     # creates temp directory for use by all processes

    multiprocessing.get_logger().setLevel(LOG_LEVEL)

    ProcessesMixin.pool = multiprocessing.Pool(4)
    ThreadsMixin.pool = multiprocessing.dummy.Pool(4)
    ManagerMixin.manager.__init__()
    ManagerMixin.manager.start()
    ManagerMixin.pool = ManagerMixin.manager.Pool(4)

    try:
        # Generated cases are ordered by class name within each flavour;
        # the hand-written classes run last.
        testcases = (
            sorted(testcases_processes.values(), key=lambda tc: tc.__name__) +
            sorted(testcases_threads.values(), key=lambda tc: tc.__name__) +
            sorted(testcases_manager.values(), key=lambda tc: tc.__name__) +
            testcases_other
            )

        loadTestsFromTestCase = unittest.defaultTestLoader.loadTestsFromTestCase
        suite = unittest.TestSuite(loadTestsFromTestCase(tc) for tc in testcases)
        run(suite)
    finally:
        # Always release the shared workers, otherwise a failing runner
        # leaves pool and manager processes behind.
        ThreadsMixin.pool.terminate()
        ProcessesMixin.pool.terminate()
        ManagerMixin.pool.terminate()
        ManagerMixin.manager.shutdown()

        del ProcessesMixin.pool, ThreadsMixin.pool, ManagerMixin.pool
Benjamin Petersone711caf2008-06-11 16:44:04 +00002046
def main():
    """Run the full suite through a verbose ``TextTestRunner``."""
    runner = unittest.TextTestRunner(verbosity=2)
    test_main(runner.run)


# Run the suite when this file is executed as a script.
if __name__ == '__main__':
    main()