blob: 73d12dc879827b0ddb3b863b5092d1268929c476 [file] [log] [blame]
Benjamin Peterson90f5ba52010-03-11 22:53:45 +00001#!/usr/bin/env python3
Georg Brandl86b2fb92008-07-16 03:43:04 +00002
Benjamin Petersone711caf2008-06-11 16:44:04 +00003#
4# Unit tests for the multiprocessing package
5#
6
7import unittest
Benjamin Petersone711caf2008-06-11 16:44:04 +00008import queue as pyqueue
9import time
Alexandre Vassalottic57a84f2009-07-17 12:07:01 +000010import io
Benjamin Petersone711caf2008-06-11 16:44:04 +000011import sys
12import os
13import gc
14import signal
15import array
Benjamin Petersone711caf2008-06-11 16:44:04 +000016import socket
17import random
18import logging
R. David Murraya21e4ca2009-03-31 23:16:50 +000019import test.support
Benjamin Petersone711caf2008-06-11 16:44:04 +000020
Benjamin Petersone5384b02008-10-04 22:00:42 +000021
R. David Murraya21e4ca2009-03-31 23:16:50 +000022# Skip tests if _multiprocessing wasn't built.
23_multiprocessing = test.support.import_module('_multiprocessing')
24# Skip tests if sem_open implementation is broken.
25test.support.import_module('multiprocessing.synchronize')
Victor Stinner45df8202010-04-28 22:31:17 +000026# import threading after _multiprocessing to raise a more revelant error
27# message: "No module named _multiprocessing". _multiprocessing is not compiled
28# without thread support.
29import threading
Benjamin Petersone5384b02008-10-04 22:00:42 +000030
Benjamin Petersone711caf2008-06-11 16:44:04 +000031import multiprocessing.dummy
32import multiprocessing.connection
33import multiprocessing.managers
34import multiprocessing.heap
Benjamin Petersone711caf2008-06-11 16:44:04 +000035import multiprocessing.pool
Benjamin Petersone711caf2008-06-11 16:44:04 +000036
37from multiprocessing import util
38
39#
40#
41#
42
Benjamin Peterson2bc91df2008-07-13 18:45:30 +000043def latin(s):
44 return s.encode('latin')
Benjamin Petersone711caf2008-06-11 16:44:04 +000045
Benjamin Petersone711caf2008-06-11 16:44:04 +000046#
47# Constants
48#
49
50LOG_LEVEL = util.SUBWARNING
Jesse Noller1f0b6582010-01-27 03:36:01 +000051#LOG_LEVEL = logging.DEBUG
Benjamin Petersone711caf2008-06-11 16:44:04 +000052
53DELTA = 0.1
54CHECK_TIMINGS = False # making true makes tests take a lot longer
55 # and can sometimes cause some non-serious
56 # failures because some calls block a bit
57 # longer than expected
58if CHECK_TIMINGS:
59 TIMEOUT1, TIMEOUT2, TIMEOUT3 = 0.82, 0.35, 1.4
60else:
61 TIMEOUT1, TIMEOUT2, TIMEOUT3 = 0.1, 0.1, 0.1
62
63HAVE_GETVALUE = not getattr(_multiprocessing,
64 'HAVE_BROKEN_SEM_GETVALUE', False)
65
Jesse Noller6214edd2009-01-19 16:23:53 +000066WIN32 = (sys.platform == "win32")
67
Benjamin Petersone711caf2008-06-11 16:44:04 +000068#
Florent Xiclunafd1b0932010-03-28 00:25:02 +000069# Some tests require ctypes
70#
71
72try:
Florent Xiclunaaa171062010-08-14 15:56:42 +000073 from ctypes import Structure, c_int, c_double
Florent Xiclunafd1b0932010-03-28 00:25:02 +000074except ImportError:
75 Structure = object
76 c_int = c_double = None
77
Florent Xiclunaaa171062010-08-14 15:56:42 +000078try:
79 from ctypes import Value
80except ImportError:
81 Value = None
82
83try:
84 from ctypes import copy as ctypes_copy
85except ImportError:
86 ctypes_copy = None
87
Florent Xiclunafd1b0932010-03-28 00:25:02 +000088#
Benjamin Petersone711caf2008-06-11 16:44:04 +000089# Creates a wrapper for a function which records the time it takes to finish
90#
91
92class TimingWrapper(object):
93
94 def __init__(self, func):
95 self.func = func
96 self.elapsed = None
97
98 def __call__(self, *args, **kwds):
99 t = time.time()
100 try:
101 return self.func(*args, **kwds)
102 finally:
103 self.elapsed = time.time() - t
104
105#
106# Base class for test cases
107#
108
109class BaseTestCase(object):
110
111 ALLOWED_TYPES = ('processes', 'manager', 'threads')
112
113 def assertTimingAlmostEqual(self, a, b):
114 if CHECK_TIMINGS:
115 self.assertAlmostEqual(a, b, 1)
116
117 def assertReturnsIfImplemented(self, value, func, *args):
118 try:
119 res = func(*args)
120 except NotImplementedError:
121 pass
122 else:
123 return self.assertEqual(value, res)
124
125#
126# Return the value of a semaphore
127#
128
129def get_value(self):
130 try:
131 return self.get_value()
132 except AttributeError:
133 try:
134 return self._Semaphore__value
135 except AttributeError:
136 try:
137 return self._value
138 except AttributeError:
139 raise NotImplementedError
140
141#
142# Testcases
143#
144
145class _TestProcess(BaseTestCase):
146
147 ALLOWED_TYPES = ('processes', 'threads')
148
149 def test_current(self):
150 if self.TYPE == 'threads':
151 return
152
153 current = self.current_process()
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000154 authkey = current.authkey
Benjamin Petersone711caf2008-06-11 16:44:04 +0000155
156 self.assertTrue(current.is_alive())
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000157 self.assertTrue(not current.daemon)
Ezio Melottie9615932010-01-24 19:26:24 +0000158 self.assertIsInstance(authkey, bytes)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000159 self.assertTrue(len(authkey) > 0)
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000160 self.assertEqual(current.ident, os.getpid())
161 self.assertEqual(current.exitcode, None)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000162
163 def _test(self, q, *args, **kwds):
164 current = self.current_process()
165 q.put(args)
166 q.put(kwds)
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000167 q.put(current.name)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000168 if self.TYPE != 'threads':
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000169 q.put(bytes(current.authkey))
Benjamin Petersone711caf2008-06-11 16:44:04 +0000170 q.put(current.pid)
171
172 def test_process(self):
173 q = self.Queue(1)
174 e = self.Event()
175 args = (q, 1, 2)
176 kwargs = {'hello':23, 'bye':2.54}
177 name = 'SomeProcess'
178 p = self.Process(
179 target=self._test, args=args, kwargs=kwargs, name=name
180 )
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000181 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000182 current = self.current_process()
183
184 if self.TYPE != 'threads':
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000185 self.assertEquals(p.authkey, current.authkey)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000186 self.assertEquals(p.is_alive(), False)
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000187 self.assertEquals(p.daemon, True)
Benjamin Peterson577473f2010-01-19 00:09:57 +0000188 self.assertNotIn(p, self.active_children())
Benjamin Petersone711caf2008-06-11 16:44:04 +0000189 self.assertTrue(type(self.active_children()) is list)
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000190 self.assertEqual(p.exitcode, None)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000191
192 p.start()
193
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000194 self.assertEquals(p.exitcode, None)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000195 self.assertEquals(p.is_alive(), True)
Benjamin Peterson577473f2010-01-19 00:09:57 +0000196 self.assertIn(p, self.active_children())
Benjamin Petersone711caf2008-06-11 16:44:04 +0000197
198 self.assertEquals(q.get(), args[1:])
199 self.assertEquals(q.get(), kwargs)
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000200 self.assertEquals(q.get(), p.name)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000201 if self.TYPE != 'threads':
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000202 self.assertEquals(q.get(), current.authkey)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000203 self.assertEquals(q.get(), p.pid)
204
205 p.join()
206
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000207 self.assertEquals(p.exitcode, 0)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000208 self.assertEquals(p.is_alive(), False)
Benjamin Peterson577473f2010-01-19 00:09:57 +0000209 self.assertNotIn(p, self.active_children())
Benjamin Petersone711caf2008-06-11 16:44:04 +0000210
211 def _test_terminate(self):
212 time.sleep(1000)
213
214 def test_terminate(self):
215 if self.TYPE == 'threads':
216 return
217
218 p = self.Process(target=self._test_terminate)
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000219 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000220 p.start()
221
222 self.assertEqual(p.is_alive(), True)
Benjamin Peterson577473f2010-01-19 00:09:57 +0000223 self.assertIn(p, self.active_children())
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000224 self.assertEqual(p.exitcode, None)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000225
226 p.terminate()
227
228 join = TimingWrapper(p.join)
229 self.assertEqual(join(), None)
230 self.assertTimingAlmostEqual(join.elapsed, 0.0)
231
232 self.assertEqual(p.is_alive(), False)
Benjamin Peterson577473f2010-01-19 00:09:57 +0000233 self.assertNotIn(p, self.active_children())
Benjamin Petersone711caf2008-06-11 16:44:04 +0000234
235 p.join()
236
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000237 # XXX sometimes get p.exitcode == 0 on Windows ...
238 #self.assertEqual(p.exitcode, -signal.SIGTERM)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000239
240 def test_cpu_count(self):
241 try:
242 cpus = multiprocessing.cpu_count()
243 except NotImplementedError:
244 cpus = 1
245 self.assertTrue(type(cpus) is int)
246 self.assertTrue(cpus >= 1)
247
248 def test_active_children(self):
249 self.assertEqual(type(self.active_children()), list)
250
251 p = self.Process(target=time.sleep, args=(DELTA,))
Benjamin Peterson577473f2010-01-19 00:09:57 +0000252 self.assertNotIn(p, self.active_children())
Benjamin Petersone711caf2008-06-11 16:44:04 +0000253
254 p.start()
Benjamin Peterson577473f2010-01-19 00:09:57 +0000255 self.assertIn(p, self.active_children())
Benjamin Petersone711caf2008-06-11 16:44:04 +0000256
257 p.join()
Benjamin Peterson577473f2010-01-19 00:09:57 +0000258 self.assertNotIn(p, self.active_children())
Benjamin Petersone711caf2008-06-11 16:44:04 +0000259
260 def _test_recursion(self, wconn, id):
261 from multiprocessing import forking
262 wconn.send(id)
263 if len(id) < 2:
264 for i in range(2):
265 p = self.Process(
266 target=self._test_recursion, args=(wconn, id+[i])
267 )
268 p.start()
269 p.join()
270
271 def test_recursion(self):
272 rconn, wconn = self.Pipe(duplex=False)
273 self._test_recursion(wconn, [])
274
275 time.sleep(DELTA)
276 result = []
277 while rconn.poll():
278 result.append(rconn.recv())
279
280 expected = [
281 [],
282 [0],
283 [0, 0],
284 [0, 1],
285 [1],
286 [1, 0],
287 [1, 1]
288 ]
289 self.assertEqual(result, expected)
290
291#
292#
293#
294
295class _UpperCaser(multiprocessing.Process):
296
297 def __init__(self):
298 multiprocessing.Process.__init__(self)
299 self.child_conn, self.parent_conn = multiprocessing.Pipe()
300
301 def run(self):
302 self.parent_conn.close()
303 for s in iter(self.child_conn.recv, None):
304 self.child_conn.send(s.upper())
305 self.child_conn.close()
306
307 def submit(self, s):
308 assert type(s) is str
309 self.parent_conn.send(s)
310 return self.parent_conn.recv()
311
312 def stop(self):
313 self.parent_conn.send(None)
314 self.parent_conn.close()
315 self.child_conn.close()
316
317class _TestSubclassingProcess(BaseTestCase):
318
319 ALLOWED_TYPES = ('processes',)
320
321 def test_subclassing(self):
322 uppercaser = _UpperCaser()
323 uppercaser.start()
324 self.assertEqual(uppercaser.submit('hello'), 'HELLO')
325 self.assertEqual(uppercaser.submit('world'), 'WORLD')
326 uppercaser.stop()
327 uppercaser.join()
328
329#
330#
331#
332
333def queue_empty(q):
334 if hasattr(q, 'empty'):
335 return q.empty()
336 else:
337 return q.qsize() == 0
338
339def queue_full(q, maxsize):
340 if hasattr(q, 'full'):
341 return q.full()
342 else:
343 return q.qsize() == maxsize
344
345
346class _TestQueue(BaseTestCase):
347
348
349 def _test_put(self, queue, child_can_start, parent_can_continue):
350 child_can_start.wait()
351 for i in range(6):
352 queue.get()
353 parent_can_continue.set()
354
355 def test_put(self):
356 MAXSIZE = 6
357 queue = self.Queue(maxsize=MAXSIZE)
358 child_can_start = self.Event()
359 parent_can_continue = self.Event()
360
361 proc = self.Process(
362 target=self._test_put,
363 args=(queue, child_can_start, parent_can_continue)
364 )
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000365 proc.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000366 proc.start()
367
368 self.assertEqual(queue_empty(queue), True)
369 self.assertEqual(queue_full(queue, MAXSIZE), False)
370
371 queue.put(1)
372 queue.put(2, True)
373 queue.put(3, True, None)
374 queue.put(4, False)
375 queue.put(5, False, None)
376 queue.put_nowait(6)
377
378 # the values may be in buffer but not yet in pipe so sleep a bit
379 time.sleep(DELTA)
380
381 self.assertEqual(queue_empty(queue), False)
382 self.assertEqual(queue_full(queue, MAXSIZE), True)
383
384 put = TimingWrapper(queue.put)
385 put_nowait = TimingWrapper(queue.put_nowait)
386
387 self.assertRaises(pyqueue.Full, put, 7, False)
388 self.assertTimingAlmostEqual(put.elapsed, 0)
389
390 self.assertRaises(pyqueue.Full, put, 7, False, None)
391 self.assertTimingAlmostEqual(put.elapsed, 0)
392
393 self.assertRaises(pyqueue.Full, put_nowait, 7)
394 self.assertTimingAlmostEqual(put_nowait.elapsed, 0)
395
396 self.assertRaises(pyqueue.Full, put, 7, True, TIMEOUT1)
397 self.assertTimingAlmostEqual(put.elapsed, TIMEOUT1)
398
399 self.assertRaises(pyqueue.Full, put, 7, False, TIMEOUT2)
400 self.assertTimingAlmostEqual(put.elapsed, 0)
401
402 self.assertRaises(pyqueue.Full, put, 7, True, timeout=TIMEOUT3)
403 self.assertTimingAlmostEqual(put.elapsed, TIMEOUT3)
404
405 child_can_start.set()
406 parent_can_continue.wait()
407
408 self.assertEqual(queue_empty(queue), True)
409 self.assertEqual(queue_full(queue, MAXSIZE), False)
410
411 proc.join()
412
413 def _test_get(self, queue, child_can_start, parent_can_continue):
414 child_can_start.wait()
Benjamin Petersonb29cbbc2008-06-16 20:57:14 +0000415 #queue.put(1)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000416 queue.put(2)
417 queue.put(3)
418 queue.put(4)
419 queue.put(5)
420 parent_can_continue.set()
421
422 def test_get(self):
423 queue = self.Queue()
424 child_can_start = self.Event()
425 parent_can_continue = self.Event()
426
427 proc = self.Process(
428 target=self._test_get,
429 args=(queue, child_can_start, parent_can_continue)
430 )
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000431 proc.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000432 proc.start()
433
434 self.assertEqual(queue_empty(queue), True)
435
436 child_can_start.set()
437 parent_can_continue.wait()
438
439 time.sleep(DELTA)
440 self.assertEqual(queue_empty(queue), False)
441
Benjamin Petersonb29cbbc2008-06-16 20:57:14 +0000442 # Hangs unexpectedly, remove for now
443 #self.assertEqual(queue.get(), 1)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000444 self.assertEqual(queue.get(True, None), 2)
445 self.assertEqual(queue.get(True), 3)
446 self.assertEqual(queue.get(timeout=1), 4)
447 self.assertEqual(queue.get_nowait(), 5)
448
449 self.assertEqual(queue_empty(queue), True)
450
451 get = TimingWrapper(queue.get)
452 get_nowait = TimingWrapper(queue.get_nowait)
453
454 self.assertRaises(pyqueue.Empty, get, False)
455 self.assertTimingAlmostEqual(get.elapsed, 0)
456
457 self.assertRaises(pyqueue.Empty, get, False, None)
458 self.assertTimingAlmostEqual(get.elapsed, 0)
459
460 self.assertRaises(pyqueue.Empty, get_nowait)
461 self.assertTimingAlmostEqual(get_nowait.elapsed, 0)
462
463 self.assertRaises(pyqueue.Empty, get, True, TIMEOUT1)
464 self.assertTimingAlmostEqual(get.elapsed, TIMEOUT1)
465
466 self.assertRaises(pyqueue.Empty, get, False, TIMEOUT2)
467 self.assertTimingAlmostEqual(get.elapsed, 0)
468
469 self.assertRaises(pyqueue.Empty, get, timeout=TIMEOUT3)
470 self.assertTimingAlmostEqual(get.elapsed, TIMEOUT3)
471
472 proc.join()
473
474 def _test_fork(self, queue):
475 for i in range(10, 20):
476 queue.put(i)
477 # note that at this point the items may only be buffered, so the
478 # process cannot shutdown until the feeder thread has finished
479 # pushing items onto the pipe.
480
481 def test_fork(self):
482 # Old versions of Queue would fail to create a new feeder
483 # thread for a forked process if the original process had its
484 # own feeder thread. This test checks that this no longer
485 # happens.
486
487 queue = self.Queue()
488
489 # put items on queue so that main process starts a feeder thread
490 for i in range(10):
491 queue.put(i)
492
493 # wait to make sure thread starts before we fork a new process
494 time.sleep(DELTA)
495
496 # fork process
497 p = self.Process(target=self._test_fork, args=(queue,))
498 p.start()
499
500 # check that all expected items are in the queue
501 for i in range(20):
502 self.assertEqual(queue.get(), i)
503 self.assertRaises(pyqueue.Empty, queue.get, False)
504
505 p.join()
506
507 def test_qsize(self):
508 q = self.Queue()
509 try:
510 self.assertEqual(q.qsize(), 0)
511 except NotImplementedError:
512 return
513 q.put(1)
514 self.assertEqual(q.qsize(), 1)
515 q.put(5)
516 self.assertEqual(q.qsize(), 2)
517 q.get()
518 self.assertEqual(q.qsize(), 1)
519 q.get()
520 self.assertEqual(q.qsize(), 0)
521
522 def _test_task_done(self, q):
523 for obj in iter(q.get, None):
524 time.sleep(DELTA)
525 q.task_done()
526
527 def test_task_done(self):
528 queue = self.JoinableQueue()
529
530 if sys.version_info < (2, 5) and not hasattr(queue, 'task_done'):
Florent Xiclunafd1b0932010-03-28 00:25:02 +0000531 self.skipTest("requires 'queue.task_done()' method")
Benjamin Petersone711caf2008-06-11 16:44:04 +0000532
533 workers = [self.Process(target=self._test_task_done, args=(queue,))
534 for i in range(4)]
535
536 for p in workers:
537 p.start()
538
539 for i in range(10):
540 queue.put(i)
541
542 queue.join()
543
544 for p in workers:
545 queue.put(None)
546
547 for p in workers:
548 p.join()
549
550#
551#
552#
553
554class _TestLock(BaseTestCase):
555
556 def test_lock(self):
557 lock = self.Lock()
558 self.assertEqual(lock.acquire(), True)
559 self.assertEqual(lock.acquire(False), False)
560 self.assertEqual(lock.release(), None)
561 self.assertRaises((ValueError, threading.ThreadError), lock.release)
562
563 def test_rlock(self):
564 lock = self.RLock()
565 self.assertEqual(lock.acquire(), True)
566 self.assertEqual(lock.acquire(), True)
567 self.assertEqual(lock.acquire(), True)
568 self.assertEqual(lock.release(), None)
569 self.assertEqual(lock.release(), None)
570 self.assertEqual(lock.release(), None)
571 self.assertRaises((AssertionError, RuntimeError), lock.release)
572
Jesse Nollerf8d00852009-03-31 03:25:07 +0000573 def test_lock_context(self):
574 with self.Lock():
575 pass
576
Benjamin Petersone711caf2008-06-11 16:44:04 +0000577
578class _TestSemaphore(BaseTestCase):
579
580 def _test_semaphore(self, sem):
581 self.assertReturnsIfImplemented(2, get_value, sem)
582 self.assertEqual(sem.acquire(), True)
583 self.assertReturnsIfImplemented(1, get_value, sem)
584 self.assertEqual(sem.acquire(), True)
585 self.assertReturnsIfImplemented(0, get_value, sem)
586 self.assertEqual(sem.acquire(False), False)
587 self.assertReturnsIfImplemented(0, get_value, sem)
588 self.assertEqual(sem.release(), None)
589 self.assertReturnsIfImplemented(1, get_value, sem)
590 self.assertEqual(sem.release(), None)
591 self.assertReturnsIfImplemented(2, get_value, sem)
592
593 def test_semaphore(self):
594 sem = self.Semaphore(2)
595 self._test_semaphore(sem)
596 self.assertEqual(sem.release(), None)
597 self.assertReturnsIfImplemented(3, get_value, sem)
598 self.assertEqual(sem.release(), None)
599 self.assertReturnsIfImplemented(4, get_value, sem)
600
601 def test_bounded_semaphore(self):
602 sem = self.BoundedSemaphore(2)
603 self._test_semaphore(sem)
604 # Currently fails on OS/X
605 #if HAVE_GETVALUE:
606 # self.assertRaises(ValueError, sem.release)
607 # self.assertReturnsIfImplemented(2, get_value, sem)
608
609 def test_timeout(self):
610 if self.TYPE != 'processes':
611 return
612
613 sem = self.Semaphore(0)
614 acquire = TimingWrapper(sem.acquire)
615
616 self.assertEqual(acquire(False), False)
617 self.assertTimingAlmostEqual(acquire.elapsed, 0.0)
618
619 self.assertEqual(acquire(False, None), False)
620 self.assertTimingAlmostEqual(acquire.elapsed, 0.0)
621
622 self.assertEqual(acquire(False, TIMEOUT1), False)
623 self.assertTimingAlmostEqual(acquire.elapsed, 0)
624
625 self.assertEqual(acquire(True, TIMEOUT2), False)
626 self.assertTimingAlmostEqual(acquire.elapsed, TIMEOUT2)
627
628 self.assertEqual(acquire(timeout=TIMEOUT3), False)
629 self.assertTimingAlmostEqual(acquire.elapsed, TIMEOUT3)
630
631
632class _TestCondition(BaseTestCase):
633
634 def f(self, cond, sleeping, woken, timeout=None):
635 cond.acquire()
636 sleeping.release()
637 cond.wait(timeout)
638 woken.release()
639 cond.release()
640
641 def check_invariant(self, cond):
642 # this is only supposed to succeed when there are no sleepers
643 if self.TYPE == 'processes':
644 try:
645 sleepers = (cond._sleeping_count.get_value() -
646 cond._woken_count.get_value())
647 self.assertEqual(sleepers, 0)
648 self.assertEqual(cond._wait_semaphore.get_value(), 0)
649 except NotImplementedError:
650 pass
651
652 def test_notify(self):
653 cond = self.Condition()
654 sleeping = self.Semaphore(0)
655 woken = self.Semaphore(0)
656
657 p = self.Process(target=self.f, args=(cond, sleeping, woken))
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000658 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000659 p.start()
660
661 p = threading.Thread(target=self.f, args=(cond, sleeping, woken))
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000662 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000663 p.start()
664
665 # wait for both children to start sleeping
666 sleeping.acquire()
667 sleeping.acquire()
668
669 # check no process/thread has woken up
670 time.sleep(DELTA)
671 self.assertReturnsIfImplemented(0, get_value, woken)
672
673 # wake up one process/thread
674 cond.acquire()
675 cond.notify()
676 cond.release()
677
678 # check one process/thread has woken up
679 time.sleep(DELTA)
680 self.assertReturnsIfImplemented(1, get_value, woken)
681
682 # wake up another
683 cond.acquire()
684 cond.notify()
685 cond.release()
686
687 # check other has woken up
688 time.sleep(DELTA)
689 self.assertReturnsIfImplemented(2, get_value, woken)
690
691 # check state is not mucked up
692 self.check_invariant(cond)
693 p.join()
694
695 def test_notify_all(self):
696 cond = self.Condition()
697 sleeping = self.Semaphore(0)
698 woken = self.Semaphore(0)
699
700 # start some threads/processes which will timeout
701 for i in range(3):
702 p = self.Process(target=self.f,
703 args=(cond, sleeping, woken, TIMEOUT1))
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000704 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000705 p.start()
706
707 t = threading.Thread(target=self.f,
708 args=(cond, sleeping, woken, TIMEOUT1))
Benjamin Peterson72753702008-08-18 18:09:21 +0000709 t.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000710 t.start()
711
712 # wait for them all to sleep
713 for i in range(6):
714 sleeping.acquire()
715
716 # check they have all timed out
717 for i in range(6):
718 woken.acquire()
719 self.assertReturnsIfImplemented(0, get_value, woken)
720
721 # check state is not mucked up
722 self.check_invariant(cond)
723
724 # start some more threads/processes
725 for i in range(3):
726 p = self.Process(target=self.f, args=(cond, sleeping, woken))
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000727 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000728 p.start()
729
730 t = threading.Thread(target=self.f, args=(cond, sleeping, woken))
Benjamin Peterson72753702008-08-18 18:09:21 +0000731 t.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000732 t.start()
733
734 # wait for them to all sleep
735 for i in range(6):
736 sleeping.acquire()
737
738 # check no process/thread has woken up
739 time.sleep(DELTA)
740 self.assertReturnsIfImplemented(0, get_value, woken)
741
742 # wake them all up
743 cond.acquire()
744 cond.notify_all()
745 cond.release()
746
747 # check they have all woken
748 time.sleep(DELTA)
749 self.assertReturnsIfImplemented(6, get_value, woken)
750
751 # check state is not mucked up
752 self.check_invariant(cond)
753
754 def test_timeout(self):
755 cond = self.Condition()
756 wait = TimingWrapper(cond.wait)
757 cond.acquire()
758 res = wait(TIMEOUT1)
759 cond.release()
760 self.assertEqual(res, None)
761 self.assertTimingAlmostEqual(wait.elapsed, TIMEOUT1)
762
763
764class _TestEvent(BaseTestCase):
765
766 def _test_event(self, event):
767 time.sleep(TIMEOUT2)
768 event.set()
769
770 def test_event(self):
771 event = self.Event()
772 wait = TimingWrapper(event.wait)
773
774 # Removed temporaily, due to API shear, this does not
775 # work with threading._Event objects. is_set == isSet
Benjamin Peterson965ce872009-04-05 21:24:58 +0000776 self.assertEqual(event.is_set(), False)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000777
Benjamin Peterson965ce872009-04-05 21:24:58 +0000778 # Removed, threading.Event.wait() will return the value of the __flag
779 # instead of None. API Shear with the semaphore backed mp.Event
780 self.assertEqual(wait(0.0), False)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000781 self.assertTimingAlmostEqual(wait.elapsed, 0.0)
Benjamin Peterson965ce872009-04-05 21:24:58 +0000782 self.assertEqual(wait(TIMEOUT1), False)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000783 self.assertTimingAlmostEqual(wait.elapsed, TIMEOUT1)
784
785 event.set()
786
787 # See note above on the API differences
Benjamin Peterson965ce872009-04-05 21:24:58 +0000788 self.assertEqual(event.is_set(), True)
789 self.assertEqual(wait(), True)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000790 self.assertTimingAlmostEqual(wait.elapsed, 0.0)
Benjamin Peterson965ce872009-04-05 21:24:58 +0000791 self.assertEqual(wait(TIMEOUT1), True)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000792 self.assertTimingAlmostEqual(wait.elapsed, 0.0)
793 # self.assertEqual(event.is_set(), True)
794
795 event.clear()
796
797 #self.assertEqual(event.is_set(), False)
798
799 self.Process(target=self._test_event, args=(event,)).start()
Benjamin Peterson965ce872009-04-05 21:24:58 +0000800 self.assertEqual(wait(), True)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000801
802#
803#
804#
805
806class _TestValue(BaseTestCase):
807
Florent Xiclunafd1b0932010-03-28 00:25:02 +0000808 ALLOWED_TYPES = ('processes',)
809
Benjamin Petersone711caf2008-06-11 16:44:04 +0000810 codes_values = [
811 ('i', 4343, 24234),
812 ('d', 3.625, -4.25),
813 ('h', -232, 234),
814 ('c', latin('x'), latin('y'))
815 ]
816
817 def _test(self, values):
818 for sv, cv in zip(values, self.codes_values):
819 sv.value = cv[2]
820
821
Florent Xiclunafd1b0932010-03-28 00:25:02 +0000822 @unittest.skipIf(c_int is None, "requires _ctypes")
Benjamin Petersone711caf2008-06-11 16:44:04 +0000823 def test_value(self, raw=False):
Benjamin Petersone711caf2008-06-11 16:44:04 +0000824 if raw:
825 values = [self.RawValue(code, value)
826 for code, value, _ in self.codes_values]
827 else:
828 values = [self.Value(code, value)
829 for code, value, _ in self.codes_values]
830
831 for sv, cv in zip(values, self.codes_values):
832 self.assertEqual(sv.value, cv[1])
833
834 proc = self.Process(target=self._test, args=(values,))
835 proc.start()
836 proc.join()
837
838 for sv, cv in zip(values, self.codes_values):
839 self.assertEqual(sv.value, cv[2])
840
Florent Xiclunafd1b0932010-03-28 00:25:02 +0000841 @unittest.skipIf(c_int is None, "requires _ctypes")
Benjamin Petersone711caf2008-06-11 16:44:04 +0000842 def test_rawvalue(self):
843 self.test_value(raw=True)
844
Florent Xiclunafd1b0932010-03-28 00:25:02 +0000845 @unittest.skipIf(c_int is None, "requires _ctypes")
Benjamin Petersone711caf2008-06-11 16:44:04 +0000846 def test_getobj_getlock(self):
Benjamin Petersone711caf2008-06-11 16:44:04 +0000847 val1 = self.Value('i', 5)
848 lock1 = val1.get_lock()
849 obj1 = val1.get_obj()
850
851 val2 = self.Value('i', 5, lock=None)
852 lock2 = val2.get_lock()
853 obj2 = val2.get_obj()
854
855 lock = self.Lock()
856 val3 = self.Value('i', 5, lock=lock)
857 lock3 = val3.get_lock()
858 obj3 = val3.get_obj()
859 self.assertEqual(lock, lock3)
860
Jesse Nollerb0516a62009-01-18 03:11:38 +0000861 arr4 = self.Value('i', 5, lock=False)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000862 self.assertFalse(hasattr(arr4, 'get_lock'))
863 self.assertFalse(hasattr(arr4, 'get_obj'))
864
Jesse Nollerb0516a62009-01-18 03:11:38 +0000865 self.assertRaises(AttributeError, self.Value, 'i', 5, lock='navalue')
866
867 arr5 = self.RawValue('i', 5)
868 self.assertFalse(hasattr(arr5, 'get_lock'))
869 self.assertFalse(hasattr(arr5, 'get_obj'))
870
Benjamin Petersone711caf2008-06-11 16:44:04 +0000871
872class _TestArray(BaseTestCase):
873
Florent Xiclunafd1b0932010-03-28 00:25:02 +0000874 ALLOWED_TYPES = ('processes',)
875
Benjamin Petersone711caf2008-06-11 16:44:04 +0000876 def f(self, seq):
877 for i in range(1, len(seq)):
878 seq[i] += seq[i-1]
879
Florent Xiclunafd1b0932010-03-28 00:25:02 +0000880 @unittest.skipIf(c_int is None, "requires _ctypes")
Benjamin Petersone711caf2008-06-11 16:44:04 +0000881 def test_array(self, raw=False):
Benjamin Petersone711caf2008-06-11 16:44:04 +0000882 seq = [680, 626, 934, 821, 150, 233, 548, 982, 714, 831]
883 if raw:
884 arr = self.RawArray('i', seq)
885 else:
886 arr = self.Array('i', seq)
887
888 self.assertEqual(len(arr), len(seq))
889 self.assertEqual(arr[3], seq[3])
890 self.assertEqual(list(arr[2:7]), list(seq[2:7]))
891
892 arr[4:8] = seq[4:8] = array.array('i', [1, 2, 3, 4])
893
894 self.assertEqual(list(arr[:]), seq)
895
896 self.f(seq)
897
898 p = self.Process(target=self.f, args=(arr,))
899 p.start()
900 p.join()
901
902 self.assertEqual(list(arr[:]), seq)
903
Florent Xiclunafd1b0932010-03-28 00:25:02 +0000904 @unittest.skipIf(c_int is None, "requires _ctypes")
Benjamin Petersone711caf2008-06-11 16:44:04 +0000905 def test_rawarray(self):
906 self.test_array(raw=True)
907
Florent Xiclunafd1b0932010-03-28 00:25:02 +0000908 @unittest.skipIf(c_int is None, "requires _ctypes")
Benjamin Petersone711caf2008-06-11 16:44:04 +0000909 def test_getobj_getlock_obj(self):
Benjamin Petersone711caf2008-06-11 16:44:04 +0000910 arr1 = self.Array('i', list(range(10)))
911 lock1 = arr1.get_lock()
912 obj1 = arr1.get_obj()
913
914 arr2 = self.Array('i', list(range(10)), lock=None)
915 lock2 = arr2.get_lock()
916 obj2 = arr2.get_obj()
917
918 lock = self.Lock()
919 arr3 = self.Array('i', list(range(10)), lock=lock)
920 lock3 = arr3.get_lock()
921 obj3 = arr3.get_obj()
922 self.assertEqual(lock, lock3)
923
Jesse Nollerb0516a62009-01-18 03:11:38 +0000924 arr4 = self.Array('i', range(10), lock=False)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000925 self.assertFalse(hasattr(arr4, 'get_lock'))
926 self.assertFalse(hasattr(arr4, 'get_obj'))
Jesse Nollerb0516a62009-01-18 03:11:38 +0000927 self.assertRaises(AttributeError,
928 self.Array, 'i', range(10), lock='notalock')
929
930 arr5 = self.RawArray('i', range(10))
931 self.assertFalse(hasattr(arr5, 'get_lock'))
932 self.assertFalse(hasattr(arr5, 'get_obj'))
Benjamin Petersone711caf2008-06-11 16:44:04 +0000933
934#
935#
936#
937
938class _TestContainers(BaseTestCase):
939
940 ALLOWED_TYPES = ('manager',)
941
942 def test_list(self):
943 a = self.list(list(range(10)))
944 self.assertEqual(a[:], list(range(10)))
945
946 b = self.list()
947 self.assertEqual(b[:], [])
948
949 b.extend(list(range(5)))
950 self.assertEqual(b[:], list(range(5)))
951
952 self.assertEqual(b[2], 2)
953 self.assertEqual(b[2:10], [2,3,4])
954
955 b *= 2
956 self.assertEqual(b[:], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4])
957
958 self.assertEqual(b + [5, 6], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4, 5, 6])
959
960 self.assertEqual(a[:], list(range(10)))
961
962 d = [a, b]
963 e = self.list(d)
964 self.assertEqual(
965 e[:],
966 [[0, 1, 2, 3, 4, 5, 6, 7, 8, 9], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4]]
967 )
968
969 f = self.list([a])
970 a.append('hello')
971 self.assertEqual(f[:], [[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 'hello']])
972
973 def test_dict(self):
974 d = self.dict()
975 indices = list(range(65, 70))
976 for i in indices:
977 d[i] = chr(i)
978 self.assertEqual(d.copy(), dict((i, chr(i)) for i in indices))
979 self.assertEqual(sorted(d.keys()), indices)
980 self.assertEqual(sorted(d.values()), [chr(i) for i in indices])
981 self.assertEqual(sorted(d.items()), [(i, chr(i)) for i in indices])
982
983 def test_namespace(self):
984 n = self.Namespace()
985 n.name = 'Bob'
986 n.job = 'Builder'
987 n._hidden = 'hidden'
988 self.assertEqual((n.name, n.job), ('Bob', 'Builder'))
989 del n.job
990 self.assertEqual(str(n), "Namespace(name='Bob')")
991 self.assertTrue(hasattr(n, 'name'))
992 self.assertTrue(not hasattr(n, 'job'))
993
994#
995#
996#
997
998def sqr(x, wait=0.0):
999 time.sleep(wait)
1000 return x*x
Benjamin Petersone711caf2008-06-11 16:44:04 +00001001class _TestPool(BaseTestCase):
1002
1003 def test_apply(self):
1004 papply = self.pool.apply
1005 self.assertEqual(papply(sqr, (5,)), sqr(5))
1006 self.assertEqual(papply(sqr, (), {'x':3}), sqr(x=3))
1007
1008 def test_map(self):
1009 pmap = self.pool.map
1010 self.assertEqual(pmap(sqr, list(range(10))), list(map(sqr, list(range(10)))))
1011 self.assertEqual(pmap(sqr, list(range(100)), chunksize=20),
1012 list(map(sqr, list(range(100)))))
1013
Alexandre Vassalottie52e3782009-07-17 09:18:18 +00001014 def test_map_chunksize(self):
1015 try:
1016 self.pool.map_async(sqr, [], chunksize=1).get(timeout=TIMEOUT1)
1017 except multiprocessing.TimeoutError:
1018 self.fail("pool.map_async with chunksize stalled on null list")
1019
Benjamin Petersone711caf2008-06-11 16:44:04 +00001020 def test_async(self):
1021 res = self.pool.apply_async(sqr, (7, TIMEOUT1,))
1022 get = TimingWrapper(res.get)
1023 self.assertEqual(get(), 49)
1024 self.assertTimingAlmostEqual(get.elapsed, TIMEOUT1)
1025
1026 def test_async_timeout(self):
1027 res = self.pool.apply_async(sqr, (6, TIMEOUT2 + 0.2))
1028 get = TimingWrapper(res.get)
1029 self.assertRaises(multiprocessing.TimeoutError, get, timeout=TIMEOUT2)
1030 self.assertTimingAlmostEqual(get.elapsed, TIMEOUT2)
1031
1032 def test_imap(self):
1033 it = self.pool.imap(sqr, list(range(10)))
1034 self.assertEqual(list(it), list(map(sqr, list(range(10)))))
1035
1036 it = self.pool.imap(sqr, list(range(10)))
1037 for i in range(10):
1038 self.assertEqual(next(it), i*i)
1039 self.assertRaises(StopIteration, it.__next__)
1040
1041 it = self.pool.imap(sqr, list(range(1000)), chunksize=100)
1042 for i in range(1000):
1043 self.assertEqual(next(it), i*i)
1044 self.assertRaises(StopIteration, it.__next__)
1045
1046 def test_imap_unordered(self):
1047 it = self.pool.imap_unordered(sqr, list(range(1000)))
1048 self.assertEqual(sorted(it), list(map(sqr, list(range(1000)))))
1049
1050 it = self.pool.imap_unordered(sqr, list(range(1000)), chunksize=53)
1051 self.assertEqual(sorted(it), list(map(sqr, list(range(1000)))))
1052
1053 def test_make_pool(self):
1054 p = multiprocessing.Pool(3)
1055 self.assertEqual(3, len(p._pool))
1056 p.close()
1057 p.join()
1058
1059 def test_terminate(self):
1060 if self.TYPE == 'manager':
1061 # On Unix a forked process increfs each shared object to
1062 # which its parent process held a reference. If the
1063 # forked process gets terminated then there is likely to
1064 # be a reference leak. So to prevent
1065 # _TestZZZNumberOfObjects from failing we skip this test
1066 # when using a manager.
1067 return
1068
1069 result = self.pool.map_async(
1070 time.sleep, [0.1 for i in range(10000)], chunksize=1
1071 )
1072 self.pool.terminate()
1073 join = TimingWrapper(self.pool.join)
1074 join()
1075 self.assertTrue(join.elapsed < 0.2)
Jesse Noller1f0b6582010-01-27 03:36:01 +00001076
1077class _TestPoolWorkerLifetime(BaseTestCase):
1078
1079 ALLOWED_TYPES = ('processes', )
1080 def test_pool_worker_lifetime(self):
1081 p = multiprocessing.Pool(3, maxtasksperchild=10)
1082 self.assertEqual(3, len(p._pool))
1083 origworkerpids = [w.pid for w in p._pool]
1084 # Run many tasks so each worker gets replaced (hopefully)
1085 results = []
1086 for i in range(100):
1087 results.append(p.apply_async(sqr, (i, )))
1088 # Fetch the results and verify we got the right answers,
1089 # also ensuring all the tasks have completed.
1090 for (j, res) in enumerate(results):
1091 self.assertEqual(res.get(), sqr(j))
1092 # Refill the pool
1093 p._repopulate_pool()
Florent Xiclunafb190f62010-03-04 16:10:10 +00001094 # Wait until all workers are alive
1095 countdown = 5
1096 while countdown and not all(w.is_alive() for w in p._pool):
1097 countdown -= 1
1098 time.sleep(DELTA)
Jesse Noller1f0b6582010-01-27 03:36:01 +00001099 finalworkerpids = [w.pid for w in p._pool]
Florent Xiclunafb190f62010-03-04 16:10:10 +00001100 # All pids should be assigned. See issue #7805.
1101 self.assertNotIn(None, origworkerpids)
1102 self.assertNotIn(None, finalworkerpids)
1103 # Finally, check that the worker pids have changed
Jesse Noller1f0b6582010-01-27 03:36:01 +00001104 self.assertNotEqual(sorted(origworkerpids), sorted(finalworkerpids))
1105 p.close()
1106 p.join()
1107
Benjamin Petersone711caf2008-06-11 16:44:04 +00001108#
1109# Test that manager has expected number of shared objects left
1110#
1111
1112class _TestZZZNumberOfObjects(BaseTestCase):
1113 # Because test cases are sorted alphabetically, this one will get
1114 # run after all the other tests for the manager. It tests that
1115 # there have been no "reference leaks" for the manager's shared
1116 # objects. Note the comment in _TestPool.test_terminate().
1117 ALLOWED_TYPES = ('manager',)
1118
1119 def test_number_of_objects(self):
1120 EXPECTED_NUMBER = 1 # the pool object is still alive
1121 multiprocessing.active_children() # discard dead process objs
1122 gc.collect() # do garbage collection
1123 refs = self.manager._number_of_objects()
Jesse Noller63b3a972009-01-21 02:15:48 +00001124 debug_info = self.manager._debug_info()
Benjamin Petersone711caf2008-06-11 16:44:04 +00001125 if refs != EXPECTED_NUMBER:
Georg Brandl3dbca812008-07-23 16:10:53 +00001126 print(self.manager._debug_info())
Jesse Noller63b3a972009-01-21 02:15:48 +00001127 print(debug_info)
Benjamin Petersone711caf2008-06-11 16:44:04 +00001128
1129 self.assertEqual(refs, EXPECTED_NUMBER)
1130
1131#
1132# Test of creating a customized manager class
1133#
1134
1135from multiprocessing.managers import BaseManager, BaseProxy, RemoteError
1136
1137class FooBar(object):
1138 def f(self):
1139 return 'f()'
1140 def g(self):
1141 raise ValueError
1142 def _h(self):
1143 return '_h()'
1144
1145def baz():
1146 for i in range(10):
1147 yield i*i
1148
1149class IteratorProxy(BaseProxy):
Florent Xiclunaaa171062010-08-14 15:56:42 +00001150 _exposed_ = ('__next__',)
Benjamin Petersone711caf2008-06-11 16:44:04 +00001151 def __iter__(self):
1152 return self
1153 def __next__(self):
Benjamin Petersone711caf2008-06-11 16:44:04 +00001154 return self._callmethod('__next__')
1155
1156class MyManager(BaseManager):
1157 pass
1158
1159MyManager.register('Foo', callable=FooBar)
1160MyManager.register('Bar', callable=FooBar, exposed=('f', '_h'))
1161MyManager.register('baz', callable=baz, proxytype=IteratorProxy)
1162
1163
1164class _TestMyManager(BaseTestCase):
1165
1166 ALLOWED_TYPES = ('manager',)
1167
1168 def test_mymanager(self):
1169 manager = MyManager()
1170 manager.start()
1171
1172 foo = manager.Foo()
1173 bar = manager.Bar()
1174 baz = manager.baz()
1175
1176 foo_methods = [name for name in ('f', 'g', '_h') if hasattr(foo, name)]
1177 bar_methods = [name for name in ('f', 'g', '_h') if hasattr(bar, name)]
1178
1179 self.assertEqual(foo_methods, ['f', 'g'])
1180 self.assertEqual(bar_methods, ['f', '_h'])
1181
1182 self.assertEqual(foo.f(), 'f()')
1183 self.assertRaises(ValueError, foo.g)
1184 self.assertEqual(foo._callmethod('f'), 'f()')
1185 self.assertRaises(RemoteError, foo._callmethod, '_h')
1186
1187 self.assertEqual(bar.f(), 'f()')
1188 self.assertEqual(bar._h(), '_h()')
1189 self.assertEqual(bar._callmethod('f'), 'f()')
1190 self.assertEqual(bar._callmethod('_h'), '_h()')
1191
1192 self.assertEqual(list(baz), [i*i for i in range(10)])
1193
1194 manager.shutdown()
1195
1196#
1197# Test of connecting to a remote server and using xmlrpclib for serialization
1198#
1199
1200_queue = pyqueue.Queue()
1201def get_queue():
1202 return _queue
1203
1204class QueueManager(BaseManager):
1205 '''manager class used by server process'''
1206QueueManager.register('get_queue', callable=get_queue)
1207
1208class QueueManager2(BaseManager):
1209 '''manager class which specifies the same interface as QueueManager'''
1210QueueManager2.register('get_queue')
1211
1212
1213SERIALIZER = 'xmlrpclib'
1214
1215class _TestRemoteManager(BaseTestCase):
1216
1217 ALLOWED_TYPES = ('manager',)
1218
1219 def _putter(self, address, authkey):
1220 manager = QueueManager2(
1221 address=address, authkey=authkey, serializer=SERIALIZER
1222 )
1223 manager.connect()
1224 queue = manager.get_queue()
1225 queue.put(('hello world', None, True, 2.25))
1226
1227 def test_remote(self):
1228 authkey = os.urandom(32)
1229
1230 manager = QueueManager(
1231 address=('localhost', 0), authkey=authkey, serializer=SERIALIZER
1232 )
1233 manager.start()
1234
1235 p = self.Process(target=self._putter, args=(manager.address, authkey))
1236 p.start()
1237
1238 manager2 = QueueManager2(
1239 address=manager.address, authkey=authkey, serializer=SERIALIZER
1240 )
1241 manager2.connect()
1242 queue = manager2.get_queue()
1243
1244 # Note that xmlrpclib will deserialize object as a list not a tuple
1245 self.assertEqual(queue.get(), ['hello world', None, True, 2.25])
1246
1247 # Because we are using xmlrpclib for serialization instead of
1248 # pickle this will cause a serialization error.
1249 self.assertRaises(Exception, queue.put, time.sleep)
1250
1251 # Make queue finalizer run before the server is stopped
1252 del queue
1253 manager.shutdown()
1254
Jesse Nollerc5d28a02009-03-30 16:37:36 +00001255class _TestManagerRestart(BaseTestCase):
1256
1257 def _putter(self, address, authkey):
1258 manager = QueueManager(
1259 address=address, authkey=authkey, serializer=SERIALIZER)
1260 manager.connect()
1261 queue = manager.get_queue()
1262 queue.put('hello world')
1263
1264 def test_rapid_restart(self):
1265 authkey = os.urandom(32)
1266 manager = QueueManager(
Antoine Pitrou043bad02010-04-30 23:20:15 +00001267 address=('localhost', 0), authkey=authkey, serializer=SERIALIZER)
1268 addr = manager.get_server().address
Jesse Nollerc5d28a02009-03-30 16:37:36 +00001269 manager.start()
1270
1271 p = self.Process(target=self._putter, args=(manager.address, authkey))
1272 p.start()
1273 queue = manager.get_queue()
1274 self.assertEqual(queue.get(), 'hello world')
Jesse Noller35d1f002009-03-30 22:59:27 +00001275 del queue
Jesse Nollerc5d28a02009-03-30 16:37:36 +00001276 manager.shutdown()
1277 manager = QueueManager(
Antoine Pitrou043bad02010-04-30 23:20:15 +00001278 address=addr, authkey=authkey, serializer=SERIALIZER)
Jesse Nollerc5d28a02009-03-30 16:37:36 +00001279 manager.start()
Jesse Noller35d1f002009-03-30 22:59:27 +00001280 manager.shutdown()
Jesse Nollerc5d28a02009-03-30 16:37:36 +00001281
Benjamin Petersone711caf2008-06-11 16:44:04 +00001282#
1283#
1284#
1285
1286SENTINEL = latin('')
1287
1288class _TestConnection(BaseTestCase):
1289
1290 ALLOWED_TYPES = ('processes', 'threads')
1291
1292 def _echo(self, conn):
1293 for msg in iter(conn.recv_bytes, SENTINEL):
1294 conn.send_bytes(msg)
1295 conn.close()
1296
1297 def test_connection(self):
1298 conn, child_conn = self.Pipe()
1299
1300 p = self.Process(target=self._echo, args=(child_conn,))
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +00001301 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +00001302 p.start()
1303
1304 seq = [1, 2.25, None]
1305 msg = latin('hello world')
1306 longmsg = msg * 10
1307 arr = array.array('i', list(range(4)))
1308
1309 if self.TYPE == 'processes':
1310 self.assertEqual(type(conn.fileno()), int)
1311
1312 self.assertEqual(conn.send(seq), None)
1313 self.assertEqual(conn.recv(), seq)
1314
1315 self.assertEqual(conn.send_bytes(msg), None)
1316 self.assertEqual(conn.recv_bytes(), msg)
1317
1318 if self.TYPE == 'processes':
1319 buffer = array.array('i', [0]*10)
1320 expected = list(arr) + [0] * (10 - len(arr))
1321 self.assertEqual(conn.send_bytes(arr), None)
1322 self.assertEqual(conn.recv_bytes_into(buffer),
1323 len(arr) * buffer.itemsize)
1324 self.assertEqual(list(buffer), expected)
1325
1326 buffer = array.array('i', [0]*10)
1327 expected = [0] * 3 + list(arr) + [0] * (10 - 3 - len(arr))
1328 self.assertEqual(conn.send_bytes(arr), None)
1329 self.assertEqual(conn.recv_bytes_into(buffer, 3 * buffer.itemsize),
1330 len(arr) * buffer.itemsize)
1331 self.assertEqual(list(buffer), expected)
1332
1333 buffer = bytearray(latin(' ' * 40))
1334 self.assertEqual(conn.send_bytes(longmsg), None)
1335 try:
1336 res = conn.recv_bytes_into(buffer)
1337 except multiprocessing.BufferTooShort as e:
1338 self.assertEqual(e.args, (longmsg,))
1339 else:
1340 self.fail('expected BufferTooShort, got %s' % res)
1341
1342 poll = TimingWrapper(conn.poll)
1343
1344 self.assertEqual(poll(), False)
1345 self.assertTimingAlmostEqual(poll.elapsed, 0)
1346
1347 self.assertEqual(poll(TIMEOUT1), False)
1348 self.assertTimingAlmostEqual(poll.elapsed, TIMEOUT1)
1349
1350 conn.send(None)
1351
1352 self.assertEqual(poll(TIMEOUT1), True)
1353 self.assertTimingAlmostEqual(poll.elapsed, 0)
1354
1355 self.assertEqual(conn.recv(), None)
1356
1357 really_big_msg = latin('X') * (1024 * 1024 * 16) # 16Mb
1358 conn.send_bytes(really_big_msg)
1359 self.assertEqual(conn.recv_bytes(), really_big_msg)
1360
1361 conn.send_bytes(SENTINEL) # tell child to quit
1362 child_conn.close()
1363
1364 if self.TYPE == 'processes':
1365 self.assertEqual(conn.readable, True)
1366 self.assertEqual(conn.writable, True)
1367 self.assertRaises(EOFError, conn.recv)
1368 self.assertRaises(EOFError, conn.recv_bytes)
1369
1370 p.join()
1371
1372 def test_duplex_false(self):
1373 reader, writer = self.Pipe(duplex=False)
1374 self.assertEqual(writer.send(1), None)
1375 self.assertEqual(reader.recv(), 1)
1376 if self.TYPE == 'processes':
1377 self.assertEqual(reader.readable, True)
1378 self.assertEqual(reader.writable, False)
1379 self.assertEqual(writer.readable, False)
1380 self.assertEqual(writer.writable, True)
1381 self.assertRaises(IOError, reader.send, 2)
1382 self.assertRaises(IOError, writer.recv)
1383 self.assertRaises(IOError, writer.poll)
1384
1385 def test_spawn_close(self):
1386 # We test that a pipe connection can be closed by parent
1387 # process immediately after child is spawned. On Windows this
1388 # would have sometimes failed on old versions because
1389 # child_conn would be closed before the child got a chance to
1390 # duplicate it.
1391 conn, child_conn = self.Pipe()
1392
1393 p = self.Process(target=self._echo, args=(child_conn,))
1394 p.start()
1395 child_conn.close() # this might complete before child initializes
1396
1397 msg = latin('hello')
1398 conn.send_bytes(msg)
1399 self.assertEqual(conn.recv_bytes(), msg)
1400
1401 conn.send_bytes(SENTINEL)
1402 conn.close()
1403 p.join()
1404
1405 def test_sendbytes(self):
1406 if self.TYPE != 'processes':
1407 return
1408
1409 msg = latin('abcdefghijklmnopqrstuvwxyz')
1410 a, b = self.Pipe()
1411
1412 a.send_bytes(msg)
1413 self.assertEqual(b.recv_bytes(), msg)
1414
1415 a.send_bytes(msg, 5)
1416 self.assertEqual(b.recv_bytes(), msg[5:])
1417
1418 a.send_bytes(msg, 7, 8)
1419 self.assertEqual(b.recv_bytes(), msg[7:7+8])
1420
1421 a.send_bytes(msg, 26)
1422 self.assertEqual(b.recv_bytes(), latin(''))
1423
1424 a.send_bytes(msg, 26, 0)
1425 self.assertEqual(b.recv_bytes(), latin(''))
1426
1427 self.assertRaises(ValueError, a.send_bytes, msg, 27)
1428
1429 self.assertRaises(ValueError, a.send_bytes, msg, 22, 5)
1430
1431 self.assertRaises(ValueError, a.send_bytes, msg, 26, 1)
1432
1433 self.assertRaises(ValueError, a.send_bytes, msg, -1)
1434
1435 self.assertRaises(ValueError, a.send_bytes, msg, 4, -1)
1436
Benjamin Petersone711caf2008-06-11 16:44:04 +00001437class _TestListenerClient(BaseTestCase):
1438
1439 ALLOWED_TYPES = ('processes', 'threads')
1440
1441 def _test(self, address):
1442 conn = self.connection.Client(address)
1443 conn.send('hello')
1444 conn.close()
1445
1446 def test_listener_client(self):
1447 for family in self.connection.families:
1448 l = self.connection.Listener(family=family)
1449 p = self.Process(target=self._test, args=(l.address,))
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +00001450 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +00001451 p.start()
1452 conn = l.accept()
1453 self.assertEqual(conn.recv(), 'hello')
1454 p.join()
1455 l.close()
Benjamin Petersone711caf2008-06-11 16:44:04 +00001456#
1457# Test of sending connection and socket objects between processes
1458#
Benjamin Petersonb29cbbc2008-06-16 20:57:14 +00001459"""
Benjamin Petersone711caf2008-06-11 16:44:04 +00001460class _TestPicklingConnections(BaseTestCase):
1461
1462 ALLOWED_TYPES = ('processes',)
1463
1464 def _listener(self, conn, families):
1465 for fam in families:
1466 l = self.connection.Listener(family=fam)
1467 conn.send(l.address)
1468 new_conn = l.accept()
1469 conn.send(new_conn)
1470
1471 if self.TYPE == 'processes':
1472 l = socket.socket()
1473 l.bind(('localhost', 0))
1474 conn.send(l.getsockname())
1475 l.listen(1)
1476 new_conn, addr = l.accept()
1477 conn.send(new_conn)
1478
1479 conn.recv()
1480
1481 def _remote(self, conn):
1482 for (address, msg) in iter(conn.recv, None):
1483 client = self.connection.Client(address)
1484 client.send(msg.upper())
1485 client.close()
1486
1487 if self.TYPE == 'processes':
1488 address, msg = conn.recv()
1489 client = socket.socket()
1490 client.connect(address)
1491 client.sendall(msg.upper())
1492 client.close()
1493
1494 conn.close()
1495
1496 def test_pickling(self):
1497 try:
1498 multiprocessing.allow_connection_pickling()
1499 except ImportError:
1500 return
1501
1502 families = self.connection.families
1503
1504 lconn, lconn0 = self.Pipe()
1505 lp = self.Process(target=self._listener, args=(lconn0, families))
1506 lp.start()
1507 lconn0.close()
1508
1509 rconn, rconn0 = self.Pipe()
1510 rp = self.Process(target=self._remote, args=(rconn0,))
1511 rp.start()
1512 rconn0.close()
1513
1514 for fam in families:
1515 msg = ('This connection uses family %s' % fam).encode('ascii')
1516 address = lconn.recv()
1517 rconn.send((address, msg))
1518 new_conn = lconn.recv()
1519 self.assertEqual(new_conn.recv(), msg.upper())
1520
1521 rconn.send(None)
1522
1523 if self.TYPE == 'processes':
1524 msg = latin('This connection uses a normal socket')
1525 address = lconn.recv()
1526 rconn.send((address, msg))
1527 if hasattr(socket, 'fromfd'):
1528 new_conn = lconn.recv()
1529 self.assertEqual(new_conn.recv(100), msg.upper())
1530 else:
1531 # XXX On Windows with Py2.6 need to backport fromfd()
1532 discard = lconn.recv_bytes()
1533
1534 lconn.send(None)
1535
1536 rconn.close()
1537 lconn.close()
1538
1539 lp.join()
1540 rp.join()
Benjamin Petersonb29cbbc2008-06-16 20:57:14 +00001541"""
Benjamin Petersone711caf2008-06-11 16:44:04 +00001542#
1543#
1544#
1545
1546class _TestHeap(BaseTestCase):
1547
1548 ALLOWED_TYPES = ('processes',)
1549
1550 def test_heap(self):
1551 iterations = 5000
1552 maxblocks = 50
1553 blocks = []
1554
1555 # create and destroy lots of blocks of different sizes
1556 for i in range(iterations):
1557 size = int(random.lognormvariate(0, 1) * 1000)
1558 b = multiprocessing.heap.BufferWrapper(size)
1559 blocks.append(b)
1560 if len(blocks) > maxblocks:
1561 i = random.randrange(maxblocks)
1562 del blocks[i]
1563
1564 # get the heap object
1565 heap = multiprocessing.heap.BufferWrapper._heap
1566
1567 # verify the state of the heap
1568 all = []
1569 occupied = 0
1570 for L in list(heap._len_to_seq.values()):
1571 for arena, start, stop in L:
1572 all.append((heap._arenas.index(arena), start, stop,
1573 stop-start, 'free'))
1574 for arena, start, stop in heap._allocated_blocks:
1575 all.append((heap._arenas.index(arena), start, stop,
1576 stop-start, 'occupied'))
1577 occupied += (stop-start)
1578
1579 all.sort()
1580
1581 for i in range(len(all)-1):
1582 (arena, start, stop) = all[i][:3]
1583 (narena, nstart, nstop) = all[i+1][:3]
1584 self.assertTrue((arena != narena and nstart == 0) or
1585 (stop == nstart))
1586
1587#
1588#
1589#
1590
Benjamin Petersone711caf2008-06-11 16:44:04 +00001591class _Foo(Structure):
1592 _fields_ = [
1593 ('x', c_int),
1594 ('y', c_double)
1595 ]
1596
1597class _TestSharedCTypes(BaseTestCase):
1598
1599 ALLOWED_TYPES = ('processes',)
1600
1601 def _double(self, x, y, foo, arr, string):
1602 x.value *= 2
1603 y.value *= 2
1604 foo.x *= 2
1605 foo.y *= 2
1606 string.value *= 2
1607 for i in range(len(arr)):
1608 arr[i] *= 2
1609
Florent Xiclunaaa171062010-08-14 15:56:42 +00001610 @unittest.skipIf(Value is None, "requires ctypes.Value")
Benjamin Petersone711caf2008-06-11 16:44:04 +00001611 def test_sharedctypes(self, lock=False):
Benjamin Petersone711caf2008-06-11 16:44:04 +00001612 x = Value('i', 7, lock=lock)
Georg Brandl89fad142010-03-14 10:23:39 +00001613 y = Value(c_double, 1.0/3.0, lock=lock)
Benjamin Petersone711caf2008-06-11 16:44:04 +00001614 foo = Value(_Foo, 3, 2, lock=lock)
Georg Brandl89fad142010-03-14 10:23:39 +00001615 arr = self.Array('d', list(range(10)), lock=lock)
1616 string = self.Array('c', 20, lock=lock)
Benjamin Petersone711caf2008-06-11 16:44:04 +00001617 string.value = 'hello'
1618
1619 p = self.Process(target=self._double, args=(x, y, foo, arr, string))
1620 p.start()
1621 p.join()
1622
1623 self.assertEqual(x.value, 14)
1624 self.assertAlmostEqual(y.value, 2.0/3.0)
1625 self.assertEqual(foo.x, 6)
1626 self.assertAlmostEqual(foo.y, 4.0)
1627 for i in range(10):
1628 self.assertAlmostEqual(arr[i], i*2)
1629 self.assertEqual(string.value, latin('hellohello'))
1630
Florent Xiclunaaa171062010-08-14 15:56:42 +00001631 @unittest.skipIf(Value is None, "requires ctypes.Value")
Benjamin Petersone711caf2008-06-11 16:44:04 +00001632 def test_synchronize(self):
1633 self.test_sharedctypes(lock=True)
1634
Florent Xiclunaaa171062010-08-14 15:56:42 +00001635 @unittest.skipIf(ctypes_copy is None, "requires ctypes.copy")
Benjamin Petersone711caf2008-06-11 16:44:04 +00001636 def test_copy(self):
Benjamin Petersone711caf2008-06-11 16:44:04 +00001637 foo = _Foo(2, 5.0)
Florent Xiclunaaa171062010-08-14 15:56:42 +00001638 bar = ctypes_copy(foo)
Benjamin Petersone711caf2008-06-11 16:44:04 +00001639 foo.x = 0
1640 foo.y = 0
1641 self.assertEqual(bar.x, 2)
1642 self.assertAlmostEqual(bar.y, 5.0)
1643
1644#
1645#
1646#
1647
1648class _TestFinalize(BaseTestCase):
1649
1650 ALLOWED_TYPES = ('processes',)
1651
1652 def _test_finalize(self, conn):
1653 class Foo(object):
1654 pass
1655
1656 a = Foo()
1657 util.Finalize(a, conn.send, args=('a',))
1658 del a # triggers callback for a
1659
1660 b = Foo()
1661 close_b = util.Finalize(b, conn.send, args=('b',))
1662 close_b() # triggers callback for b
1663 close_b() # does nothing because callback has already been called
1664 del b # does nothing because callback has already been called
1665
1666 c = Foo()
1667 util.Finalize(c, conn.send, args=('c',))
1668
1669 d10 = Foo()
1670 util.Finalize(d10, conn.send, args=('d10',), exitpriority=1)
1671
1672 d01 = Foo()
1673 util.Finalize(d01, conn.send, args=('d01',), exitpriority=0)
1674 d02 = Foo()
1675 util.Finalize(d02, conn.send, args=('d02',), exitpriority=0)
1676 d03 = Foo()
1677 util.Finalize(d03, conn.send, args=('d03',), exitpriority=0)
1678
1679 util.Finalize(None, conn.send, args=('e',), exitpriority=-10)
1680
1681 util.Finalize(None, conn.send, args=('STOP',), exitpriority=-100)
1682
1683 # call mutliprocessing's cleanup function then exit process without
1684 # garbage collecting locals
1685 util._exit_function()
1686 conn.close()
1687 os._exit(0)
1688
1689 def test_finalize(self):
1690 conn, child_conn = self.Pipe()
1691
1692 p = self.Process(target=self._test_finalize, args=(child_conn,))
1693 p.start()
1694 p.join()
1695
1696 result = [obj for obj in iter(conn.recv, 'STOP')]
1697 self.assertEqual(result, ['a', 'b', 'd10', 'd03', 'd02', 'd01', 'e'])
1698
1699#
1700# Test that from ... import * works for each module
1701#
1702
1703class _TestImportStar(BaseTestCase):
1704
1705 ALLOWED_TYPES = ('processes',)
1706
1707 def test_import(self):
Florent Xiclunafd1b0932010-03-28 00:25:02 +00001708 modules = [
Benjamin Petersone711caf2008-06-11 16:44:04 +00001709 'multiprocessing', 'multiprocessing.connection',
1710 'multiprocessing.heap', 'multiprocessing.managers',
1711 'multiprocessing.pool', 'multiprocessing.process',
Florent Xiclunafd1b0932010-03-28 00:25:02 +00001712 'multiprocessing.reduction',
Benjamin Petersone711caf2008-06-11 16:44:04 +00001713 'multiprocessing.synchronize', 'multiprocessing.util'
Florent Xiclunafd1b0932010-03-28 00:25:02 +00001714 ]
1715
1716 if c_int is not None:
1717 # This module requires _ctypes
1718 modules.append('multiprocessing.sharedctypes')
Benjamin Petersone711caf2008-06-11 16:44:04 +00001719
1720 for name in modules:
1721 __import__(name)
1722 mod = sys.modules[name]
1723
1724 for attr in getattr(mod, '__all__', ()):
1725 self.assertTrue(
1726 hasattr(mod, attr),
1727 '%r does not have attribute %r' % (mod, attr)
1728 )
1729
1730#
1731# Quick test that logging works -- does not test logging output
1732#
1733
1734class _TestLogging(BaseTestCase):
1735
1736 ALLOWED_TYPES = ('processes',)
1737
1738 def test_enable_logging(self):
1739 logger = multiprocessing.get_logger()
1740 logger.setLevel(util.SUBWARNING)
1741 self.assertTrue(logger is not None)
1742 logger.debug('this will not be printed')
1743 logger.info('nor will this')
1744 logger.setLevel(LOG_LEVEL)
1745
1746 def _test_level(self, conn):
1747 logger = multiprocessing.get_logger()
1748 conn.send(logger.getEffectiveLevel())
1749
1750 def test_level(self):
1751 LEVEL1 = 32
1752 LEVEL2 = 37
1753
1754 logger = multiprocessing.get_logger()
1755 root_logger = logging.getLogger()
1756 root_level = root_logger.level
1757
1758 reader, writer = multiprocessing.Pipe(duplex=False)
1759
1760 logger.setLevel(LEVEL1)
1761 self.Process(target=self._test_level, args=(writer,)).start()
1762 self.assertEqual(LEVEL1, reader.recv())
1763
1764 logger.setLevel(logging.NOTSET)
1765 root_logger.setLevel(LEVEL2)
1766 self.Process(target=self._test_level, args=(writer,)).start()
1767 self.assertEqual(LEVEL2, reader.recv())
1768
1769 root_logger.setLevel(root_level)
1770 logger.setLevel(level=LOG_LEVEL)
1771
Jesse Nollerb9a49b72009-11-21 18:09:38 +00001772
Jesse Nollerf4ae35f2009-11-24 14:22:24 +00001773# class _TestLoggingProcessName(BaseTestCase):
1774#
1775# def handle(self, record):
1776# assert record.processName == multiprocessing.current_process().name
1777# self.__handled = True
1778#
1779# def test_logging(self):
1780# handler = logging.Handler()
1781# handler.handle = self.handle
1782# self.__handled = False
1783# # Bypass getLogger() and side-effects
1784# logger = logging.getLoggerClass()(
1785# 'multiprocessing.test.TestLoggingProcessName')
1786# logger.addHandler(handler)
1787# logger.propagate = False
1788#
1789# logger.warn('foo')
1790# assert self.__handled
Jesse Nollerb9a49b72009-11-21 18:09:38 +00001791
Benjamin Petersone711caf2008-06-11 16:44:04 +00001792#
Jesse Noller6214edd2009-01-19 16:23:53 +00001793# Test to verify handle verification, see issue 3321
1794#
1795
1796class TestInvalidHandle(unittest.TestCase):
1797
Florent Xiclunafd1b0932010-03-28 00:25:02 +00001798 @unittest.skipIf(WIN32, "skipped on Windows")
Jesse Noller6214edd2009-01-19 16:23:53 +00001799 def test_invalid_handles(self):
Jesse Noller6214edd2009-01-19 16:23:53 +00001800 conn = _multiprocessing.Connection(44977608)
1801 self.assertRaises(IOError, conn.poll)
1802 self.assertRaises(IOError, _multiprocessing.Connection, -1)
Florent Xiclunafd1b0932010-03-28 00:25:02 +00001803
Jesse Noller6214edd2009-01-19 16:23:53 +00001804#
Benjamin Petersone711caf2008-06-11 16:44:04 +00001805# Functions used to create test cases from the base ones in this module
1806#
1807
1808def get_attributes(Source, names):
1809 d = {}
1810 for name in names:
1811 obj = getattr(Source, name)
1812 if type(obj) == type(get_attributes):
1813 obj = staticmethod(obj)
1814 d[name] = obj
1815 return d
1816
1817def create_test_cases(Mixin, type):
1818 result = {}
1819 glob = globals()
Florent Xiclunafd1b0932010-03-28 00:25:02 +00001820 Type = type.capitalize()
Benjamin Petersone711caf2008-06-11 16:44:04 +00001821
1822 for name in list(glob.keys()):
1823 if name.startswith('_Test'):
1824 base = glob[name]
1825 if type in base.ALLOWED_TYPES:
1826 newname = 'With' + Type + name[1:]
1827 class Temp(base, unittest.TestCase, Mixin):
1828 pass
1829 result[newname] = Temp
1830 Temp.__name__ = newname
1831 Temp.__module__ = Mixin.__module__
1832 return result
1833
1834#
1835# Create test cases
1836#
1837
1838class ProcessesMixin(object):
1839 TYPE = 'processes'
1840 Process = multiprocessing.Process
1841 locals().update(get_attributes(multiprocessing, (
1842 'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
1843 'Condition', 'Event', 'Value', 'Array', 'RawValue',
1844 'RawArray', 'current_process', 'active_children', 'Pipe',
1845 'connection', 'JoinableQueue'
1846 )))
1847
1848testcases_processes = create_test_cases(ProcessesMixin, type='processes')
1849globals().update(testcases_processes)
1850
1851
1852class ManagerMixin(object):
1853 TYPE = 'manager'
1854 Process = multiprocessing.Process
1855 manager = object.__new__(multiprocessing.managers.SyncManager)
1856 locals().update(get_attributes(manager, (
1857 'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
1858 'Condition', 'Event', 'Value', 'Array', 'list', 'dict',
1859 'Namespace', 'JoinableQueue'
1860 )))
1861
1862testcases_manager = create_test_cases(ManagerMixin, type='manager')
1863globals().update(testcases_manager)
1864
1865
1866class ThreadsMixin(object):
1867 TYPE = 'threads'
1868 Process = multiprocessing.dummy.Process
1869 locals().update(get_attributes(multiprocessing.dummy, (
1870 'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
1871 'Condition', 'Event', 'Value', 'Array', 'current_process',
1872 'active_children', 'Pipe', 'connection', 'dict', 'list',
1873 'Namespace', 'JoinableQueue'
1874 )))
1875
1876testcases_threads = create_test_cases(ThreadsMixin, type='threads')
1877globals().update(testcases_threads)
1878
Neal Norwitz5d6415e2008-08-25 01:53:32 +00001879class OtherTest(unittest.TestCase):
1880 # TODO: add more tests for deliver/answer challenge.
1881 def test_deliver_challenge_auth_failure(self):
1882 class _FakeConnection(object):
1883 def recv_bytes(self, size):
Neal Norwitzec105ad2008-08-25 03:05:54 +00001884 return b'something bogus'
Neal Norwitz5d6415e2008-08-25 01:53:32 +00001885 def send_bytes(self, data):
1886 pass
1887 self.assertRaises(multiprocessing.AuthenticationError,
1888 multiprocessing.connection.deliver_challenge,
1889 _FakeConnection(), b'abc')
1890
1891 def test_answer_challenge_auth_failure(self):
1892 class _FakeConnection(object):
1893 def __init__(self):
1894 self.count = 0
1895 def recv_bytes(self, size):
1896 self.count += 1
1897 if self.count == 1:
1898 return multiprocessing.connection.CHALLENGE
1899 elif self.count == 2:
Neal Norwitzec105ad2008-08-25 03:05:54 +00001900 return b'something bogus'
1901 return b''
Neal Norwitz5d6415e2008-08-25 01:53:32 +00001902 def send_bytes(self, data):
1903 pass
1904 self.assertRaises(multiprocessing.AuthenticationError,
1905 multiprocessing.connection.answer_challenge,
1906 _FakeConnection(), b'abc')
1907
Benjamin Petersonf47ed4a2009-04-11 20:45:40 +00001908#
1909# Test Manager.start()/Pool.__init__() initializer feature - see issue 5585
1910#
1911
1912def initializer(ns):
1913 ns.test += 1
1914
1915class TestInitializers(unittest.TestCase):
1916 def setUp(self):
1917 self.mgr = multiprocessing.Manager()
1918 self.ns = self.mgr.Namespace()
1919 self.ns.test = 0
1920
1921 def tearDown(self):
1922 self.mgr.shutdown()
1923
1924 def test_manager_initializer(self):
1925 m = multiprocessing.managers.SyncManager()
1926 self.assertRaises(TypeError, m.start, 1)
1927 m.start(initializer, (self.ns,))
1928 self.assertEqual(self.ns.test, 1)
1929 m.shutdown()
1930
1931 def test_pool_initializer(self):
1932 self.assertRaises(TypeError, multiprocessing.Pool, initializer=1)
1933 p = multiprocessing.Pool(1, initializer, (self.ns,))
1934 p.close()
1935 p.join()
1936 self.assertEqual(self.ns.test, 1)
1937
Alexandre Vassalottic57a84f2009-07-17 12:07:01 +00001938#
1939# Issue 5155, 5313, 5331: Test process in processes
1940# Verifies os.close(sys.stdin.fileno) vs. sys.stdin.close() behavior
1941#
1942
1943def _ThisSubProcess(q):
1944 try:
1945 item = q.get(block=False)
1946 except pyqueue.Empty:
1947 pass
1948
1949def _TestProcess(q):
1950 queue = multiprocessing.Queue()
1951 subProc = multiprocessing.Process(target=_ThisSubProcess, args=(queue,))
1952 subProc.start()
1953 subProc.join()
1954
1955def _afunc(x):
1956 return x*x
1957
1958def pool_in_process():
1959 pool = multiprocessing.Pool(processes=4)
1960 x = pool.map(_afunc, [1, 2, 3, 4, 5, 6, 7])
1961
1962class _file_like(object):
1963 def __init__(self, delegate):
1964 self._delegate = delegate
1965 self._pid = None
1966
1967 @property
1968 def cache(self):
1969 pid = os.getpid()
1970 # There are no race conditions since fork keeps only the running thread
1971 if pid != self._pid:
1972 self._pid = pid
1973 self._cache = []
1974 return self._cache
1975
1976 def write(self, data):
1977 self.cache.append(data)
1978
1979 def flush(self):
1980 self._delegate.write(''.join(self.cache))
1981 self._cache = []
1982
1983class TestStdinBadfiledescriptor(unittest.TestCase):
1984
1985 def test_queue_in_process(self):
1986 queue = multiprocessing.Queue()
1987 proc = multiprocessing.Process(target=_TestProcess, args=(queue,))
1988 proc.start()
1989 proc.join()
1990
1991 def test_pool_in_process(self):
1992 p = multiprocessing.Process(target=pool_in_process)
1993 p.start()
1994 p.join()
1995
1996 def test_flushing(self):
1997 sio = io.StringIO()
1998 flike = _file_like(sio)
1999 flike.write('foo')
2000 proc = multiprocessing.Process(target=lambda: flike.flush())
2001 flike.flush()
2002 assert sio.getvalue() == 'foo'
2003
2004testcases_other = [OtherTest, TestInvalidHandle, TestInitializers,
2005 TestStdinBadfiledescriptor]
Neal Norwitz5d6415e2008-08-25 01:53:32 +00002006
Benjamin Petersone711caf2008-06-11 16:44:04 +00002007#
2008#
2009#
2010
2011def test_main(run=None):
Jesse Nollerd00df3c2008-06-18 14:22:48 +00002012 if sys.platform.startswith("linux"):
2013 try:
2014 lock = multiprocessing.RLock()
2015 except OSError:
Benjamin Petersone549ead2009-03-28 21:42:05 +00002016 raise unittest.SkipTest("OSError raises on RLock creation, see issue 3111!")
Benjamin Peterson3c0dd062008-06-17 22:43:48 +00002017
Benjamin Petersone711caf2008-06-11 16:44:04 +00002018 if run is None:
2019 from test.support import run_unittest as run
2020
2021 util.get_temp_dir() # creates temp directory for use by all processes
2022
2023 multiprocessing.get_logger().setLevel(LOG_LEVEL)
2024
Benjamin Peterson41181742008-07-02 20:22:54 +00002025 ProcessesMixin.pool = multiprocessing.Pool(4)
2026 ThreadsMixin.pool = multiprocessing.dummy.Pool(4)
2027 ManagerMixin.manager.__init__()
2028 ManagerMixin.manager.start()
2029 ManagerMixin.pool = ManagerMixin.manager.Pool(4)
Benjamin Petersone711caf2008-06-11 16:44:04 +00002030
2031 testcases = (
Benjamin Peterson41181742008-07-02 20:22:54 +00002032 sorted(testcases_processes.values(), key=lambda tc:tc.__name__) +
2033 sorted(testcases_threads.values(), key=lambda tc:tc.__name__) +
Neal Norwitz5d6415e2008-08-25 01:53:32 +00002034 sorted(testcases_manager.values(), key=lambda tc:tc.__name__) +
2035 testcases_other
Benjamin Petersone711caf2008-06-11 16:44:04 +00002036 )
2037
2038 loadTestsFromTestCase = unittest.defaultTestLoader.loadTestsFromTestCase
2039 suite = unittest.TestSuite(loadTestsFromTestCase(tc) for tc in testcases)
2040 run(suite)
2041
Benjamin Peterson41181742008-07-02 20:22:54 +00002042 ThreadsMixin.pool.terminate()
2043 ProcessesMixin.pool.terminate()
2044 ManagerMixin.pool.terminate()
2045 ManagerMixin.manager.shutdown()
Benjamin Petersone711caf2008-06-11 16:44:04 +00002046
Benjamin Peterson41181742008-07-02 20:22:54 +00002047 del ProcessesMixin.pool, ThreadsMixin.pool, ManagerMixin.pool
Benjamin Petersone711caf2008-06-11 16:44:04 +00002048
2049def main():
2050 test_main(unittest.TextTestRunner(verbosity=2).run)
2051
2052if __name__ == '__main__':
2053 main()