blob: 71571d760ff5737322c62f0bff122f2f91d1a488 [file] [log] [blame]
Benjamin Peterson90f5ba52010-03-11 22:53:45 +00001#!/usr/bin/env python3
Georg Brandl86b2fb92008-07-16 03:43:04 +00002
Benjamin Petersone711caf2008-06-11 16:44:04 +00003#
4# Unit tests for the multiprocessing package
5#
6
7import unittest
Benjamin Petersone711caf2008-06-11 16:44:04 +00008import queue as pyqueue
9import time
Alexandre Vassalottic57a84f2009-07-17 12:07:01 +000010import io
Benjamin Petersone711caf2008-06-11 16:44:04 +000011import sys
12import os
13import gc
14import signal
15import array
Benjamin Petersone711caf2008-06-11 16:44:04 +000016import socket
17import random
18import logging
R. David Murraya21e4ca2009-03-31 23:16:50 +000019import test.support
Benjamin Petersone711caf2008-06-11 16:44:04 +000020
Benjamin Petersone5384b02008-10-04 22:00:42 +000021
R. David Murraya21e4ca2009-03-31 23:16:50 +000022# Skip tests if _multiprocessing wasn't built.
23_multiprocessing = test.support.import_module('_multiprocessing')
24# Skip tests if sem_open implementation is broken.
25test.support.import_module('multiprocessing.synchronize')
Victor Stinner45df8202010-04-28 22:31:17 +000026# import threading after _multiprocessing to raise a more revelant error
27# message: "No module named _multiprocessing". _multiprocessing is not compiled
28# without thread support.
29import threading
Benjamin Petersone5384b02008-10-04 22:00:42 +000030
Benjamin Petersone711caf2008-06-11 16:44:04 +000031import multiprocessing.dummy
32import multiprocessing.connection
33import multiprocessing.managers
34import multiprocessing.heap
Benjamin Petersone711caf2008-06-11 16:44:04 +000035import multiprocessing.pool
Benjamin Petersone711caf2008-06-11 16:44:04 +000036
37from multiprocessing import util
38
Brian Curtinafa88b52010-10-07 01:12:19 +000039try:
40 from multiprocessing.sharedctypes import Value, copy
41 HAS_SHAREDCTYPES = True
42except ImportError:
43 HAS_SHAREDCTYPES = False
44
Benjamin Petersone711caf2008-06-11 16:44:04 +000045#
46#
47#
48
Benjamin Peterson2bc91df2008-07-13 18:45:30 +000049def latin(s):
50 return s.encode('latin')
Benjamin Petersone711caf2008-06-11 16:44:04 +000051
Benjamin Petersone711caf2008-06-11 16:44:04 +000052#
53# Constants
54#
55
56LOG_LEVEL = util.SUBWARNING
Jesse Noller1f0b6582010-01-27 03:36:01 +000057#LOG_LEVEL = logging.DEBUG
Benjamin Petersone711caf2008-06-11 16:44:04 +000058
59DELTA = 0.1
60CHECK_TIMINGS = False # making true makes tests take a lot longer
61 # and can sometimes cause some non-serious
62 # failures because some calls block a bit
63 # longer than expected
64if CHECK_TIMINGS:
65 TIMEOUT1, TIMEOUT2, TIMEOUT3 = 0.82, 0.35, 1.4
66else:
67 TIMEOUT1, TIMEOUT2, TIMEOUT3 = 0.1, 0.1, 0.1
68
69HAVE_GETVALUE = not getattr(_multiprocessing,
70 'HAVE_BROKEN_SEM_GETVALUE', False)
71
Jesse Noller6214edd2009-01-19 16:23:53 +000072WIN32 = (sys.platform == "win32")
73
Benjamin Petersone711caf2008-06-11 16:44:04 +000074#
Florent Xiclunafd1b0932010-03-28 00:25:02 +000075# Some tests require ctypes
76#
77
78try:
Florent Xiclunaaa171062010-08-14 15:56:42 +000079 from ctypes import Structure, c_int, c_double
Florent Xiclunafd1b0932010-03-28 00:25:02 +000080except ImportError:
81 Structure = object
82 c_int = c_double = None
83
84#
Benjamin Petersone711caf2008-06-11 16:44:04 +000085# Creates a wrapper for a function which records the time it takes to finish
86#
87
88class TimingWrapper(object):
89
90 def __init__(self, func):
91 self.func = func
92 self.elapsed = None
93
94 def __call__(self, *args, **kwds):
95 t = time.time()
96 try:
97 return self.func(*args, **kwds)
98 finally:
99 self.elapsed = time.time() - t
100
101#
102# Base class for test cases
103#
104
105class BaseTestCase(object):
106
107 ALLOWED_TYPES = ('processes', 'manager', 'threads')
108
109 def assertTimingAlmostEqual(self, a, b):
110 if CHECK_TIMINGS:
111 self.assertAlmostEqual(a, b, 1)
112
113 def assertReturnsIfImplemented(self, value, func, *args):
114 try:
115 res = func(*args)
116 except NotImplementedError:
117 pass
118 else:
119 return self.assertEqual(value, res)
120
121#
122# Return the value of a semaphore
123#
124
125def get_value(self):
126 try:
127 return self.get_value()
128 except AttributeError:
129 try:
130 return self._Semaphore__value
131 except AttributeError:
132 try:
133 return self._value
134 except AttributeError:
135 raise NotImplementedError
136
137#
138# Testcases
139#
140
141class _TestProcess(BaseTestCase):
142
143 ALLOWED_TYPES = ('processes', 'threads')
144
145 def test_current(self):
146 if self.TYPE == 'threads':
147 return
148
149 current = self.current_process()
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000150 authkey = current.authkey
Benjamin Petersone711caf2008-06-11 16:44:04 +0000151
152 self.assertTrue(current.is_alive())
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000153 self.assertTrue(not current.daemon)
Ezio Melottie9615932010-01-24 19:26:24 +0000154 self.assertIsInstance(authkey, bytes)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000155 self.assertTrue(len(authkey) > 0)
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000156 self.assertEqual(current.ident, os.getpid())
157 self.assertEqual(current.exitcode, None)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000158
159 def _test(self, q, *args, **kwds):
160 current = self.current_process()
161 q.put(args)
162 q.put(kwds)
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000163 q.put(current.name)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000164 if self.TYPE != 'threads':
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000165 q.put(bytes(current.authkey))
Benjamin Petersone711caf2008-06-11 16:44:04 +0000166 q.put(current.pid)
167
168 def test_process(self):
169 q = self.Queue(1)
170 e = self.Event()
171 args = (q, 1, 2)
172 kwargs = {'hello':23, 'bye':2.54}
173 name = 'SomeProcess'
174 p = self.Process(
175 target=self._test, args=args, kwargs=kwargs, name=name
176 )
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000177 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000178 current = self.current_process()
179
180 if self.TYPE != 'threads':
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000181 self.assertEquals(p.authkey, current.authkey)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000182 self.assertEquals(p.is_alive(), False)
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000183 self.assertEquals(p.daemon, True)
Benjamin Peterson577473f2010-01-19 00:09:57 +0000184 self.assertNotIn(p, self.active_children())
Benjamin Petersone711caf2008-06-11 16:44:04 +0000185 self.assertTrue(type(self.active_children()) is list)
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000186 self.assertEqual(p.exitcode, None)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000187
188 p.start()
189
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000190 self.assertEquals(p.exitcode, None)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000191 self.assertEquals(p.is_alive(), True)
Benjamin Peterson577473f2010-01-19 00:09:57 +0000192 self.assertIn(p, self.active_children())
Benjamin Petersone711caf2008-06-11 16:44:04 +0000193
194 self.assertEquals(q.get(), args[1:])
195 self.assertEquals(q.get(), kwargs)
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000196 self.assertEquals(q.get(), p.name)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000197 if self.TYPE != 'threads':
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000198 self.assertEquals(q.get(), current.authkey)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000199 self.assertEquals(q.get(), p.pid)
200
201 p.join()
202
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000203 self.assertEquals(p.exitcode, 0)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000204 self.assertEquals(p.is_alive(), False)
Benjamin Peterson577473f2010-01-19 00:09:57 +0000205 self.assertNotIn(p, self.active_children())
Benjamin Petersone711caf2008-06-11 16:44:04 +0000206
207 def _test_terminate(self):
208 time.sleep(1000)
209
210 def test_terminate(self):
211 if self.TYPE == 'threads':
212 return
213
214 p = self.Process(target=self._test_terminate)
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000215 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000216 p.start()
217
218 self.assertEqual(p.is_alive(), True)
Benjamin Peterson577473f2010-01-19 00:09:57 +0000219 self.assertIn(p, self.active_children())
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000220 self.assertEqual(p.exitcode, None)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000221
222 p.terminate()
223
224 join = TimingWrapper(p.join)
225 self.assertEqual(join(), None)
226 self.assertTimingAlmostEqual(join.elapsed, 0.0)
227
228 self.assertEqual(p.is_alive(), False)
Benjamin Peterson577473f2010-01-19 00:09:57 +0000229 self.assertNotIn(p, self.active_children())
Benjamin Petersone711caf2008-06-11 16:44:04 +0000230
231 p.join()
232
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000233 # XXX sometimes get p.exitcode == 0 on Windows ...
234 #self.assertEqual(p.exitcode, -signal.SIGTERM)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000235
236 def test_cpu_count(self):
237 try:
238 cpus = multiprocessing.cpu_count()
239 except NotImplementedError:
240 cpus = 1
241 self.assertTrue(type(cpus) is int)
242 self.assertTrue(cpus >= 1)
243
244 def test_active_children(self):
245 self.assertEqual(type(self.active_children()), list)
246
247 p = self.Process(target=time.sleep, args=(DELTA,))
Benjamin Peterson577473f2010-01-19 00:09:57 +0000248 self.assertNotIn(p, self.active_children())
Benjamin Petersone711caf2008-06-11 16:44:04 +0000249
250 p.start()
Benjamin Peterson577473f2010-01-19 00:09:57 +0000251 self.assertIn(p, self.active_children())
Benjamin Petersone711caf2008-06-11 16:44:04 +0000252
253 p.join()
Benjamin Peterson577473f2010-01-19 00:09:57 +0000254 self.assertNotIn(p, self.active_children())
Benjamin Petersone711caf2008-06-11 16:44:04 +0000255
256 def _test_recursion(self, wconn, id):
257 from multiprocessing import forking
258 wconn.send(id)
259 if len(id) < 2:
260 for i in range(2):
261 p = self.Process(
262 target=self._test_recursion, args=(wconn, id+[i])
263 )
264 p.start()
265 p.join()
266
267 def test_recursion(self):
268 rconn, wconn = self.Pipe(duplex=False)
269 self._test_recursion(wconn, [])
270
271 time.sleep(DELTA)
272 result = []
273 while rconn.poll():
274 result.append(rconn.recv())
275
276 expected = [
277 [],
278 [0],
279 [0, 0],
280 [0, 1],
281 [1],
282 [1, 0],
283 [1, 1]
284 ]
285 self.assertEqual(result, expected)
286
287#
288#
289#
290
291class _UpperCaser(multiprocessing.Process):
292
293 def __init__(self):
294 multiprocessing.Process.__init__(self)
295 self.child_conn, self.parent_conn = multiprocessing.Pipe()
296
297 def run(self):
298 self.parent_conn.close()
299 for s in iter(self.child_conn.recv, None):
300 self.child_conn.send(s.upper())
301 self.child_conn.close()
302
303 def submit(self, s):
304 assert type(s) is str
305 self.parent_conn.send(s)
306 return self.parent_conn.recv()
307
308 def stop(self):
309 self.parent_conn.send(None)
310 self.parent_conn.close()
311 self.child_conn.close()
312
313class _TestSubclassingProcess(BaseTestCase):
314
315 ALLOWED_TYPES = ('processes',)
316
317 def test_subclassing(self):
318 uppercaser = _UpperCaser()
319 uppercaser.start()
320 self.assertEqual(uppercaser.submit('hello'), 'HELLO')
321 self.assertEqual(uppercaser.submit('world'), 'WORLD')
322 uppercaser.stop()
323 uppercaser.join()
324
325#
326#
327#
328
329def queue_empty(q):
330 if hasattr(q, 'empty'):
331 return q.empty()
332 else:
333 return q.qsize() == 0
334
335def queue_full(q, maxsize):
336 if hasattr(q, 'full'):
337 return q.full()
338 else:
339 return q.qsize() == maxsize
340
341
342class _TestQueue(BaseTestCase):
343
344
345 def _test_put(self, queue, child_can_start, parent_can_continue):
346 child_can_start.wait()
347 for i in range(6):
348 queue.get()
349 parent_can_continue.set()
350
351 def test_put(self):
352 MAXSIZE = 6
353 queue = self.Queue(maxsize=MAXSIZE)
354 child_can_start = self.Event()
355 parent_can_continue = self.Event()
356
357 proc = self.Process(
358 target=self._test_put,
359 args=(queue, child_can_start, parent_can_continue)
360 )
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000361 proc.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000362 proc.start()
363
364 self.assertEqual(queue_empty(queue), True)
365 self.assertEqual(queue_full(queue, MAXSIZE), False)
366
367 queue.put(1)
368 queue.put(2, True)
369 queue.put(3, True, None)
370 queue.put(4, False)
371 queue.put(5, False, None)
372 queue.put_nowait(6)
373
374 # the values may be in buffer but not yet in pipe so sleep a bit
375 time.sleep(DELTA)
376
377 self.assertEqual(queue_empty(queue), False)
378 self.assertEqual(queue_full(queue, MAXSIZE), True)
379
380 put = TimingWrapper(queue.put)
381 put_nowait = TimingWrapper(queue.put_nowait)
382
383 self.assertRaises(pyqueue.Full, put, 7, False)
384 self.assertTimingAlmostEqual(put.elapsed, 0)
385
386 self.assertRaises(pyqueue.Full, put, 7, False, None)
387 self.assertTimingAlmostEqual(put.elapsed, 0)
388
389 self.assertRaises(pyqueue.Full, put_nowait, 7)
390 self.assertTimingAlmostEqual(put_nowait.elapsed, 0)
391
392 self.assertRaises(pyqueue.Full, put, 7, True, TIMEOUT1)
393 self.assertTimingAlmostEqual(put.elapsed, TIMEOUT1)
394
395 self.assertRaises(pyqueue.Full, put, 7, False, TIMEOUT2)
396 self.assertTimingAlmostEqual(put.elapsed, 0)
397
398 self.assertRaises(pyqueue.Full, put, 7, True, timeout=TIMEOUT3)
399 self.assertTimingAlmostEqual(put.elapsed, TIMEOUT3)
400
401 child_can_start.set()
402 parent_can_continue.wait()
403
404 self.assertEqual(queue_empty(queue), True)
405 self.assertEqual(queue_full(queue, MAXSIZE), False)
406
407 proc.join()
408
409 def _test_get(self, queue, child_can_start, parent_can_continue):
410 child_can_start.wait()
Benjamin Petersonb29cbbc2008-06-16 20:57:14 +0000411 #queue.put(1)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000412 queue.put(2)
413 queue.put(3)
414 queue.put(4)
415 queue.put(5)
416 parent_can_continue.set()
417
418 def test_get(self):
419 queue = self.Queue()
420 child_can_start = self.Event()
421 parent_can_continue = self.Event()
422
423 proc = self.Process(
424 target=self._test_get,
425 args=(queue, child_can_start, parent_can_continue)
426 )
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000427 proc.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000428 proc.start()
429
430 self.assertEqual(queue_empty(queue), True)
431
432 child_can_start.set()
433 parent_can_continue.wait()
434
435 time.sleep(DELTA)
436 self.assertEqual(queue_empty(queue), False)
437
Benjamin Petersonb29cbbc2008-06-16 20:57:14 +0000438 # Hangs unexpectedly, remove for now
439 #self.assertEqual(queue.get(), 1)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000440 self.assertEqual(queue.get(True, None), 2)
441 self.assertEqual(queue.get(True), 3)
442 self.assertEqual(queue.get(timeout=1), 4)
443 self.assertEqual(queue.get_nowait(), 5)
444
445 self.assertEqual(queue_empty(queue), True)
446
447 get = TimingWrapper(queue.get)
448 get_nowait = TimingWrapper(queue.get_nowait)
449
450 self.assertRaises(pyqueue.Empty, get, False)
451 self.assertTimingAlmostEqual(get.elapsed, 0)
452
453 self.assertRaises(pyqueue.Empty, get, False, None)
454 self.assertTimingAlmostEqual(get.elapsed, 0)
455
456 self.assertRaises(pyqueue.Empty, get_nowait)
457 self.assertTimingAlmostEqual(get_nowait.elapsed, 0)
458
459 self.assertRaises(pyqueue.Empty, get, True, TIMEOUT1)
460 self.assertTimingAlmostEqual(get.elapsed, TIMEOUT1)
461
462 self.assertRaises(pyqueue.Empty, get, False, TIMEOUT2)
463 self.assertTimingAlmostEqual(get.elapsed, 0)
464
465 self.assertRaises(pyqueue.Empty, get, timeout=TIMEOUT3)
466 self.assertTimingAlmostEqual(get.elapsed, TIMEOUT3)
467
468 proc.join()
469
470 def _test_fork(self, queue):
471 for i in range(10, 20):
472 queue.put(i)
473 # note that at this point the items may only be buffered, so the
474 # process cannot shutdown until the feeder thread has finished
475 # pushing items onto the pipe.
476
477 def test_fork(self):
478 # Old versions of Queue would fail to create a new feeder
479 # thread for a forked process if the original process had its
480 # own feeder thread. This test checks that this no longer
481 # happens.
482
483 queue = self.Queue()
484
485 # put items on queue so that main process starts a feeder thread
486 for i in range(10):
487 queue.put(i)
488
489 # wait to make sure thread starts before we fork a new process
490 time.sleep(DELTA)
491
492 # fork process
493 p = self.Process(target=self._test_fork, args=(queue,))
494 p.start()
495
496 # check that all expected items are in the queue
497 for i in range(20):
498 self.assertEqual(queue.get(), i)
499 self.assertRaises(pyqueue.Empty, queue.get, False)
500
501 p.join()
502
503 def test_qsize(self):
504 q = self.Queue()
505 try:
506 self.assertEqual(q.qsize(), 0)
507 except NotImplementedError:
508 return
509 q.put(1)
510 self.assertEqual(q.qsize(), 1)
511 q.put(5)
512 self.assertEqual(q.qsize(), 2)
513 q.get()
514 self.assertEqual(q.qsize(), 1)
515 q.get()
516 self.assertEqual(q.qsize(), 0)
517
518 def _test_task_done(self, q):
519 for obj in iter(q.get, None):
520 time.sleep(DELTA)
521 q.task_done()
522
523 def test_task_done(self):
524 queue = self.JoinableQueue()
525
526 if sys.version_info < (2, 5) and not hasattr(queue, 'task_done'):
Florent Xiclunafd1b0932010-03-28 00:25:02 +0000527 self.skipTest("requires 'queue.task_done()' method")
Benjamin Petersone711caf2008-06-11 16:44:04 +0000528
529 workers = [self.Process(target=self._test_task_done, args=(queue,))
530 for i in range(4)]
531
532 for p in workers:
533 p.start()
534
535 for i in range(10):
536 queue.put(i)
537
538 queue.join()
539
540 for p in workers:
541 queue.put(None)
542
543 for p in workers:
544 p.join()
545
546#
547#
548#
549
550class _TestLock(BaseTestCase):
551
552 def test_lock(self):
553 lock = self.Lock()
554 self.assertEqual(lock.acquire(), True)
555 self.assertEqual(lock.acquire(False), False)
556 self.assertEqual(lock.release(), None)
557 self.assertRaises((ValueError, threading.ThreadError), lock.release)
558
559 def test_rlock(self):
560 lock = self.RLock()
561 self.assertEqual(lock.acquire(), True)
562 self.assertEqual(lock.acquire(), True)
563 self.assertEqual(lock.acquire(), True)
564 self.assertEqual(lock.release(), None)
565 self.assertEqual(lock.release(), None)
566 self.assertEqual(lock.release(), None)
567 self.assertRaises((AssertionError, RuntimeError), lock.release)
568
Jesse Nollerf8d00852009-03-31 03:25:07 +0000569 def test_lock_context(self):
570 with self.Lock():
571 pass
572
Benjamin Petersone711caf2008-06-11 16:44:04 +0000573
574class _TestSemaphore(BaseTestCase):
575
576 def _test_semaphore(self, sem):
577 self.assertReturnsIfImplemented(2, get_value, sem)
578 self.assertEqual(sem.acquire(), True)
579 self.assertReturnsIfImplemented(1, get_value, sem)
580 self.assertEqual(sem.acquire(), True)
581 self.assertReturnsIfImplemented(0, get_value, sem)
582 self.assertEqual(sem.acquire(False), False)
583 self.assertReturnsIfImplemented(0, get_value, sem)
584 self.assertEqual(sem.release(), None)
585 self.assertReturnsIfImplemented(1, get_value, sem)
586 self.assertEqual(sem.release(), None)
587 self.assertReturnsIfImplemented(2, get_value, sem)
588
589 def test_semaphore(self):
590 sem = self.Semaphore(2)
591 self._test_semaphore(sem)
592 self.assertEqual(sem.release(), None)
593 self.assertReturnsIfImplemented(3, get_value, sem)
594 self.assertEqual(sem.release(), None)
595 self.assertReturnsIfImplemented(4, get_value, sem)
596
597 def test_bounded_semaphore(self):
598 sem = self.BoundedSemaphore(2)
599 self._test_semaphore(sem)
600 # Currently fails on OS/X
601 #if HAVE_GETVALUE:
602 # self.assertRaises(ValueError, sem.release)
603 # self.assertReturnsIfImplemented(2, get_value, sem)
604
605 def test_timeout(self):
606 if self.TYPE != 'processes':
607 return
608
609 sem = self.Semaphore(0)
610 acquire = TimingWrapper(sem.acquire)
611
612 self.assertEqual(acquire(False), False)
613 self.assertTimingAlmostEqual(acquire.elapsed, 0.0)
614
615 self.assertEqual(acquire(False, None), False)
616 self.assertTimingAlmostEqual(acquire.elapsed, 0.0)
617
618 self.assertEqual(acquire(False, TIMEOUT1), False)
619 self.assertTimingAlmostEqual(acquire.elapsed, 0)
620
621 self.assertEqual(acquire(True, TIMEOUT2), False)
622 self.assertTimingAlmostEqual(acquire.elapsed, TIMEOUT2)
623
624 self.assertEqual(acquire(timeout=TIMEOUT3), False)
625 self.assertTimingAlmostEqual(acquire.elapsed, TIMEOUT3)
626
627
628class _TestCondition(BaseTestCase):
629
630 def f(self, cond, sleeping, woken, timeout=None):
631 cond.acquire()
632 sleeping.release()
633 cond.wait(timeout)
634 woken.release()
635 cond.release()
636
637 def check_invariant(self, cond):
638 # this is only supposed to succeed when there are no sleepers
639 if self.TYPE == 'processes':
640 try:
641 sleepers = (cond._sleeping_count.get_value() -
642 cond._woken_count.get_value())
643 self.assertEqual(sleepers, 0)
644 self.assertEqual(cond._wait_semaphore.get_value(), 0)
645 except NotImplementedError:
646 pass
647
648 def test_notify(self):
649 cond = self.Condition()
650 sleeping = self.Semaphore(0)
651 woken = self.Semaphore(0)
652
653 p = self.Process(target=self.f, args=(cond, sleeping, woken))
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000654 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000655 p.start()
656
657 p = threading.Thread(target=self.f, args=(cond, sleeping, woken))
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000658 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000659 p.start()
660
661 # wait for both children to start sleeping
662 sleeping.acquire()
663 sleeping.acquire()
664
665 # check no process/thread has woken up
666 time.sleep(DELTA)
667 self.assertReturnsIfImplemented(0, get_value, woken)
668
669 # wake up one process/thread
670 cond.acquire()
671 cond.notify()
672 cond.release()
673
674 # check one process/thread has woken up
675 time.sleep(DELTA)
676 self.assertReturnsIfImplemented(1, get_value, woken)
677
678 # wake up another
679 cond.acquire()
680 cond.notify()
681 cond.release()
682
683 # check other has woken up
684 time.sleep(DELTA)
685 self.assertReturnsIfImplemented(2, get_value, woken)
686
687 # check state is not mucked up
688 self.check_invariant(cond)
689 p.join()
690
691 def test_notify_all(self):
692 cond = self.Condition()
693 sleeping = self.Semaphore(0)
694 woken = self.Semaphore(0)
695
696 # start some threads/processes which will timeout
697 for i in range(3):
698 p = self.Process(target=self.f,
699 args=(cond, sleeping, woken, TIMEOUT1))
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000700 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000701 p.start()
702
703 t = threading.Thread(target=self.f,
704 args=(cond, sleeping, woken, TIMEOUT1))
Benjamin Peterson72753702008-08-18 18:09:21 +0000705 t.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000706 t.start()
707
708 # wait for them all to sleep
709 for i in range(6):
710 sleeping.acquire()
711
712 # check they have all timed out
713 for i in range(6):
714 woken.acquire()
715 self.assertReturnsIfImplemented(0, get_value, woken)
716
717 # check state is not mucked up
718 self.check_invariant(cond)
719
720 # start some more threads/processes
721 for i in range(3):
722 p = self.Process(target=self.f, args=(cond, sleeping, woken))
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000723 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000724 p.start()
725
726 t = threading.Thread(target=self.f, args=(cond, sleeping, woken))
Benjamin Peterson72753702008-08-18 18:09:21 +0000727 t.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000728 t.start()
729
730 # wait for them to all sleep
731 for i in range(6):
732 sleeping.acquire()
733
734 # check no process/thread has woken up
735 time.sleep(DELTA)
736 self.assertReturnsIfImplemented(0, get_value, woken)
737
738 # wake them all up
739 cond.acquire()
740 cond.notify_all()
741 cond.release()
742
743 # check they have all woken
744 time.sleep(DELTA)
745 self.assertReturnsIfImplemented(6, get_value, woken)
746
747 # check state is not mucked up
748 self.check_invariant(cond)
749
750 def test_timeout(self):
751 cond = self.Condition()
752 wait = TimingWrapper(cond.wait)
753 cond.acquire()
754 res = wait(TIMEOUT1)
755 cond.release()
Georg Brandl65ffae02010-10-28 09:24:56 +0000756 self.assertEqual(res, False)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000757 self.assertTimingAlmostEqual(wait.elapsed, TIMEOUT1)
758
759
760class _TestEvent(BaseTestCase):
761
762 def _test_event(self, event):
763 time.sleep(TIMEOUT2)
764 event.set()
765
766 def test_event(self):
767 event = self.Event()
768 wait = TimingWrapper(event.wait)
769
770 # Removed temporaily, due to API shear, this does not
771 # work with threading._Event objects. is_set == isSet
Benjamin Peterson965ce872009-04-05 21:24:58 +0000772 self.assertEqual(event.is_set(), False)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000773
Benjamin Peterson965ce872009-04-05 21:24:58 +0000774 # Removed, threading.Event.wait() will return the value of the __flag
775 # instead of None. API Shear with the semaphore backed mp.Event
776 self.assertEqual(wait(0.0), False)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000777 self.assertTimingAlmostEqual(wait.elapsed, 0.0)
Benjamin Peterson965ce872009-04-05 21:24:58 +0000778 self.assertEqual(wait(TIMEOUT1), False)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000779 self.assertTimingAlmostEqual(wait.elapsed, TIMEOUT1)
780
781 event.set()
782
783 # See note above on the API differences
Benjamin Peterson965ce872009-04-05 21:24:58 +0000784 self.assertEqual(event.is_set(), True)
785 self.assertEqual(wait(), True)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000786 self.assertTimingAlmostEqual(wait.elapsed, 0.0)
Benjamin Peterson965ce872009-04-05 21:24:58 +0000787 self.assertEqual(wait(TIMEOUT1), True)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000788 self.assertTimingAlmostEqual(wait.elapsed, 0.0)
789 # self.assertEqual(event.is_set(), True)
790
791 event.clear()
792
793 #self.assertEqual(event.is_set(), False)
794
795 self.Process(target=self._test_event, args=(event,)).start()
Benjamin Peterson965ce872009-04-05 21:24:58 +0000796 self.assertEqual(wait(), True)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000797
798#
799#
800#
801
Brian Curtinafa88b52010-10-07 01:12:19 +0000802@unittest.skipUnless(HAS_SHAREDCTYPES,
803 "requires multiprocessing.sharedctypes")
Benjamin Petersone711caf2008-06-11 16:44:04 +0000804class _TestValue(BaseTestCase):
805
Florent Xiclunafd1b0932010-03-28 00:25:02 +0000806 ALLOWED_TYPES = ('processes',)
807
Benjamin Petersone711caf2008-06-11 16:44:04 +0000808 codes_values = [
809 ('i', 4343, 24234),
810 ('d', 3.625, -4.25),
811 ('h', -232, 234),
812 ('c', latin('x'), latin('y'))
813 ]
814
815 def _test(self, values):
816 for sv, cv in zip(values, self.codes_values):
817 sv.value = cv[2]
818
819
820 def test_value(self, raw=False):
Benjamin Petersone711caf2008-06-11 16:44:04 +0000821 if raw:
822 values = [self.RawValue(code, value)
823 for code, value, _ in self.codes_values]
824 else:
825 values = [self.Value(code, value)
826 for code, value, _ in self.codes_values]
827
828 for sv, cv in zip(values, self.codes_values):
829 self.assertEqual(sv.value, cv[1])
830
831 proc = self.Process(target=self._test, args=(values,))
832 proc.start()
833 proc.join()
834
835 for sv, cv in zip(values, self.codes_values):
836 self.assertEqual(sv.value, cv[2])
837
838 def test_rawvalue(self):
839 self.test_value(raw=True)
840
841 def test_getobj_getlock(self):
Benjamin Petersone711caf2008-06-11 16:44:04 +0000842 val1 = self.Value('i', 5)
843 lock1 = val1.get_lock()
844 obj1 = val1.get_obj()
845
846 val2 = self.Value('i', 5, lock=None)
847 lock2 = val2.get_lock()
848 obj2 = val2.get_obj()
849
850 lock = self.Lock()
851 val3 = self.Value('i', 5, lock=lock)
852 lock3 = val3.get_lock()
853 obj3 = val3.get_obj()
854 self.assertEqual(lock, lock3)
855
Jesse Nollerb0516a62009-01-18 03:11:38 +0000856 arr4 = self.Value('i', 5, lock=False)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000857 self.assertFalse(hasattr(arr4, 'get_lock'))
858 self.assertFalse(hasattr(arr4, 'get_obj'))
859
Jesse Nollerb0516a62009-01-18 03:11:38 +0000860 self.assertRaises(AttributeError, self.Value, 'i', 5, lock='navalue')
861
862 arr5 = self.RawValue('i', 5)
863 self.assertFalse(hasattr(arr5, 'get_lock'))
864 self.assertFalse(hasattr(arr5, 'get_obj'))
865
Benjamin Petersone711caf2008-06-11 16:44:04 +0000866
867class _TestArray(BaseTestCase):
868
Florent Xiclunafd1b0932010-03-28 00:25:02 +0000869 ALLOWED_TYPES = ('processes',)
870
Benjamin Petersone711caf2008-06-11 16:44:04 +0000871 def f(self, seq):
872 for i in range(1, len(seq)):
873 seq[i] += seq[i-1]
874
Florent Xiclunafd1b0932010-03-28 00:25:02 +0000875 @unittest.skipIf(c_int is None, "requires _ctypes")
Benjamin Petersone711caf2008-06-11 16:44:04 +0000876 def test_array(self, raw=False):
Benjamin Petersone711caf2008-06-11 16:44:04 +0000877 seq = [680, 626, 934, 821, 150, 233, 548, 982, 714, 831]
878 if raw:
879 arr = self.RawArray('i', seq)
880 else:
881 arr = self.Array('i', seq)
882
883 self.assertEqual(len(arr), len(seq))
884 self.assertEqual(arr[3], seq[3])
885 self.assertEqual(list(arr[2:7]), list(seq[2:7]))
886
887 arr[4:8] = seq[4:8] = array.array('i', [1, 2, 3, 4])
888
889 self.assertEqual(list(arr[:]), seq)
890
891 self.f(seq)
892
893 p = self.Process(target=self.f, args=(arr,))
894 p.start()
895 p.join()
896
897 self.assertEqual(list(arr[:]), seq)
898
Florent Xiclunafd1b0932010-03-28 00:25:02 +0000899 @unittest.skipIf(c_int is None, "requires _ctypes")
Benjamin Petersone711caf2008-06-11 16:44:04 +0000900 def test_rawarray(self):
901 self.test_array(raw=True)
902
Florent Xiclunafd1b0932010-03-28 00:25:02 +0000903 @unittest.skipIf(c_int is None, "requires _ctypes")
Benjamin Petersone711caf2008-06-11 16:44:04 +0000904 def test_getobj_getlock_obj(self):
Benjamin Petersone711caf2008-06-11 16:44:04 +0000905 arr1 = self.Array('i', list(range(10)))
906 lock1 = arr1.get_lock()
907 obj1 = arr1.get_obj()
908
909 arr2 = self.Array('i', list(range(10)), lock=None)
910 lock2 = arr2.get_lock()
911 obj2 = arr2.get_obj()
912
913 lock = self.Lock()
914 arr3 = self.Array('i', list(range(10)), lock=lock)
915 lock3 = arr3.get_lock()
916 obj3 = arr3.get_obj()
917 self.assertEqual(lock, lock3)
918
Jesse Nollerb0516a62009-01-18 03:11:38 +0000919 arr4 = self.Array('i', range(10), lock=False)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000920 self.assertFalse(hasattr(arr4, 'get_lock'))
921 self.assertFalse(hasattr(arr4, 'get_obj'))
Jesse Nollerb0516a62009-01-18 03:11:38 +0000922 self.assertRaises(AttributeError,
923 self.Array, 'i', range(10), lock='notalock')
924
925 arr5 = self.RawArray('i', range(10))
926 self.assertFalse(hasattr(arr5, 'get_lock'))
927 self.assertFalse(hasattr(arr5, 'get_obj'))
Benjamin Petersone711caf2008-06-11 16:44:04 +0000928
929#
930#
931#
932
933class _TestContainers(BaseTestCase):
934
935 ALLOWED_TYPES = ('manager',)
936
937 def test_list(self):
938 a = self.list(list(range(10)))
939 self.assertEqual(a[:], list(range(10)))
940
941 b = self.list()
942 self.assertEqual(b[:], [])
943
944 b.extend(list(range(5)))
945 self.assertEqual(b[:], list(range(5)))
946
947 self.assertEqual(b[2], 2)
948 self.assertEqual(b[2:10], [2,3,4])
949
950 b *= 2
951 self.assertEqual(b[:], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4])
952
953 self.assertEqual(b + [5, 6], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4, 5, 6])
954
955 self.assertEqual(a[:], list(range(10)))
956
957 d = [a, b]
958 e = self.list(d)
959 self.assertEqual(
960 e[:],
961 [[0, 1, 2, 3, 4, 5, 6, 7, 8, 9], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4]]
962 )
963
964 f = self.list([a])
965 a.append('hello')
966 self.assertEqual(f[:], [[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 'hello']])
967
968 def test_dict(self):
969 d = self.dict()
970 indices = list(range(65, 70))
971 for i in indices:
972 d[i] = chr(i)
973 self.assertEqual(d.copy(), dict((i, chr(i)) for i in indices))
974 self.assertEqual(sorted(d.keys()), indices)
975 self.assertEqual(sorted(d.values()), [chr(i) for i in indices])
976 self.assertEqual(sorted(d.items()), [(i, chr(i)) for i in indices])
977
978 def test_namespace(self):
979 n = self.Namespace()
980 n.name = 'Bob'
981 n.job = 'Builder'
982 n._hidden = 'hidden'
983 self.assertEqual((n.name, n.job), ('Bob', 'Builder'))
984 del n.job
985 self.assertEqual(str(n), "Namespace(name='Bob')")
986 self.assertTrue(hasattr(n, 'name'))
987 self.assertTrue(not hasattr(n, 'job'))
988
989#
990#
991#
992
993def sqr(x, wait=0.0):
994 time.sleep(wait)
995 return x*x
Benjamin Petersone711caf2008-06-11 16:44:04 +0000996class _TestPool(BaseTestCase):
997
998 def test_apply(self):
999 papply = self.pool.apply
1000 self.assertEqual(papply(sqr, (5,)), sqr(5))
1001 self.assertEqual(papply(sqr, (), {'x':3}), sqr(x=3))
1002
1003 def test_map(self):
1004 pmap = self.pool.map
1005 self.assertEqual(pmap(sqr, list(range(10))), list(map(sqr, list(range(10)))))
1006 self.assertEqual(pmap(sqr, list(range(100)), chunksize=20),
1007 list(map(sqr, list(range(100)))))
1008
Alexandre Vassalottie52e3782009-07-17 09:18:18 +00001009 def test_map_chunksize(self):
1010 try:
1011 self.pool.map_async(sqr, [], chunksize=1).get(timeout=TIMEOUT1)
1012 except multiprocessing.TimeoutError:
1013 self.fail("pool.map_async with chunksize stalled on null list")
1014
Benjamin Petersone711caf2008-06-11 16:44:04 +00001015 def test_async(self):
1016 res = self.pool.apply_async(sqr, (7, TIMEOUT1,))
1017 get = TimingWrapper(res.get)
1018 self.assertEqual(get(), 49)
1019 self.assertTimingAlmostEqual(get.elapsed, TIMEOUT1)
1020
1021 def test_async_timeout(self):
1022 res = self.pool.apply_async(sqr, (6, TIMEOUT2 + 0.2))
1023 get = TimingWrapper(res.get)
1024 self.assertRaises(multiprocessing.TimeoutError, get, timeout=TIMEOUT2)
1025 self.assertTimingAlmostEqual(get.elapsed, TIMEOUT2)
1026
1027 def test_imap(self):
1028 it = self.pool.imap(sqr, list(range(10)))
1029 self.assertEqual(list(it), list(map(sqr, list(range(10)))))
1030
1031 it = self.pool.imap(sqr, list(range(10)))
1032 for i in range(10):
1033 self.assertEqual(next(it), i*i)
1034 self.assertRaises(StopIteration, it.__next__)
1035
1036 it = self.pool.imap(sqr, list(range(1000)), chunksize=100)
1037 for i in range(1000):
1038 self.assertEqual(next(it), i*i)
1039 self.assertRaises(StopIteration, it.__next__)
1040
1041 def test_imap_unordered(self):
1042 it = self.pool.imap_unordered(sqr, list(range(1000)))
1043 self.assertEqual(sorted(it), list(map(sqr, list(range(1000)))))
1044
1045 it = self.pool.imap_unordered(sqr, list(range(1000)), chunksize=53)
1046 self.assertEqual(sorted(it), list(map(sqr, list(range(1000)))))
1047
1048 def test_make_pool(self):
1049 p = multiprocessing.Pool(3)
1050 self.assertEqual(3, len(p._pool))
1051 p.close()
1052 p.join()
1053
1054 def test_terminate(self):
1055 if self.TYPE == 'manager':
1056 # On Unix a forked process increfs each shared object to
1057 # which its parent process held a reference. If the
1058 # forked process gets terminated then there is likely to
1059 # be a reference leak. So to prevent
1060 # _TestZZZNumberOfObjects from failing we skip this test
1061 # when using a manager.
1062 return
1063
1064 result = self.pool.map_async(
1065 time.sleep, [0.1 for i in range(10000)], chunksize=1
1066 )
1067 self.pool.terminate()
1068 join = TimingWrapper(self.pool.join)
1069 join()
1070 self.assertTrue(join.elapsed < 0.2)
Jesse Noller1f0b6582010-01-27 03:36:01 +00001071
1072class _TestPoolWorkerLifetime(BaseTestCase):
1073
1074 ALLOWED_TYPES = ('processes', )
1075 def test_pool_worker_lifetime(self):
1076 p = multiprocessing.Pool(3, maxtasksperchild=10)
1077 self.assertEqual(3, len(p._pool))
1078 origworkerpids = [w.pid for w in p._pool]
1079 # Run many tasks so each worker gets replaced (hopefully)
1080 results = []
1081 for i in range(100):
1082 results.append(p.apply_async(sqr, (i, )))
1083 # Fetch the results and verify we got the right answers,
1084 # also ensuring all the tasks have completed.
1085 for (j, res) in enumerate(results):
1086 self.assertEqual(res.get(), sqr(j))
1087 # Refill the pool
1088 p._repopulate_pool()
Florent Xiclunafb190f62010-03-04 16:10:10 +00001089 # Wait until all workers are alive
1090 countdown = 5
1091 while countdown and not all(w.is_alive() for w in p._pool):
1092 countdown -= 1
1093 time.sleep(DELTA)
Jesse Noller1f0b6582010-01-27 03:36:01 +00001094 finalworkerpids = [w.pid for w in p._pool]
Florent Xiclunafb190f62010-03-04 16:10:10 +00001095 # All pids should be assigned. See issue #7805.
1096 self.assertNotIn(None, origworkerpids)
1097 self.assertNotIn(None, finalworkerpids)
1098 # Finally, check that the worker pids have changed
Jesse Noller1f0b6582010-01-27 03:36:01 +00001099 self.assertNotEqual(sorted(origworkerpids), sorted(finalworkerpids))
1100 p.close()
1101 p.join()
1102
Benjamin Petersone711caf2008-06-11 16:44:04 +00001103#
1104# Test that manager has expected number of shared objects left
1105#
1106
1107class _TestZZZNumberOfObjects(BaseTestCase):
1108 # Because test cases are sorted alphabetically, this one will get
1109 # run after all the other tests for the manager. It tests that
1110 # there have been no "reference leaks" for the manager's shared
1111 # objects. Note the comment in _TestPool.test_terminate().
1112 ALLOWED_TYPES = ('manager',)
1113
1114 def test_number_of_objects(self):
1115 EXPECTED_NUMBER = 1 # the pool object is still alive
1116 multiprocessing.active_children() # discard dead process objs
1117 gc.collect() # do garbage collection
1118 refs = self.manager._number_of_objects()
Jesse Noller63b3a972009-01-21 02:15:48 +00001119 debug_info = self.manager._debug_info()
Benjamin Petersone711caf2008-06-11 16:44:04 +00001120 if refs != EXPECTED_NUMBER:
Georg Brandl3dbca812008-07-23 16:10:53 +00001121 print(self.manager._debug_info())
Jesse Noller63b3a972009-01-21 02:15:48 +00001122 print(debug_info)
Benjamin Petersone711caf2008-06-11 16:44:04 +00001123
1124 self.assertEqual(refs, EXPECTED_NUMBER)
1125
1126#
1127# Test of creating a customized manager class
1128#
1129
1130from multiprocessing.managers import BaseManager, BaseProxy, RemoteError
1131
1132class FooBar(object):
1133 def f(self):
1134 return 'f()'
1135 def g(self):
1136 raise ValueError
1137 def _h(self):
1138 return '_h()'
1139
1140def baz():
1141 for i in range(10):
1142 yield i*i
1143
1144class IteratorProxy(BaseProxy):
Florent Xiclunaaa171062010-08-14 15:56:42 +00001145 _exposed_ = ('__next__',)
Benjamin Petersone711caf2008-06-11 16:44:04 +00001146 def __iter__(self):
1147 return self
1148 def __next__(self):
Benjamin Petersone711caf2008-06-11 16:44:04 +00001149 return self._callmethod('__next__')
1150
1151class MyManager(BaseManager):
1152 pass
1153
1154MyManager.register('Foo', callable=FooBar)
1155MyManager.register('Bar', callable=FooBar, exposed=('f', '_h'))
1156MyManager.register('baz', callable=baz, proxytype=IteratorProxy)
1157
1158
1159class _TestMyManager(BaseTestCase):
1160
1161 ALLOWED_TYPES = ('manager',)
1162
1163 def test_mymanager(self):
1164 manager = MyManager()
1165 manager.start()
1166
1167 foo = manager.Foo()
1168 bar = manager.Bar()
1169 baz = manager.baz()
1170
1171 foo_methods = [name for name in ('f', 'g', '_h') if hasattr(foo, name)]
1172 bar_methods = [name for name in ('f', 'g', '_h') if hasattr(bar, name)]
1173
1174 self.assertEqual(foo_methods, ['f', 'g'])
1175 self.assertEqual(bar_methods, ['f', '_h'])
1176
1177 self.assertEqual(foo.f(), 'f()')
1178 self.assertRaises(ValueError, foo.g)
1179 self.assertEqual(foo._callmethod('f'), 'f()')
1180 self.assertRaises(RemoteError, foo._callmethod, '_h')
1181
1182 self.assertEqual(bar.f(), 'f()')
1183 self.assertEqual(bar._h(), '_h()')
1184 self.assertEqual(bar._callmethod('f'), 'f()')
1185 self.assertEqual(bar._callmethod('_h'), '_h()')
1186
1187 self.assertEqual(list(baz), [i*i for i in range(10)])
1188
1189 manager.shutdown()
1190
1191#
1192# Test of connecting to a remote server and using xmlrpclib for serialization
1193#
1194
1195_queue = pyqueue.Queue()
1196def get_queue():
1197 return _queue
1198
1199class QueueManager(BaseManager):
1200 '''manager class used by server process'''
1201QueueManager.register('get_queue', callable=get_queue)
1202
1203class QueueManager2(BaseManager):
1204 '''manager class which specifies the same interface as QueueManager'''
1205QueueManager2.register('get_queue')
1206
1207
1208SERIALIZER = 'xmlrpclib'
1209
1210class _TestRemoteManager(BaseTestCase):
1211
1212 ALLOWED_TYPES = ('manager',)
1213
1214 def _putter(self, address, authkey):
1215 manager = QueueManager2(
1216 address=address, authkey=authkey, serializer=SERIALIZER
1217 )
1218 manager.connect()
1219 queue = manager.get_queue()
1220 queue.put(('hello world', None, True, 2.25))
1221
1222 def test_remote(self):
1223 authkey = os.urandom(32)
1224
1225 manager = QueueManager(
1226 address=('localhost', 0), authkey=authkey, serializer=SERIALIZER
1227 )
1228 manager.start()
1229
1230 p = self.Process(target=self._putter, args=(manager.address, authkey))
1231 p.start()
1232
1233 manager2 = QueueManager2(
1234 address=manager.address, authkey=authkey, serializer=SERIALIZER
1235 )
1236 manager2.connect()
1237 queue = manager2.get_queue()
1238
1239 # Note that xmlrpclib will deserialize object as a list not a tuple
1240 self.assertEqual(queue.get(), ['hello world', None, True, 2.25])
1241
1242 # Because we are using xmlrpclib for serialization instead of
1243 # pickle this will cause a serialization error.
1244 self.assertRaises(Exception, queue.put, time.sleep)
1245
1246 # Make queue finalizer run before the server is stopped
1247 del queue
1248 manager.shutdown()
1249
Jesse Nollerc5d28a02009-03-30 16:37:36 +00001250class _TestManagerRestart(BaseTestCase):
1251
1252 def _putter(self, address, authkey):
1253 manager = QueueManager(
1254 address=address, authkey=authkey, serializer=SERIALIZER)
1255 manager.connect()
1256 queue = manager.get_queue()
1257 queue.put('hello world')
1258
1259 def test_rapid_restart(self):
1260 authkey = os.urandom(32)
1261 manager = QueueManager(
Antoine Pitrou043bad02010-04-30 23:20:15 +00001262 address=('localhost', 0), authkey=authkey, serializer=SERIALIZER)
1263 addr = manager.get_server().address
Jesse Nollerc5d28a02009-03-30 16:37:36 +00001264 manager.start()
1265
1266 p = self.Process(target=self._putter, args=(manager.address, authkey))
1267 p.start()
1268 queue = manager.get_queue()
1269 self.assertEqual(queue.get(), 'hello world')
Jesse Noller35d1f002009-03-30 22:59:27 +00001270 del queue
Jesse Nollerc5d28a02009-03-30 16:37:36 +00001271 manager.shutdown()
1272 manager = QueueManager(
Antoine Pitrou043bad02010-04-30 23:20:15 +00001273 address=addr, authkey=authkey, serializer=SERIALIZER)
Jesse Nollerc5d28a02009-03-30 16:37:36 +00001274 manager.start()
Jesse Noller35d1f002009-03-30 22:59:27 +00001275 manager.shutdown()
Jesse Nollerc5d28a02009-03-30 16:37:36 +00001276
Benjamin Petersone711caf2008-06-11 16:44:04 +00001277#
1278#
1279#
1280
1281SENTINEL = latin('')
1282
1283class _TestConnection(BaseTestCase):
1284
1285 ALLOWED_TYPES = ('processes', 'threads')
1286
1287 def _echo(self, conn):
1288 for msg in iter(conn.recv_bytes, SENTINEL):
1289 conn.send_bytes(msg)
1290 conn.close()
1291
1292 def test_connection(self):
1293 conn, child_conn = self.Pipe()
1294
1295 p = self.Process(target=self._echo, args=(child_conn,))
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +00001296 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +00001297 p.start()
1298
1299 seq = [1, 2.25, None]
1300 msg = latin('hello world')
1301 longmsg = msg * 10
1302 arr = array.array('i', list(range(4)))
1303
1304 if self.TYPE == 'processes':
1305 self.assertEqual(type(conn.fileno()), int)
1306
1307 self.assertEqual(conn.send(seq), None)
1308 self.assertEqual(conn.recv(), seq)
1309
1310 self.assertEqual(conn.send_bytes(msg), None)
1311 self.assertEqual(conn.recv_bytes(), msg)
1312
1313 if self.TYPE == 'processes':
1314 buffer = array.array('i', [0]*10)
1315 expected = list(arr) + [0] * (10 - len(arr))
1316 self.assertEqual(conn.send_bytes(arr), None)
1317 self.assertEqual(conn.recv_bytes_into(buffer),
1318 len(arr) * buffer.itemsize)
1319 self.assertEqual(list(buffer), expected)
1320
1321 buffer = array.array('i', [0]*10)
1322 expected = [0] * 3 + list(arr) + [0] * (10 - 3 - len(arr))
1323 self.assertEqual(conn.send_bytes(arr), None)
1324 self.assertEqual(conn.recv_bytes_into(buffer, 3 * buffer.itemsize),
1325 len(arr) * buffer.itemsize)
1326 self.assertEqual(list(buffer), expected)
1327
1328 buffer = bytearray(latin(' ' * 40))
1329 self.assertEqual(conn.send_bytes(longmsg), None)
1330 try:
1331 res = conn.recv_bytes_into(buffer)
1332 except multiprocessing.BufferTooShort as e:
1333 self.assertEqual(e.args, (longmsg,))
1334 else:
1335 self.fail('expected BufferTooShort, got %s' % res)
1336
1337 poll = TimingWrapper(conn.poll)
1338
1339 self.assertEqual(poll(), False)
1340 self.assertTimingAlmostEqual(poll.elapsed, 0)
1341
1342 self.assertEqual(poll(TIMEOUT1), False)
1343 self.assertTimingAlmostEqual(poll.elapsed, TIMEOUT1)
1344
1345 conn.send(None)
1346
1347 self.assertEqual(poll(TIMEOUT1), True)
1348 self.assertTimingAlmostEqual(poll.elapsed, 0)
1349
1350 self.assertEqual(conn.recv(), None)
1351
1352 really_big_msg = latin('X') * (1024 * 1024 * 16) # 16Mb
1353 conn.send_bytes(really_big_msg)
1354 self.assertEqual(conn.recv_bytes(), really_big_msg)
1355
1356 conn.send_bytes(SENTINEL) # tell child to quit
1357 child_conn.close()
1358
1359 if self.TYPE == 'processes':
1360 self.assertEqual(conn.readable, True)
1361 self.assertEqual(conn.writable, True)
1362 self.assertRaises(EOFError, conn.recv)
1363 self.assertRaises(EOFError, conn.recv_bytes)
1364
1365 p.join()
1366
1367 def test_duplex_false(self):
1368 reader, writer = self.Pipe(duplex=False)
1369 self.assertEqual(writer.send(1), None)
1370 self.assertEqual(reader.recv(), 1)
1371 if self.TYPE == 'processes':
1372 self.assertEqual(reader.readable, True)
1373 self.assertEqual(reader.writable, False)
1374 self.assertEqual(writer.readable, False)
1375 self.assertEqual(writer.writable, True)
1376 self.assertRaises(IOError, reader.send, 2)
1377 self.assertRaises(IOError, writer.recv)
1378 self.assertRaises(IOError, writer.poll)
1379
1380 def test_spawn_close(self):
1381 # We test that a pipe connection can be closed by parent
1382 # process immediately after child is spawned. On Windows this
1383 # would have sometimes failed on old versions because
1384 # child_conn would be closed before the child got a chance to
1385 # duplicate it.
1386 conn, child_conn = self.Pipe()
1387
1388 p = self.Process(target=self._echo, args=(child_conn,))
1389 p.start()
1390 child_conn.close() # this might complete before child initializes
1391
1392 msg = latin('hello')
1393 conn.send_bytes(msg)
1394 self.assertEqual(conn.recv_bytes(), msg)
1395
1396 conn.send_bytes(SENTINEL)
1397 conn.close()
1398 p.join()
1399
1400 def test_sendbytes(self):
1401 if self.TYPE != 'processes':
1402 return
1403
1404 msg = latin('abcdefghijklmnopqrstuvwxyz')
1405 a, b = self.Pipe()
1406
1407 a.send_bytes(msg)
1408 self.assertEqual(b.recv_bytes(), msg)
1409
1410 a.send_bytes(msg, 5)
1411 self.assertEqual(b.recv_bytes(), msg[5:])
1412
1413 a.send_bytes(msg, 7, 8)
1414 self.assertEqual(b.recv_bytes(), msg[7:7+8])
1415
1416 a.send_bytes(msg, 26)
1417 self.assertEqual(b.recv_bytes(), latin(''))
1418
1419 a.send_bytes(msg, 26, 0)
1420 self.assertEqual(b.recv_bytes(), latin(''))
1421
1422 self.assertRaises(ValueError, a.send_bytes, msg, 27)
1423
1424 self.assertRaises(ValueError, a.send_bytes, msg, 22, 5)
1425
1426 self.assertRaises(ValueError, a.send_bytes, msg, 26, 1)
1427
1428 self.assertRaises(ValueError, a.send_bytes, msg, -1)
1429
1430 self.assertRaises(ValueError, a.send_bytes, msg, 4, -1)
1431
Benjamin Petersone711caf2008-06-11 16:44:04 +00001432class _TestListenerClient(BaseTestCase):
1433
1434 ALLOWED_TYPES = ('processes', 'threads')
1435
1436 def _test(self, address):
1437 conn = self.connection.Client(address)
1438 conn.send('hello')
1439 conn.close()
1440
1441 def test_listener_client(self):
1442 for family in self.connection.families:
1443 l = self.connection.Listener(family=family)
1444 p = self.Process(target=self._test, args=(l.address,))
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +00001445 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +00001446 p.start()
1447 conn = l.accept()
1448 self.assertEqual(conn.recv(), 'hello')
1449 p.join()
1450 l.close()
Benjamin Petersone711caf2008-06-11 16:44:04 +00001451#
1452# Test of sending connection and socket objects between processes
1453#
Benjamin Petersonb29cbbc2008-06-16 20:57:14 +00001454"""
Benjamin Petersone711caf2008-06-11 16:44:04 +00001455class _TestPicklingConnections(BaseTestCase):
1456
1457 ALLOWED_TYPES = ('processes',)
1458
1459 def _listener(self, conn, families):
1460 for fam in families:
1461 l = self.connection.Listener(family=fam)
1462 conn.send(l.address)
1463 new_conn = l.accept()
1464 conn.send(new_conn)
1465
1466 if self.TYPE == 'processes':
1467 l = socket.socket()
1468 l.bind(('localhost', 0))
1469 conn.send(l.getsockname())
1470 l.listen(1)
1471 new_conn, addr = l.accept()
1472 conn.send(new_conn)
1473
1474 conn.recv()
1475
1476 def _remote(self, conn):
1477 for (address, msg) in iter(conn.recv, None):
1478 client = self.connection.Client(address)
1479 client.send(msg.upper())
1480 client.close()
1481
1482 if self.TYPE == 'processes':
1483 address, msg = conn.recv()
1484 client = socket.socket()
1485 client.connect(address)
1486 client.sendall(msg.upper())
1487 client.close()
1488
1489 conn.close()
1490
1491 def test_pickling(self):
1492 try:
1493 multiprocessing.allow_connection_pickling()
1494 except ImportError:
1495 return
1496
1497 families = self.connection.families
1498
1499 lconn, lconn0 = self.Pipe()
1500 lp = self.Process(target=self._listener, args=(lconn0, families))
1501 lp.start()
1502 lconn0.close()
1503
1504 rconn, rconn0 = self.Pipe()
1505 rp = self.Process(target=self._remote, args=(rconn0,))
1506 rp.start()
1507 rconn0.close()
1508
1509 for fam in families:
1510 msg = ('This connection uses family %s' % fam).encode('ascii')
1511 address = lconn.recv()
1512 rconn.send((address, msg))
1513 new_conn = lconn.recv()
1514 self.assertEqual(new_conn.recv(), msg.upper())
1515
1516 rconn.send(None)
1517
1518 if self.TYPE == 'processes':
1519 msg = latin('This connection uses a normal socket')
1520 address = lconn.recv()
1521 rconn.send((address, msg))
1522 if hasattr(socket, 'fromfd'):
1523 new_conn = lconn.recv()
1524 self.assertEqual(new_conn.recv(100), msg.upper())
1525 else:
1526 # XXX On Windows with Py2.6 need to backport fromfd()
1527 discard = lconn.recv_bytes()
1528
1529 lconn.send(None)
1530
1531 rconn.close()
1532 lconn.close()
1533
1534 lp.join()
1535 rp.join()
Benjamin Petersonb29cbbc2008-06-16 20:57:14 +00001536"""
Benjamin Petersone711caf2008-06-11 16:44:04 +00001537#
1538#
1539#
1540
1541class _TestHeap(BaseTestCase):
1542
1543 ALLOWED_TYPES = ('processes',)
1544
1545 def test_heap(self):
1546 iterations = 5000
1547 maxblocks = 50
1548 blocks = []
1549
1550 # create and destroy lots of blocks of different sizes
1551 for i in range(iterations):
1552 size = int(random.lognormvariate(0, 1) * 1000)
1553 b = multiprocessing.heap.BufferWrapper(size)
1554 blocks.append(b)
1555 if len(blocks) > maxblocks:
1556 i = random.randrange(maxblocks)
1557 del blocks[i]
1558
1559 # get the heap object
1560 heap = multiprocessing.heap.BufferWrapper._heap
1561
1562 # verify the state of the heap
1563 all = []
1564 occupied = 0
1565 for L in list(heap._len_to_seq.values()):
1566 for arena, start, stop in L:
1567 all.append((heap._arenas.index(arena), start, stop,
1568 stop-start, 'free'))
1569 for arena, start, stop in heap._allocated_blocks:
1570 all.append((heap._arenas.index(arena), start, stop,
1571 stop-start, 'occupied'))
1572 occupied += (stop-start)
1573
1574 all.sort()
1575
1576 for i in range(len(all)-1):
1577 (arena, start, stop) = all[i][:3]
1578 (narena, nstart, nstop) = all[i+1][:3]
1579 self.assertTrue((arena != narena and nstart == 0) or
1580 (stop == nstart))
1581
1582#
1583#
1584#
1585
Benjamin Petersone711caf2008-06-11 16:44:04 +00001586class _Foo(Structure):
1587 _fields_ = [
1588 ('x', c_int),
1589 ('y', c_double)
1590 ]
1591
Brian Curtinafa88b52010-10-07 01:12:19 +00001592@unittest.skipUnless(HAS_SHAREDCTYPES,
1593 "requires multiprocessing.sharedctypes")
Benjamin Petersone711caf2008-06-11 16:44:04 +00001594class _TestSharedCTypes(BaseTestCase):
1595
1596 ALLOWED_TYPES = ('processes',)
1597
1598 def _double(self, x, y, foo, arr, string):
1599 x.value *= 2
1600 y.value *= 2
1601 foo.x *= 2
1602 foo.y *= 2
1603 string.value *= 2
1604 for i in range(len(arr)):
1605 arr[i] *= 2
1606
1607 def test_sharedctypes(self, lock=False):
Benjamin Petersone711caf2008-06-11 16:44:04 +00001608 x = Value('i', 7, lock=lock)
Georg Brandl89fad142010-03-14 10:23:39 +00001609 y = Value(c_double, 1.0/3.0, lock=lock)
Benjamin Petersone711caf2008-06-11 16:44:04 +00001610 foo = Value(_Foo, 3, 2, lock=lock)
Georg Brandl89fad142010-03-14 10:23:39 +00001611 arr = self.Array('d', list(range(10)), lock=lock)
1612 string = self.Array('c', 20, lock=lock)
Brian Curtinafa88b52010-10-07 01:12:19 +00001613 string.value = latin('hello')
Benjamin Petersone711caf2008-06-11 16:44:04 +00001614
1615 p = self.Process(target=self._double, args=(x, y, foo, arr, string))
1616 p.start()
1617 p.join()
1618
1619 self.assertEqual(x.value, 14)
1620 self.assertAlmostEqual(y.value, 2.0/3.0)
1621 self.assertEqual(foo.x, 6)
1622 self.assertAlmostEqual(foo.y, 4.0)
1623 for i in range(10):
1624 self.assertAlmostEqual(arr[i], i*2)
1625 self.assertEqual(string.value, latin('hellohello'))
1626
1627 def test_synchronize(self):
1628 self.test_sharedctypes(lock=True)
1629
1630 def test_copy(self):
Benjamin Petersone711caf2008-06-11 16:44:04 +00001631 foo = _Foo(2, 5.0)
Brian Curtinafa88b52010-10-07 01:12:19 +00001632 bar = copy(foo)
Benjamin Petersone711caf2008-06-11 16:44:04 +00001633 foo.x = 0
1634 foo.y = 0
1635 self.assertEqual(bar.x, 2)
1636 self.assertAlmostEqual(bar.y, 5.0)
1637
1638#
1639#
1640#
1641
1642class _TestFinalize(BaseTestCase):
1643
1644 ALLOWED_TYPES = ('processes',)
1645
1646 def _test_finalize(self, conn):
1647 class Foo(object):
1648 pass
1649
1650 a = Foo()
1651 util.Finalize(a, conn.send, args=('a',))
1652 del a # triggers callback for a
1653
1654 b = Foo()
1655 close_b = util.Finalize(b, conn.send, args=('b',))
1656 close_b() # triggers callback for b
1657 close_b() # does nothing because callback has already been called
1658 del b # does nothing because callback has already been called
1659
1660 c = Foo()
1661 util.Finalize(c, conn.send, args=('c',))
1662
1663 d10 = Foo()
1664 util.Finalize(d10, conn.send, args=('d10',), exitpriority=1)
1665
1666 d01 = Foo()
1667 util.Finalize(d01, conn.send, args=('d01',), exitpriority=0)
1668 d02 = Foo()
1669 util.Finalize(d02, conn.send, args=('d02',), exitpriority=0)
1670 d03 = Foo()
1671 util.Finalize(d03, conn.send, args=('d03',), exitpriority=0)
1672
1673 util.Finalize(None, conn.send, args=('e',), exitpriority=-10)
1674
1675 util.Finalize(None, conn.send, args=('STOP',), exitpriority=-100)
1676
1677 # call mutliprocessing's cleanup function then exit process without
1678 # garbage collecting locals
1679 util._exit_function()
1680 conn.close()
1681 os._exit(0)
1682
1683 def test_finalize(self):
1684 conn, child_conn = self.Pipe()
1685
1686 p = self.Process(target=self._test_finalize, args=(child_conn,))
1687 p.start()
1688 p.join()
1689
1690 result = [obj for obj in iter(conn.recv, 'STOP')]
1691 self.assertEqual(result, ['a', 'b', 'd10', 'd03', 'd02', 'd01', 'e'])
1692
1693#
1694# Test that from ... import * works for each module
1695#
1696
1697class _TestImportStar(BaseTestCase):
1698
1699 ALLOWED_TYPES = ('processes',)
1700
1701 def test_import(self):
Florent Xiclunafd1b0932010-03-28 00:25:02 +00001702 modules = [
Benjamin Petersone711caf2008-06-11 16:44:04 +00001703 'multiprocessing', 'multiprocessing.connection',
1704 'multiprocessing.heap', 'multiprocessing.managers',
1705 'multiprocessing.pool', 'multiprocessing.process',
Florent Xiclunafd1b0932010-03-28 00:25:02 +00001706 'multiprocessing.reduction',
Benjamin Petersone711caf2008-06-11 16:44:04 +00001707 'multiprocessing.synchronize', 'multiprocessing.util'
Florent Xiclunafd1b0932010-03-28 00:25:02 +00001708 ]
1709
1710 if c_int is not None:
1711 # This module requires _ctypes
1712 modules.append('multiprocessing.sharedctypes')
Benjamin Petersone711caf2008-06-11 16:44:04 +00001713
1714 for name in modules:
1715 __import__(name)
1716 mod = sys.modules[name]
1717
1718 for attr in getattr(mod, '__all__', ()):
1719 self.assertTrue(
1720 hasattr(mod, attr),
1721 '%r does not have attribute %r' % (mod, attr)
1722 )
1723
1724#
1725# Quick test that logging works -- does not test logging output
1726#
1727
1728class _TestLogging(BaseTestCase):
1729
1730 ALLOWED_TYPES = ('processes',)
1731
1732 def test_enable_logging(self):
1733 logger = multiprocessing.get_logger()
1734 logger.setLevel(util.SUBWARNING)
1735 self.assertTrue(logger is not None)
1736 logger.debug('this will not be printed')
1737 logger.info('nor will this')
1738 logger.setLevel(LOG_LEVEL)
1739
1740 def _test_level(self, conn):
1741 logger = multiprocessing.get_logger()
1742 conn.send(logger.getEffectiveLevel())
1743
1744 def test_level(self):
1745 LEVEL1 = 32
1746 LEVEL2 = 37
1747
1748 logger = multiprocessing.get_logger()
1749 root_logger = logging.getLogger()
1750 root_level = root_logger.level
1751
1752 reader, writer = multiprocessing.Pipe(duplex=False)
1753
1754 logger.setLevel(LEVEL1)
1755 self.Process(target=self._test_level, args=(writer,)).start()
1756 self.assertEqual(LEVEL1, reader.recv())
1757
1758 logger.setLevel(logging.NOTSET)
1759 root_logger.setLevel(LEVEL2)
1760 self.Process(target=self._test_level, args=(writer,)).start()
1761 self.assertEqual(LEVEL2, reader.recv())
1762
1763 root_logger.setLevel(root_level)
1764 logger.setLevel(level=LOG_LEVEL)
1765
Jesse Nollerb9a49b72009-11-21 18:09:38 +00001766
Jesse Nollerf4ae35f2009-11-24 14:22:24 +00001767# class _TestLoggingProcessName(BaseTestCase):
1768#
1769# def handle(self, record):
1770# assert record.processName == multiprocessing.current_process().name
1771# self.__handled = True
1772#
1773# def test_logging(self):
1774# handler = logging.Handler()
1775# handler.handle = self.handle
1776# self.__handled = False
1777# # Bypass getLogger() and side-effects
1778# logger = logging.getLoggerClass()(
1779# 'multiprocessing.test.TestLoggingProcessName')
1780# logger.addHandler(handler)
1781# logger.propagate = False
1782#
1783# logger.warn('foo')
1784# assert self.__handled
Jesse Nollerb9a49b72009-11-21 18:09:38 +00001785
Benjamin Petersone711caf2008-06-11 16:44:04 +00001786#
Jesse Noller6214edd2009-01-19 16:23:53 +00001787# Test to verify handle verification, see issue 3321
1788#
1789
1790class TestInvalidHandle(unittest.TestCase):
1791
Florent Xiclunafd1b0932010-03-28 00:25:02 +00001792 @unittest.skipIf(WIN32, "skipped on Windows")
Jesse Noller6214edd2009-01-19 16:23:53 +00001793 def test_invalid_handles(self):
Jesse Noller6214edd2009-01-19 16:23:53 +00001794 conn = _multiprocessing.Connection(44977608)
1795 self.assertRaises(IOError, conn.poll)
1796 self.assertRaises(IOError, _multiprocessing.Connection, -1)
Florent Xiclunafd1b0932010-03-28 00:25:02 +00001797
Jesse Noller6214edd2009-01-19 16:23:53 +00001798#
Benjamin Petersone711caf2008-06-11 16:44:04 +00001799# Functions used to create test cases from the base ones in this module
1800#
1801
1802def get_attributes(Source, names):
1803 d = {}
1804 for name in names:
1805 obj = getattr(Source, name)
1806 if type(obj) == type(get_attributes):
1807 obj = staticmethod(obj)
1808 d[name] = obj
1809 return d
1810
1811def create_test_cases(Mixin, type):
1812 result = {}
1813 glob = globals()
Florent Xiclunafd1b0932010-03-28 00:25:02 +00001814 Type = type.capitalize()
Benjamin Petersone711caf2008-06-11 16:44:04 +00001815
1816 for name in list(glob.keys()):
1817 if name.startswith('_Test'):
1818 base = glob[name]
1819 if type in base.ALLOWED_TYPES:
1820 newname = 'With' + Type + name[1:]
1821 class Temp(base, unittest.TestCase, Mixin):
1822 pass
1823 result[newname] = Temp
1824 Temp.__name__ = newname
1825 Temp.__module__ = Mixin.__module__
1826 return result
1827
1828#
1829# Create test cases
1830#
1831
1832class ProcessesMixin(object):
1833 TYPE = 'processes'
1834 Process = multiprocessing.Process
1835 locals().update(get_attributes(multiprocessing, (
1836 'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
1837 'Condition', 'Event', 'Value', 'Array', 'RawValue',
1838 'RawArray', 'current_process', 'active_children', 'Pipe',
1839 'connection', 'JoinableQueue'
1840 )))
1841
1842testcases_processes = create_test_cases(ProcessesMixin, type='processes')
1843globals().update(testcases_processes)
1844
1845
1846class ManagerMixin(object):
1847 TYPE = 'manager'
1848 Process = multiprocessing.Process
1849 manager = object.__new__(multiprocessing.managers.SyncManager)
1850 locals().update(get_attributes(manager, (
1851 'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
1852 'Condition', 'Event', 'Value', 'Array', 'list', 'dict',
1853 'Namespace', 'JoinableQueue'
1854 )))
1855
1856testcases_manager = create_test_cases(ManagerMixin, type='manager')
1857globals().update(testcases_manager)
1858
1859
1860class ThreadsMixin(object):
1861 TYPE = 'threads'
1862 Process = multiprocessing.dummy.Process
1863 locals().update(get_attributes(multiprocessing.dummy, (
1864 'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
1865 'Condition', 'Event', 'Value', 'Array', 'current_process',
1866 'active_children', 'Pipe', 'connection', 'dict', 'list',
1867 'Namespace', 'JoinableQueue'
1868 )))
1869
1870testcases_threads = create_test_cases(ThreadsMixin, type='threads')
1871globals().update(testcases_threads)
1872
Neal Norwitz5d6415e2008-08-25 01:53:32 +00001873class OtherTest(unittest.TestCase):
1874 # TODO: add more tests for deliver/answer challenge.
1875 def test_deliver_challenge_auth_failure(self):
1876 class _FakeConnection(object):
1877 def recv_bytes(self, size):
Neal Norwitzec105ad2008-08-25 03:05:54 +00001878 return b'something bogus'
Neal Norwitz5d6415e2008-08-25 01:53:32 +00001879 def send_bytes(self, data):
1880 pass
1881 self.assertRaises(multiprocessing.AuthenticationError,
1882 multiprocessing.connection.deliver_challenge,
1883 _FakeConnection(), b'abc')
1884
1885 def test_answer_challenge_auth_failure(self):
1886 class _FakeConnection(object):
1887 def __init__(self):
1888 self.count = 0
1889 def recv_bytes(self, size):
1890 self.count += 1
1891 if self.count == 1:
1892 return multiprocessing.connection.CHALLENGE
1893 elif self.count == 2:
Neal Norwitzec105ad2008-08-25 03:05:54 +00001894 return b'something bogus'
1895 return b''
Neal Norwitz5d6415e2008-08-25 01:53:32 +00001896 def send_bytes(self, data):
1897 pass
1898 self.assertRaises(multiprocessing.AuthenticationError,
1899 multiprocessing.connection.answer_challenge,
1900 _FakeConnection(), b'abc')
1901
Benjamin Petersonf47ed4a2009-04-11 20:45:40 +00001902#
1903# Test Manager.start()/Pool.__init__() initializer feature - see issue 5585
1904#
1905
1906def initializer(ns):
1907 ns.test += 1
1908
1909class TestInitializers(unittest.TestCase):
1910 def setUp(self):
1911 self.mgr = multiprocessing.Manager()
1912 self.ns = self.mgr.Namespace()
1913 self.ns.test = 0
1914
1915 def tearDown(self):
1916 self.mgr.shutdown()
1917
1918 def test_manager_initializer(self):
1919 m = multiprocessing.managers.SyncManager()
1920 self.assertRaises(TypeError, m.start, 1)
1921 m.start(initializer, (self.ns,))
1922 self.assertEqual(self.ns.test, 1)
1923 m.shutdown()
1924
1925 def test_pool_initializer(self):
1926 self.assertRaises(TypeError, multiprocessing.Pool, initializer=1)
1927 p = multiprocessing.Pool(1, initializer, (self.ns,))
1928 p.close()
1929 p.join()
1930 self.assertEqual(self.ns.test, 1)
1931
Alexandre Vassalottic57a84f2009-07-17 12:07:01 +00001932#
1933# Issue 5155, 5313, 5331: Test process in processes
1934# Verifies os.close(sys.stdin.fileno) vs. sys.stdin.close() behavior
1935#
1936
1937def _ThisSubProcess(q):
1938 try:
1939 item = q.get(block=False)
1940 except pyqueue.Empty:
1941 pass
1942
1943def _TestProcess(q):
1944 queue = multiprocessing.Queue()
1945 subProc = multiprocessing.Process(target=_ThisSubProcess, args=(queue,))
1946 subProc.start()
1947 subProc.join()
1948
1949def _afunc(x):
1950 return x*x
1951
1952def pool_in_process():
1953 pool = multiprocessing.Pool(processes=4)
1954 x = pool.map(_afunc, [1, 2, 3, 4, 5, 6, 7])
1955
1956class _file_like(object):
1957 def __init__(self, delegate):
1958 self._delegate = delegate
1959 self._pid = None
1960
1961 @property
1962 def cache(self):
1963 pid = os.getpid()
1964 # There are no race conditions since fork keeps only the running thread
1965 if pid != self._pid:
1966 self._pid = pid
1967 self._cache = []
1968 return self._cache
1969
1970 def write(self, data):
1971 self.cache.append(data)
1972
1973 def flush(self):
1974 self._delegate.write(''.join(self.cache))
1975 self._cache = []
1976
1977class TestStdinBadfiledescriptor(unittest.TestCase):
1978
1979 def test_queue_in_process(self):
1980 queue = multiprocessing.Queue()
1981 proc = multiprocessing.Process(target=_TestProcess, args=(queue,))
1982 proc.start()
1983 proc.join()
1984
1985 def test_pool_in_process(self):
1986 p = multiprocessing.Process(target=pool_in_process)
1987 p.start()
1988 p.join()
1989
1990 def test_flushing(self):
1991 sio = io.StringIO()
1992 flike = _file_like(sio)
1993 flike.write('foo')
1994 proc = multiprocessing.Process(target=lambda: flike.flush())
1995 flike.flush()
1996 assert sio.getvalue() == 'foo'
1997
1998testcases_other = [OtherTest, TestInvalidHandle, TestInitializers,
1999 TestStdinBadfiledescriptor]
Neal Norwitz5d6415e2008-08-25 01:53:32 +00002000
Benjamin Petersone711caf2008-06-11 16:44:04 +00002001#
2002#
2003#
2004
2005def test_main(run=None):
Jesse Nollerd00df3c2008-06-18 14:22:48 +00002006 if sys.platform.startswith("linux"):
2007 try:
2008 lock = multiprocessing.RLock()
2009 except OSError:
Benjamin Petersone549ead2009-03-28 21:42:05 +00002010 raise unittest.SkipTest("OSError raises on RLock creation, see issue 3111!")
Benjamin Peterson3c0dd062008-06-17 22:43:48 +00002011
Benjamin Petersone711caf2008-06-11 16:44:04 +00002012 if run is None:
2013 from test.support import run_unittest as run
2014
2015 util.get_temp_dir() # creates temp directory for use by all processes
2016
2017 multiprocessing.get_logger().setLevel(LOG_LEVEL)
2018
Benjamin Peterson41181742008-07-02 20:22:54 +00002019 ProcessesMixin.pool = multiprocessing.Pool(4)
2020 ThreadsMixin.pool = multiprocessing.dummy.Pool(4)
2021 ManagerMixin.manager.__init__()
2022 ManagerMixin.manager.start()
2023 ManagerMixin.pool = ManagerMixin.manager.Pool(4)
Benjamin Petersone711caf2008-06-11 16:44:04 +00002024
2025 testcases = (
Benjamin Peterson41181742008-07-02 20:22:54 +00002026 sorted(testcases_processes.values(), key=lambda tc:tc.__name__) +
2027 sorted(testcases_threads.values(), key=lambda tc:tc.__name__) +
Neal Norwitz5d6415e2008-08-25 01:53:32 +00002028 sorted(testcases_manager.values(), key=lambda tc:tc.__name__) +
2029 testcases_other
Benjamin Petersone711caf2008-06-11 16:44:04 +00002030 )
2031
2032 loadTestsFromTestCase = unittest.defaultTestLoader.loadTestsFromTestCase
2033 suite = unittest.TestSuite(loadTestsFromTestCase(tc) for tc in testcases)
2034 run(suite)
2035
Benjamin Peterson41181742008-07-02 20:22:54 +00002036 ThreadsMixin.pool.terminate()
2037 ProcessesMixin.pool.terminate()
2038 ManagerMixin.pool.terminate()
2039 ManagerMixin.manager.shutdown()
Benjamin Petersone711caf2008-06-11 16:44:04 +00002040
Benjamin Peterson41181742008-07-02 20:22:54 +00002041 del ProcessesMixin.pool, ThreadsMixin.pool, ManagerMixin.pool
Benjamin Petersone711caf2008-06-11 16:44:04 +00002042
2043def main():
2044 test_main(unittest.TextTestRunner(verbosity=2).run)
2045
2046if __name__ == '__main__':
2047 main()