blob: 3e467d5ea0b500a3d7928f7b6532d6c81e3bcb04 [file] [log] [blame]
Georg Brandl86b2fb92008-07-16 03:43:04 +00001#!/usr/bin/env python
2
Benjamin Petersone711caf2008-06-11 16:44:04 +00003#
4# Unit tests for the multiprocessing package
5#
6
7import unittest
8import threading
9import queue as pyqueue
10import time
R. David Murraya44c6b32009-07-29 15:40:30 +000011import io
Benjamin Petersone711caf2008-06-11 16:44:04 +000012import sys
13import os
14import gc
Antoine Pitrouc824e9a2011-04-05 18:11:33 +020015import errno
Benjamin Petersone711caf2008-06-11 16:44:04 +000016import signal
17import array
Benjamin Petersone711caf2008-06-11 16:44:04 +000018import socket
19import random
20import logging
R. David Murraya21e4ca2009-03-31 23:16:50 +000021import test.support
Benjamin Petersone711caf2008-06-11 16:44:04 +000022
Benjamin Petersone5384b02008-10-04 22:00:42 +000023
R. David Murraya21e4ca2009-03-31 23:16:50 +000024# Skip tests if _multiprocessing wasn't built.
25_multiprocessing = test.support.import_module('_multiprocessing')
26# Skip tests if sem_open implementation is broken.
27test.support.import_module('multiprocessing.synchronize')
Benjamin Petersone5384b02008-10-04 22:00:42 +000028
Benjamin Petersone711caf2008-06-11 16:44:04 +000029import multiprocessing.dummy
30import multiprocessing.connection
31import multiprocessing.managers
32import multiprocessing.heap
Benjamin Petersone711caf2008-06-11 16:44:04 +000033import multiprocessing.pool
Benjamin Petersone711caf2008-06-11 16:44:04 +000034
35from multiprocessing import util
36
Brian Curtin918616c2010-10-07 02:12:17 +000037try:
38 from multiprocessing.sharedctypes import Value, copy
39 HAS_SHAREDCTYPES = True
40except ImportError:
41 HAS_SHAREDCTYPES = False
42
Benjamin Petersone711caf2008-06-11 16:44:04 +000043#
44#
45#
46
def latin(s):
    """Return the text *s* encoded to bytes with the 'latin' codec."""
    encoded = s.encode('latin')
    return encoded
Benjamin Petersone711caf2008-06-11 16:44:04 +000049
Benjamin Petersone711caf2008-06-11 16:44:04 +000050#
51# Constants
52#
53
LOG_LEVEL = util.SUBWARNING
#LOG_LEVEL = logging.WARNING

DELTA = 0.1

# Making CHECK_TIMINGS true makes tests take a lot longer and can sometimes
# cause some non-serious failures because some calls block a bit longer
# than expected.
CHECK_TIMINGS = False

# Timeouts handed to the blocking calls under test; the longer, distinct
# values are only used when timings are actually being checked.
TIMEOUT1, TIMEOUT2, TIMEOUT3 = (
    (0.82, 0.35, 1.4) if CHECK_TIMINGS else (0.1, 0.1, 0.1)
)

# False when the extension module flags its semaphore get_value as broken.
HAVE_GETVALUE = not getattr(_multiprocessing, 'HAVE_BROKEN_SEM_GETVALUE',
                            False)

WIN32 = sys.platform == "win32"
71
Benjamin Petersone711caf2008-06-11 16:44:04 +000072#
Florent Xicluna9b0e9182010-03-28 11:42:38 +000073# Some tests require ctypes
74#
75
76try:
Florent Xiclunab4efb3d2010-08-14 18:24:40 +000077 from ctypes import Structure, c_int, c_double
Florent Xicluna9b0e9182010-03-28 11:42:38 +000078except ImportError:
79 Structure = object
80 c_int = c_double = None
81
82#
Benjamin Petersone711caf2008-06-11 16:44:04 +000083# Creates a wrapper for a function which records the time it takes to finish
84#
85
class TimingWrapper(object):
    """Callable wrapper that records how long each call to *func* takes.

    ``elapsed`` is ``None`` until the first call; afterwards it holds the
    duration in seconds of the most recent call, whether that call returned
    normally or raised.
    """

    def __init__(self, func):
        self.func = func
        self.elapsed = None

    def __call__(self, *args, **kwds):
        """Call the wrapped function, passing its result or exception through."""
        # A monotonic clock is immune to wall-clock adjustments, so the
        # measured duration can never be negative or skewed by a clock step.
        start = time.monotonic()
        try:
            return self.func(*args, **kwds)
        finally:
            self.elapsed = time.monotonic() - start
98
99#
100# Base class for test cases
101#
102
class BaseTestCase(object):
    """Mixin holding assertion helpers shared by the test classes below.

    Subclasses narrow ``ALLOWED_TYPES``; presumably it is consumed by
    test-generation code outside this view -- TODO confirm.
    """

    ALLOWED_TYPES = ('processes', 'manager', 'threads')

    def assertTimingAlmostEqual(self, a, b):
        """Compare *a* and *b* to one decimal place, if timings are checked.

        Does nothing unless the module-level ``CHECK_TIMINGS`` flag is true.
        """
        if not CHECK_TIMINGS:
            return
        self.assertAlmostEqual(a, b, 1)

    def assertReturnsIfImplemented(self, value, func, *args):
        """Assert ``func(*args) == value`` unless the call is unimplemented.

        A ``NotImplementedError`` raised by the call is swallowed and no
        comparison is made.
        """
        try:
            result = func(*args)
        except NotImplementedError:
            return None
        return self.assertEqual(value, result)

    # For the sanity of Windows users, rather than crashing or freezing in
    # multiple ways.
    def __reduce__(self, *args):
        raise NotImplementedError("shouldn't try to pickle a test case")

    __reduce_ex__ = __reduce__
125
Benjamin Petersone711caf2008-06-11 16:44:04 +0000126#
127# Return the value of a semaphore
128#
129
def get_value(self):
    """Return the current counter value of the semaphore-like object *self*.

    Tries, in order, calling ``self.get_value()``, then reading the
    ``_Semaphore__value`` attribute, then the ``_value`` attribute.  Raises
    ``NotImplementedError`` when each of those ends in ``AttributeError``.
    """
    try:
        return self.get_value()
    except AttributeError:
        pass
    for attr_name in ('_Semaphore__value', '_value'):
        try:
            return getattr(self, attr_name)
        except AttributeError:
            continue
    raise NotImplementedError
141
142#
143# Testcases
144#
145
class _TestProcess(BaseTestCase):
    """Tests for Process objects: attributes, start/join, terminate, nesting."""

    ALLOWED_TYPES = ('processes', 'threads')

    def test_current(self):
        """Check the attributes the current process reports about itself."""
        if self.TYPE == 'threads':
            self.skipTest('test not appropriate for {}'.format(self.TYPE))

        current = self.current_process()
        authkey = current.authkey

        self.assertTrue(current.is_alive())
        self.assertFalse(current.daemon)
        self.assertIsInstance(authkey, bytes)
        self.assertGreater(len(authkey), 0)
        self.assertEqual(current.ident, os.getpid())
        self.assertEqual(current.exitcode, None)

    @classmethod
    def _test(cls, q, *args, **kwds):
        # Child side of test_process: report what this process was started
        # with by putting it on the queue, in the order the parent reads it.
        current = cls.current_process()
        q.put(args)
        q.put(kwds)
        q.put(current.name)
        if cls.TYPE != 'threads':
            q.put(bytes(current.authkey))
            q.put(current.pid)

    def test_process(self):
        """Check a process's state before start, while running and after join."""
        q = self.Queue(1)
        args = (q, 1, 2)
        kwargs = {'hello': 23, 'bye': 2.54}
        name = 'SomeProcess'
        p = self.Process(
            target=self._test, args=args, kwargs=kwargs, name=name
            )
        p.daemon = True
        current = self.current_process()

        if self.TYPE != 'threads':
            self.assertEqual(p.authkey, current.authkey)
        self.assertEqual(p.is_alive(), False)
        self.assertEqual(p.daemon, True)
        self.assertNotIn(p, self.active_children())
        self.assertIs(type(self.active_children()), list)
        self.assertEqual(p.exitcode, None)

        p.start()

        self.assertEqual(p.exitcode, None)
        self.assertEqual(p.is_alive(), True)
        self.assertIn(p, self.active_children())

        # The queue itself is the first positional argument, so the child
        # only echoes back the remaining ones.
        self.assertEqual(q.get(), args[1:])
        self.assertEqual(q.get(), kwargs)
        self.assertEqual(q.get(), p.name)
        if self.TYPE != 'threads':
            self.assertEqual(q.get(), current.authkey)
            self.assertEqual(q.get(), p.pid)

        p.join()

        self.assertEqual(p.exitcode, 0)
        self.assertEqual(p.is_alive(), False)
        self.assertNotIn(p, self.active_children())

    @classmethod
    def _test_terminate(cls):
        # Sleep far longer than the test runs so the parent has to kill us.
        time.sleep(1000)

    def test_terminate(self):
        """Check that terminate() stops a sleeping child and join() returns."""
        if self.TYPE == 'threads':
            self.skipTest('test not appropriate for {}'.format(self.TYPE))

        p = self.Process(target=self._test_terminate)
        p.daemon = True
        p.start()

        self.assertEqual(p.is_alive(), True)
        self.assertIn(p, self.active_children())
        self.assertEqual(p.exitcode, None)

        p.terminate()

        join = TimingWrapper(p.join)
        self.assertEqual(join(), None)
        self.assertTimingAlmostEqual(join.elapsed, 0.0)

        self.assertEqual(p.is_alive(), False)
        self.assertNotIn(p, self.active_children())

        p.join()

        # XXX sometimes get p.exitcode == 0 on Windows ...
        #self.assertEqual(p.exitcode, -signal.SIGTERM)

    def test_cpu_count(self):
        """cpu_count() yields an int >= 1 (1 is assumed if unimplemented)."""
        try:
            cpus = multiprocessing.cpu_count()
        except NotImplementedError:
            cpus = 1
        self.assertIs(type(cpus), int)
        self.assertGreaterEqual(cpus, 1)

    def test_active_children(self):
        """A process is listed by active_children() only while it is running."""
        self.assertEqual(type(self.active_children()), list)

        p = self.Process(target=time.sleep, args=(DELTA,))
        self.assertNotIn(p, self.active_children())

        p.start()
        self.assertIn(p, self.active_children())

        p.join()
        self.assertNotIn(p, self.active_children())

    @classmethod
    def _test_recursion(cls, wconn, id):
        # Send our own id, then -- while the id has fewer than two elements --
        # start and join two children whose ids extend ours by 0 and by 1.
        wconn.send(id)
        if len(id) < 2:
            for i in range(2):
                p = cls.Process(
                    target=cls._test_recursion, args=(wconn, id+[i])
                    )
                p.start()
                p.join()

    def test_recursion(self):
        """Processes started from child processes report in depth-first order."""
        rconn, wconn = self.Pipe(duplex=False)
        self._test_recursion(wconn, [])

        time.sleep(DELTA)
        result = []
        while rconn.poll():
            result.append(rconn.recv())

        expected = [
            [],
              [0],
                [0, 0],
                [0, 1],
              [1],
                [1, 0],
                [1, 1]
            ]
        self.assertEqual(result, expected)
294
295#
296#
297#
298
299class _UpperCaser(multiprocessing.Process):
300
301 def __init__(self):
302 multiprocessing.Process.__init__(self)
303 self.child_conn, self.parent_conn = multiprocessing.Pipe()
304
305 def run(self):
306 self.parent_conn.close()
307 for s in iter(self.child_conn.recv, None):
308 self.child_conn.send(s.upper())
309 self.child_conn.close()
310
311 def submit(self, s):
312 assert type(s) is str
313 self.parent_conn.send(s)
314 return self.parent_conn.recv()
315
316 def stop(self):
317 self.parent_conn.send(None)
318 self.parent_conn.close()
319 self.child_conn.close()
320
class _TestSubclassingProcess(BaseTestCase):
    """Exercise a user-defined subclass of multiprocessing.Process."""

    ALLOWED_TYPES = ('processes',)

    def test_subclassing(self):
        """Round-trip two strings through an _UpperCaser child process."""
        uppercaser = _UpperCaser()
        # Daemonic so a stuck child cannot outlive the test run.
        uppercaser.daemon = True
        uppercaser.start()
        try:
            self.assertEqual(uppercaser.submit('hello'), 'HELLO')
            self.assertEqual(uppercaser.submit('world'), 'WORLD')
        finally:
            # Shut the child down even when an assertion fails, so a failing
            # run does not leave a process blocked on the pipe.
            uppercaser.stop()
            uppercaser.join()
332
333#
334#
335#
336
def queue_empty(q):
    """Report whether *q* is empty.

    Uses ``q.empty()`` when the queue has that method and otherwise falls
    back to comparing ``q.qsize()`` with zero.
    """
    if not hasattr(q, 'empty'):
        return q.qsize() == 0
    return q.empty()
342
def queue_full(q, maxsize):
    """Report whether *q* is full.

    Uses ``q.full()`` when the queue has that method and otherwise falls
    back to comparing ``q.qsize()`` with *maxsize*.
    """
    if not hasattr(q, 'full'):
        return q.qsize() == maxsize
    return q.full()
348
349
class _TestQueue(BaseTestCase):
    """Tests for Queue and JoinableQueue: put/get, timeouts, fork, task_done."""

    @classmethod
    def _test_put(cls, queue, child_can_start, parent_can_continue):
        # Child side of test_put: once released by the parent, take six items
        # off the queue and then let the parent carry on.
        child_can_start.wait()
        for i in range(6):
            queue.get()
        parent_can_continue.set()

    def test_put(self):
        """Fill a bounded queue, check every put variant raises Full, drain it."""
        MAXSIZE = 6
        queue = self.Queue(maxsize=MAXSIZE)
        child_can_start = self.Event()
        parent_can_continue = self.Event()

        proc = self.Process(
            target=self._test_put,
            args=(queue, child_can_start, parent_can_continue)
            )
        proc.daemon = True
        proc.start()

        self.assertEqual(queue_empty(queue), True)
        self.assertEqual(queue_full(queue, MAXSIZE), False)

        queue.put(1)
        queue.put(2, True)
        queue.put(3, True, None)
        queue.put(4, False)
        queue.put(5, False, None)
        queue.put_nowait(6)

        # the values may be in buffer but not yet in pipe so sleep a bit
        time.sleep(DELTA)

        self.assertEqual(queue_empty(queue), False)
        self.assertEqual(queue_full(queue, MAXSIZE), True)

        put = TimingWrapper(queue.put)
        put_nowait = TimingWrapper(queue.put_nowait)

        self.assertRaises(pyqueue.Full, put, 7, False)
        self.assertTimingAlmostEqual(put.elapsed, 0)

        self.assertRaises(pyqueue.Full, put, 7, False, None)
        self.assertTimingAlmostEqual(put.elapsed, 0)

        self.assertRaises(pyqueue.Full, put_nowait, 7)
        self.assertTimingAlmostEqual(put_nowait.elapsed, 0)

        self.assertRaises(pyqueue.Full, put, 7, True, TIMEOUT1)
        self.assertTimingAlmostEqual(put.elapsed, TIMEOUT1)

        self.assertRaises(pyqueue.Full, put, 7, False, TIMEOUT2)
        self.assertTimingAlmostEqual(put.elapsed, 0)

        self.assertRaises(pyqueue.Full, put, 7, True, timeout=TIMEOUT3)
        self.assertTimingAlmostEqual(put.elapsed, TIMEOUT3)

        child_can_start.set()
        parent_can_continue.wait()

        self.assertEqual(queue_empty(queue), True)
        self.assertEqual(queue_full(queue, MAXSIZE), False)

        proc.join()

    @classmethod
    def _test_get(cls, queue, child_can_start, parent_can_continue):
        # Child side of test_get: once released, queue the values the parent
        # expects to read back and then let the parent carry on.
        child_can_start.wait()
        #queue.put(1)
        queue.put(2)
        queue.put(3)
        queue.put(4)
        queue.put(5)
        parent_can_continue.set()

    def test_get(self):
        """Read back the child's items, then check every get variant raises Empty."""
        queue = self.Queue()
        child_can_start = self.Event()
        parent_can_continue = self.Event()

        proc = self.Process(
            target=self._test_get,
            args=(queue, child_can_start, parent_can_continue)
            )
        proc.daemon = True
        proc.start()

        self.assertEqual(queue_empty(queue), True)

        child_can_start.set()
        parent_can_continue.wait()

        time.sleep(DELTA)
        self.assertEqual(queue_empty(queue), False)

        # Hangs unexpectedly, remove for now
        #self.assertEqual(queue.get(), 1)
        self.assertEqual(queue.get(True, None), 2)
        self.assertEqual(queue.get(True), 3)
        self.assertEqual(queue.get(timeout=1), 4)
        self.assertEqual(queue.get_nowait(), 5)

        self.assertEqual(queue_empty(queue), True)

        get = TimingWrapper(queue.get)
        get_nowait = TimingWrapper(queue.get_nowait)

        self.assertRaises(pyqueue.Empty, get, False)
        self.assertTimingAlmostEqual(get.elapsed, 0)

        self.assertRaises(pyqueue.Empty, get, False, None)
        self.assertTimingAlmostEqual(get.elapsed, 0)

        self.assertRaises(pyqueue.Empty, get_nowait)
        self.assertTimingAlmostEqual(get_nowait.elapsed, 0)

        self.assertRaises(pyqueue.Empty, get, True, TIMEOUT1)
        self.assertTimingAlmostEqual(get.elapsed, TIMEOUT1)

        self.assertRaises(pyqueue.Empty, get, False, TIMEOUT2)
        self.assertTimingAlmostEqual(get.elapsed, 0)

        self.assertRaises(pyqueue.Empty, get, timeout=TIMEOUT3)
        self.assertTimingAlmostEqual(get.elapsed, TIMEOUT3)

        proc.join()

    @classmethod
    def _test_fork(cls, queue):
        for i in range(10, 20):
            queue.put(i)
        # note that at this point the items may only be buffered, so the
        # process cannot shutdown until the feeder thread has finished
        # pushing items onto the pipe.

    def test_fork(self):
        # Old versions of Queue would fail to create a new feeder
        # thread for a forked process if the original process had its
        # own feeder thread.  This test checks that this no longer
        # happens.

        queue = self.Queue()

        # put items on queue so that main process starts a feeder thread
        for i in range(10):
            queue.put(i)

        # wait to make sure thread starts before we fork a new process
        time.sleep(DELTA)

        # fork process
        p = self.Process(target=self._test_fork, args=(queue,))
        p.start()

        # check that all expected items are in the queue
        for i in range(20):
            self.assertEqual(queue.get(), i)
        self.assertRaises(pyqueue.Empty, queue.get, False)

        p.join()

    def test_qsize(self):
        """qsize() tracks puts and gets; skipped where it is unimplemented."""
        q = self.Queue()
        try:
            self.assertEqual(q.qsize(), 0)
        except NotImplementedError:
            # Report the missing feature instead of passing silently.
            self.skipTest('qsize method not implemented')
        q.put(1)
        self.assertEqual(q.qsize(), 1)
        q.put(5)
        self.assertEqual(q.qsize(), 2)
        q.get()
        self.assertEqual(q.qsize(), 1)
        q.get()
        self.assertEqual(q.qsize(), 0)

    @classmethod
    def _test_task_done(cls, q):
        # Worker loop: acknowledge each item after a short pause; a None item
        # ends the loop (and is itself not acknowledged).
        for obj in iter(q.get, None):
            time.sleep(DELTA)
            q.task_done()

    def test_task_done(self):
        """join() on a JoinableQueue returns once workers acknowledge all items."""
        queue = self.JoinableQueue()

        workers = [self.Process(target=self._test_task_done, args=(queue,))
                   for i in range(4)]

        for p in workers:
            p.start()

        for i in range(10):
            queue.put(i)

        queue.join()

        # One None sentinel per worker so that every worker loop terminates.
        for p in workers:
            queue.put(None)

        for p in workers:
            p.join()
557
558#
559#
560#
561
class _TestLock(BaseTestCase):
    """Acquire/release behaviour of Lock and RLock."""

    def test_lock(self):
        """A held Lock refuses a non-blocking acquire and an extra release."""
        lock = self.Lock()
        self.assertEqual(lock.acquire(), True)
        self.assertEqual(lock.acquire(False), False)
        self.assertEqual(lock.release(), None)
        with self.assertRaises((ValueError, threading.ThreadError)):
            lock.release()

    def test_rlock(self):
        """An RLock can be acquired three times and needs three releases."""
        lock = self.RLock()
        depth = 3
        for _ in range(depth):
            self.assertEqual(lock.acquire(), True)
        for _ in range(depth):
            self.assertEqual(lock.release(), None)
        # One release too many must be rejected.
        with self.assertRaises((AssertionError, RuntimeError)):
            lock.release()

    def test_lock_context(self):
        """A Lock works as a context manager."""
        with self.Lock():
            pass
584
Benjamin Petersone711caf2008-06-11 16:44:04 +0000585
class _TestSemaphore(BaseTestCase):
    """Tests for Semaphore and BoundedSemaphore counting and timeouts."""

    def _test_semaphore(self, sem):
        # Shared scenario for a semaphore created with value 2: take it down
        # to 0, verify a non-blocking acquire fails, then release back to 2.
        # The value checks are skipped where get_value is unimplemented.
        self.assertReturnsIfImplemented(2, get_value, sem)
        self.assertEqual(sem.acquire(), True)
        self.assertReturnsIfImplemented(1, get_value, sem)
        self.assertEqual(sem.acquire(), True)
        self.assertReturnsIfImplemented(0, get_value, sem)
        self.assertEqual(sem.acquire(False), False)
        self.assertReturnsIfImplemented(0, get_value, sem)
        self.assertEqual(sem.release(), None)
        self.assertReturnsIfImplemented(1, get_value, sem)
        self.assertEqual(sem.release(), None)
        self.assertReturnsIfImplemented(2, get_value, sem)

    def test_semaphore(self):
        """An unbounded semaphore may be released past its initial value."""
        sem = self.Semaphore(2)
        self._test_semaphore(sem)
        self.assertEqual(sem.release(), None)
        self.assertReturnsIfImplemented(3, get_value, sem)
        self.assertEqual(sem.release(), None)
        self.assertReturnsIfImplemented(4, get_value, sem)

    def test_bounded_semaphore(self):
        """Run the shared acquire/release scenario on a BoundedSemaphore."""
        sem = self.BoundedSemaphore(2)
        self._test_semaphore(sem)
        # Currently fails on OS/X
        #if HAVE_GETVALUE:
        #    self.assertRaises(ValueError, sem.release)
        #    self.assertReturnsIfImplemented(2, get_value, sem)

    def test_timeout(self):
        """acquire() on an empty semaphore fails for every timeout variant."""
        if self.TYPE != 'processes':
            # Report the test as skipped rather than letting it pass
            # without having checked anything.
            self.skipTest('test not appropriate for {}'.format(self.TYPE))

        sem = self.Semaphore(0)
        acquire = TimingWrapper(sem.acquire)

        self.assertEqual(acquire(False), False)
        self.assertTimingAlmostEqual(acquire.elapsed, 0.0)

        self.assertEqual(acquire(False, None), False)
        self.assertTimingAlmostEqual(acquire.elapsed, 0.0)

        self.assertEqual(acquire(False, TIMEOUT1), False)
        self.assertTimingAlmostEqual(acquire.elapsed, 0)

        self.assertEqual(acquire(True, TIMEOUT2), False)
        self.assertTimingAlmostEqual(acquire.elapsed, TIMEOUT2)

        self.assertEqual(acquire(timeout=TIMEOUT3), False)
        self.assertTimingAlmostEqual(acquire.elapsed, TIMEOUT3)
638
639
class _TestCondition(BaseTestCase):
    """Tests for Condition: notify(), notify_all() and wait() timeouts."""

    @classmethod
    def f(cls, cond, sleeping, woken, timeout=None):
        # Waiter body: signal via `sleeping` that we hold the condition and
        # are about to wait, wait (optionally with a timeout), then signal
        # via `woken` that wait() has returned.
        cond.acquire()
        sleeping.release()
        cond.wait(timeout)
        woken.release()
        cond.release()

    def check_invariant(self, cond):
        # this is only supposed to succeed when there are no sleepers
        if self.TYPE == 'processes':
            try:
                sleepers = (cond._sleeping_count.get_value() -
                            cond._woken_count.get_value())
                self.assertEqual(sleepers, 0)
                self.assertEqual(cond._wait_semaphore.get_value(), 0)
            except NotImplementedError:
                pass

    def test_notify(self):
        """Each notify() wakes exactly one of two waiters."""
        cond = self.Condition()
        sleeping = self.Semaphore(0)
        woken = self.Semaphore(0)

        proc = self.Process(target=self.f, args=(cond, sleeping, woken))
        proc.daemon = True
        proc.start()

        thread = threading.Thread(target=self.f, args=(cond, sleeping, woken))
        thread.daemon = True
        thread.start()

        # wait for both children to start sleeping
        sleeping.acquire()
        sleeping.acquire()

        # check no process/thread has woken up
        time.sleep(DELTA)
        self.assertReturnsIfImplemented(0, get_value, woken)

        # wake up one process/thread
        cond.acquire()
        cond.notify()
        cond.release()

        # check one process/thread has woken up
        time.sleep(DELTA)
        self.assertReturnsIfImplemented(1, get_value, woken)

        # wake up another
        cond.acquire()
        cond.notify()
        cond.release()

        # check other has woken up
        time.sleep(DELTA)
        self.assertReturnsIfImplemented(2, get_value, woken)

        # check state is not mucked up
        self.check_invariant(cond)

        # Join both waiters.  They are kept under separate names so that the
        # process is not forgotten when the thread is created.
        proc.join()
        thread.join()

    def test_notify_all(self):
        """Timed waiters all time out; untimed waiters all wake on notify_all()."""
        cond = self.Condition()
        sleeping = self.Semaphore(0)
        woken = self.Semaphore(0)

        # start some threads/processes which will timeout
        for i in range(3):
            p = self.Process(target=self.f,
                             args=(cond, sleeping, woken, TIMEOUT1))
            p.daemon = True
            p.start()

            t = threading.Thread(target=self.f,
                                 args=(cond, sleeping, woken, TIMEOUT1))
            t.daemon = True
            t.start()

        # wait for them all to sleep
        for i in range(6):
            sleeping.acquire()

        # check they have all timed out
        for i in range(6):
            woken.acquire()
        self.assertReturnsIfImplemented(0, get_value, woken)

        # check state is not mucked up
        self.check_invariant(cond)

        # start some more threads/processes
        for i in range(3):
            p = self.Process(target=self.f, args=(cond, sleeping, woken))
            p.daemon = True
            p.start()

            t = threading.Thread(target=self.f, args=(cond, sleeping, woken))
            t.daemon = True
            t.start()

        # wait for them to all sleep
        for i in range(6):
            sleeping.acquire()

        # check no process/thread has woken up
        time.sleep(DELTA)
        self.assertReturnsIfImplemented(0, get_value, woken)

        # wake them all up
        cond.acquire()
        cond.notify_all()
        cond.release()

        # check they have all woken
        time.sleep(DELTA)
        self.assertReturnsIfImplemented(6, get_value, woken)

        # check state is not mucked up
        self.check_invariant(cond)

    def test_timeout(self):
        """wait() with a timeout returns a false value when nobody notifies."""
        cond = self.Condition()
        wait = TimingWrapper(cond.wait)
        cond.acquire()
        res = wait(TIMEOUT1)
        cond.release()
        # A timed-out wait() yields None on older implementations and False
        # on newer ones; accept either rather than pinning one of them.
        self.assertFalse(res)
        self.assertTimingAlmostEqual(wait.elapsed, TIMEOUT1)
771
772
class _TestEvent(BaseTestCase):
    """Tests for Event: is_set(), wait() with and without timeout, set/clear."""

    @classmethod
    def _test_event(cls, event):
        # Helper run in another process/thread: set the event after a delay.
        time.sleep(TIMEOUT2)
        event.set()

    def test_event(self):
        """Check wait() results before set(), after set() and across processes."""
        event = self.Event()
        wait = TimingWrapper(event.wait)

        # A new event is unset, so wait() with a timeout reports False.
        self.assertEqual(event.is_set(), False)
        self.assertEqual(wait(0.0), False)
        self.assertTimingAlmostEqual(wait.elapsed, 0.0)
        self.assertEqual(wait(TIMEOUT1), False)
        self.assertTimingAlmostEqual(wait.elapsed, TIMEOUT1)

        event.set()

        # Once set, wait() reports True straight away, timeout or not.
        self.assertEqual(event.is_set(), True)
        self.assertEqual(wait(), True)
        self.assertTimingAlmostEqual(wait.elapsed, 0.0)
        self.assertEqual(wait(TIMEOUT1), True)
        self.assertTimingAlmostEqual(wait.elapsed, 0.0)
        # NOTE(review): the two is_set() checks below were already disabled;
        # the reason is not visible here -- confirm before re-enabling.
        # self.assertEqual(event.is_set(), True)

        event.clear()

        #self.assertEqual(event.is_set(), False)

        # Keep a handle on the helper so that it can be joined instead of
        # being left running after the test returns.
        proc = self.Process(target=self._test_event, args=(event,))
        proc.daemon = True
        proc.start()
        self.assertEqual(wait(), True)
        proc.join()
Benjamin Petersone711caf2008-06-11 16:44:04 +0000811
812#
813#
814#
815
class _TestValue(BaseTestCase):
    """Tests for the shared Value and RawValue objects."""

    ALLOWED_TYPES = ('processes',)

    # Each entry: (typecode, initial value, value written by the child).
    codes_values = [
        ('i', 4343, 24234),
        ('d', 3.625, -4.25),
        ('h', -232, 234),
        ('c', latin('x'), latin('y'))
        ]

    def setUp(self):
        if not HAS_SHAREDCTYPES:
            self.skipTest("requires multiprocessing.sharedctypes")

    @classmethod
    def _test(cls, values):
        # Child side of test_value: overwrite every shared value with the
        # third element of its codes_values entry.
        for shared, entry in zip(values, cls.codes_values):
            shared.value = entry[2]

    def test_value(self, raw=False):
        """Values written by a child process are visible to the parent."""
        factory = self.RawValue if raw else self.Value
        values = [factory(code, initial)
                  for code, initial, _ in self.codes_values]

        for shared, entry in zip(values, self.codes_values):
            self.assertEqual(shared.value, entry[1])

        proc = self.Process(target=self._test, args=(values,))
        proc.start()
        proc.join()

        for shared, entry in zip(values, self.codes_values):
            self.assertEqual(shared.value, entry[2])

    def test_rawvalue(self):
        """Same scenario as test_value, using RawValue objects."""
        self.test_value(raw=True)

    def test_getobj_getlock(self):
        """get_lock()/get_obj() exist only on values created with a lock."""
        # Default lock and lock=None: both accessors must be callable.
        val1 = self.Value('i', 5)
        lock1 = val1.get_lock()
        obj1 = val1.get_obj()

        val2 = self.Value('i', 5, lock=None)
        lock2 = val2.get_lock()
        obj2 = val2.get_obj()

        # An explicitly supplied lock is the one handed back.
        lock = self.Lock()
        val3 = self.Value('i', 5, lock=lock)
        lock3 = val3.get_lock()
        obj3 = val3.get_obj()
        self.assertEqual(lock, lock3)

        # lock=False and RawValue: neither accessor is present.
        unlocked = self.Value('i', 5, lock=False)
        for accessor in ('get_lock', 'get_obj'):
            self.assertFalse(hasattr(unlocked, accessor))

        self.assertRaises(AttributeError, self.Value, 'i', 5, lock='navalue')

        raw_value = self.RawValue('i', 5)
        for accessor in ('get_lock', 'get_obj'):
            self.assertFalse(hasattr(raw_value, accessor))
882
Benjamin Petersone711caf2008-06-11 16:44:04 +0000883
class _TestArray(BaseTestCase):
    """Tests for the shared Array and RawArray objects."""

    ALLOWED_TYPES = ('processes',)

    @classmethod
    def f(cls, seq):
        # Replace seq, in place, by its running sums.
        for pos in range(1, len(seq)):
            seq[pos] += seq[pos - 1]

    @unittest.skipIf(c_int is None, "requires _ctypes")
    def test_array(self, raw=False):
        """Indexing, slicing and child-process mutation of a shared array."""
        seq = [680, 626, 934, 821, 150, 233, 548, 982, 714, 831]
        make_array = self.RawArray if raw else self.Array
        arr = make_array('i', seq)

        self.assertEqual(len(arr), len(seq))
        self.assertEqual(arr[3], seq[3])
        self.assertEqual(list(arr[2:7]), list(seq[2:7]))

        replacement = array.array('i', [1, 2, 3, 4])
        arr[4:8] = replacement
        seq[4:8] = replacement

        self.assertEqual(list(arr[:]), seq)

        # Apply the same transformation locally to the list and, in a
        # separate process, to the shared array; the results must agree.
        self.f(seq)

        p = self.Process(target=self.f, args=(arr,))
        p.start()
        p.join()

        self.assertEqual(list(arr[:]), seq)

    @unittest.skipIf(c_int is None, "requires _ctypes")
    def test_array_from_size(self):
        size = 10
        # Test for zeroing (see issue #11675).
        # The repetition below strengthens the test by increasing the chances
        # of previously allocated non-zero memory being used for the new array
        # on the 2nd and 3rd loops.
        for _ in range(3):
            arr = self.Array('i', size)
            self.assertEqual(len(arr), size)
            self.assertEqual(list(arr), [0] * size)
            arr[:] = range(size)
            self.assertEqual(list(arr), list(range(size)))
            del arr

    @unittest.skipIf(c_int is None, "requires _ctypes")
    def test_rawarray(self):
        """Same scenario as test_array, using a RawArray."""
        self.test_array(raw=True)

    @unittest.skipIf(c_int is None, "requires _ctypes")
    def test_getobj_getlock_obj(self):
        """get_lock()/get_obj() exist only on arrays created with a lock."""
        # Default lock and lock=None: both accessors must be callable.
        arr1 = self.Array('i', list(range(10)))
        lock1 = arr1.get_lock()
        obj1 = arr1.get_obj()

        arr2 = self.Array('i', list(range(10)), lock=None)
        lock2 = arr2.get_lock()
        obj2 = arr2.get_obj()

        # An explicitly supplied lock is the one handed back.
        lock = self.Lock()
        arr3 = self.Array('i', list(range(10)), lock=lock)
        lock3 = arr3.get_lock()
        obj3 = arr3.get_obj()
        self.assertEqual(lock, lock3)

        # lock=False and RawArray: neither accessor is present.
        arr4 = self.Array('i', range(10), lock=False)
        for accessor in ('get_lock', 'get_obj'):
            self.assertFalse(hasattr(arr4, accessor))
        self.assertRaises(AttributeError,
                          self.Array, 'i', range(10), lock='notalock')

        arr5 = self.RawArray('i', range(10))
        for accessor in ('get_lock', 'get_obj'):
            self.assertFalse(hasattr(arr5, accessor))
Benjamin Petersone711caf2008-06-11 16:44:04 +0000961
962#
963#
964#
965
class _TestContainers(BaseTestCase):
    """Exercise the list, dict and Namespace containers provided by the mixin."""

    ALLOWED_TYPES = ('manager',)

    def test_list(self):
        """Shared lists support slicing, extension, repetition and nesting."""
        ten = list(range(10))
        first = self.list(list(range(10)))
        self.assertEqual(first[:], ten)

        second = self.list()
        self.assertEqual(second[:], [])

        second.extend(list(range(5)))
        self.assertEqual(second[:], [0, 1, 2, 3, 4])

        self.assertEqual(second[2], 2)
        self.assertEqual(second[2:10], [2, 3, 4])

        second *= 2
        doubled = [0, 1, 2, 3, 4, 0, 1, 2, 3, 4]
        self.assertEqual(second[:], doubled)

        self.assertEqual(second + [5, 6], doubled + [5, 6])

        # Nothing done to the second list may have leaked into the first.
        self.assertEqual(first[:], ten)

        nested = self.list([first, second])
        self.assertEqual(nested[:], [ten, doubled])

        # A list holding `first` must observe a later change made to it.
        holder = self.list([first])
        first.append('hello')
        self.assertEqual(holder[:], [ten + ['hello']])

    def test_dict(self):
        """Shared dicts expose copy(), keys(), values() and items()."""
        shared = self.dict()
        codes = list(range(65, 70))
        for code in codes:
            shared[code] = chr(code)
        letters = [chr(code) for code in codes]
        pairs = [(code, chr(code)) for code in codes]
        self.assertEqual(shared.copy(), {code: chr(code) for code in codes})
        self.assertEqual(sorted(shared.keys()), codes)
        self.assertEqual(sorted(shared.values()), letters)
        self.assertEqual(sorted(shared.items()), pairs)

    def test_namespace(self):
        """Namespace attributes can be set and deleted; str() omits _hidden."""
        ns = self.Namespace()
        ns.name = 'Bob'
        ns.job = 'Builder'
        ns._hidden = 'hidden'
        self.assertEqual((ns.name, ns.job), ('Bob', 'Builder'))
        del ns.job
        self.assertEqual(str(ns), "Namespace(name='Bob')")
        self.assertTrue(hasattr(ns, 'name'))
        self.assertFalse(hasattr(ns, 'job'))
1021
1022#
1023#
1024#
1025
def sqr(x, wait=0.0):
    """Return x multiplied by itself after sleeping for `wait` seconds."""
    time.sleep(wait)
    squared = x * x
    return squared
class _TestPool(BaseTestCase):
    """Tests for the pool API: apply/map/imap variants, async results, terminate."""

    def test_apply(self):
        """apply() passes positional and keyword arguments through."""
        papply = self.pool.apply
        self.assertEqual(papply(sqr, (5,)), sqr(5))
        self.assertEqual(papply(sqr, (), {'x': 3}), sqr(x=3))

    def test_map(self):
        """map() matches the builtin map, with and without a chunksize."""
        pmap = self.pool.map
        self.assertEqual(pmap(sqr, list(range(10))),
                         list(map(sqr, list(range(10)))))
        self.assertEqual(pmap(sqr, list(range(100)), chunksize=20),
                         list(map(sqr, list(range(100)))))

    def test_map_chunksize(self):
        """map_async() on an empty list with a chunksize must not stall."""
        try:
            self.pool.map_async(sqr, [], chunksize=1).get(timeout=TIMEOUT1)
        except multiprocessing.TimeoutError:
            self.fail("pool.map_async with chunksize stalled on null list")

    def test_async(self):
        """get() blocks for about as long as the task sleeps."""
        res = self.pool.apply_async(sqr, (7, TIMEOUT1,))
        get = TimingWrapper(res.get)
        self.assertEqual(get(), 49)
        self.assertTimingAlmostEqual(get.elapsed, TIMEOUT1)

    def test_async_timeout(self):
        """get(timeout=...) raises TimeoutError when the task outlasts it."""
        res = self.pool.apply_async(sqr, (6, TIMEOUT2 + 0.2))
        get = TimingWrapper(res.get)
        self.assertRaises(multiprocessing.TimeoutError, get, timeout=TIMEOUT2)
        self.assertTimingAlmostEqual(get.elapsed, TIMEOUT2)

    def test_imap(self):
        """imap() yields results in order and then raises StopIteration."""
        it = self.pool.imap(sqr, list(range(10)))
        self.assertEqual(list(it), list(map(sqr, list(range(10)))))

        it = self.pool.imap(sqr, list(range(10)))
        for i in range(10):
            self.assertEqual(next(it), i*i)
        self.assertRaises(StopIteration, it.__next__)

        it = self.pool.imap(sqr, list(range(1000)), chunksize=100)
        for i in range(1000):
            self.assertEqual(next(it), i*i)
        self.assertRaises(StopIteration, it.__next__)

    def test_imap_unordered(self):
        """imap_unordered() yields every result, in any order."""
        it = self.pool.imap_unordered(sqr, list(range(1000)))
        self.assertEqual(sorted(it), list(map(sqr, list(range(1000)))))

        it = self.pool.imap_unordered(sqr, list(range(1000)), chunksize=53)
        self.assertEqual(sorted(it), list(map(sqr, list(range(1000)))))

    def test_make_pool(self):
        """Pool(3) starts exactly three workers."""
        p = multiprocessing.Pool(3)
        try:
            self.assertEqual(3, len(p._pool))
        finally:
            # Always reap the workers, even when the assertion fails, so a
            # failing test does not leave stray processes behind.
            p.close()
            p.join()

    def test_terminate(self):
        """terminate() makes the following join() return quickly."""
        if self.TYPE == 'manager':
            # On Unix a forked process increfs each shared object to
            # which its parent process held a reference.  If the
            # forked process gets terminated then there is likely to
            # be a reference leak.  So to prevent
            # _TestZZZNumberOfObjects from failing we skip this test
            # when using a manager.
            return

        # Queue far more work than can finish before terminate() is called.
        result = self.pool.map_async(
            time.sleep, [0.1] * 10000, chunksize=1
            )
        self.pool.terminate()
        join = TimingWrapper(self.pool.join)
        join()
        self.assertLess(join.elapsed, 0.2)
Benjamin Petersone711caf2008-06-11 16:44:04 +00001104#
1105# Test that manager has expected number of shared objects left
1106#
1107
class _TestZZZNumberOfObjects(BaseTestCase):
    # Because test cases are sorted alphabetically, this one will get
    # run after all the other tests for the manager.  It tests that
    # there have been no "reference leaks" for the manager's shared
    # objects.  Note the comment in _TestPool.test_terminate().
    ALLOWED_TYPES = ('manager',)

    def test_number_of_objects(self):
        """The manager must be left holding exactly one shared object."""
        EXPECTED_NUMBER = 1                # the pool object is still alive
        multiprocessing.active_children()  # discard dead process objs
        gc.collect()                       # do garbage collection
        refs = self.manager._number_of_objects()
        # Capture the debug info right after the count so that both describe
        # the same moment; it is only shown when the count is unexpected.
        debug_info = self.manager._debug_info()
        if refs != EXPECTED_NUMBER:
            print(debug_info)

        self.assertEqual(refs, EXPECTED_NUMBER)
1126
1127#
1128# Test of creating a customized manager class
1129#
1130
1131from multiprocessing.managers import BaseManager, BaseProxy, RemoteError
1132
class FooBar(object):
    """Small class whose methods are served through MyManager proxies."""

    def f(self):
        """Public method that succeeds."""
        answer = 'f()'
        return answer

    def g(self):
        """Public method that always fails with ValueError."""
        raise ValueError()

    def _h(self):
        """Underscore-prefixed method."""
        answer = '_h()'
        return answer
1140
def baz():
    """Generate the squares of 0 through 9 in increasing order."""
    n = 0
    while n < 10:
        yield n * n
        n += 1
1144
class IteratorProxy(BaseProxy):
    """Proxy exposing only `__next__`, usable as an iterator on the client side."""

    _exposed_ = ('__next__',)

    def __next__(self):
        # Forward the request to the referent and hand back its answer.
        value = self._callmethod('__next__')
        return value

    def __iter__(self):
        # The proxy is its own iterator.
        return self
1151
# Manager subclass with no behaviour of its own; it only carries the
# registrations made below.
class MyManager(BaseManager):
    pass

# 'Foo' and 'Bar' are both backed by FooBar.  'Foo' is registered without an
# `exposed` list, 'Bar' exposes exactly f and _h, and 'baz' returns the baz()
# generator through IteratorProxy.
MyManager.register('Foo', callable=FooBar)
MyManager.register('Bar', callable=FooBar, exposed=('f', '_h'))
MyManager.register('baz', callable=baz, proxytype=IteratorProxy)
1158
1159
class _TestMyManager(BaseTestCase):
    """Check the proxies produced by the customised MyManager."""

    ALLOWED_TYPES = ('manager',)

    def test_mymanager(self):
        """Proxies expose the expected methods and forward calls correctly."""
        manager = MyManager()
        manager.start()

        foo = manager.Foo()
        bar = manager.Bar()
        squares = manager.baz()

        # Which of the three candidate names does each proxy offer?
        candidates = ('f', 'g', '_h')
        foo_methods = []
        bar_methods = []
        for proxy, found in ((foo, foo_methods), (bar, bar_methods)):
            for name in candidates:
                if hasattr(proxy, name):
                    found.append(name)

        self.assertEqual(foo_methods, ['f', 'g'])
        self.assertEqual(bar_methods, ['f', '_h'])

        # Foo: public methods work, the underscore method is refused.
        self.assertEqual(foo.f(), 'f()')
        self.assertRaises(ValueError, foo.g)
        self.assertEqual(foo._callmethod('f'), 'f()')
        self.assertRaises(RemoteError, foo._callmethod, '_h')

        # Bar: both explicitly exposed methods work, directly and by name.
        self.assertEqual(bar.f(), 'f()')
        self.assertEqual(bar._h(), '_h()')
        self.assertEqual(bar._callmethod('f'), 'f()')
        self.assertEqual(bar._callmethod('_h'), '_h()')

        expected_squares = [i*i for i in range(10)]
        self.assertEqual(list(squares), expected_squares)

        manager.shutdown()
1191
1192#
1193# Test of connecting to a remote server and using xmlrpclib for serialization
1194#
1195
# Module-level queue handed out by get_queue(), which QueueManager registers
# as the callable behind its 'get_queue' entry.
_queue = pyqueue.Queue()
def get_queue():
    # Always return the same queue object.
    return _queue
1199
class QueueManager(BaseManager):
    '''manager class used by server process'''
# Registered with a callable, so this manager can actually provide the queue.
QueueManager.register('get_queue', callable=get_queue)
1203
class QueueManager2(BaseManager):
    '''manager class which specifies the same interface as QueueManager'''
# Registered by name only (no callable); the tests in this file use this class
# on the connecting side via connect().
QueueManager2.register('get_queue')
1207
1208
# Serializer name passed as `serializer=` to every QueueManager/QueueManager2
# created by the remote-manager and restart tests.
SERIALIZER = 'xmlrpclib'
1210
class _TestRemoteManager(BaseTestCase):
    """Talk to a manager server through separately connected managers."""

    ALLOWED_TYPES = ('manager',)

    @classmethod
    def _putter(cls, address, authkey):
        """Child side: connect to the server and put one tuple on its queue."""
        client = QueueManager2(
            address=address, authkey=authkey, serializer=SERIALIZER
            )
        client.connect()
        queue = client.get_queue()
        payload = ('hello world', None, True, 2.25)
        queue.put(payload)

    def test_remote(self):
        """An item put by a child is readable through a second connection."""
        authkey = os.urandom(32)

        server = QueueManager(
            address=('localhost', 0), authkey=authkey, serializer=SERIALIZER
            )
        server.start()

        producer = self.Process(target=self._putter,
                                args=(server.address, authkey))
        producer.start()

        consumer = QueueManager2(
            address=server.address, authkey=authkey, serializer=SERIALIZER
            )
        consumer.connect()
        queue = consumer.get_queue()

        # Note that xmlrpclib will deserialize object as a list not a tuple
        received = queue.get()
        self.assertEqual(received, ['hello world', None, True, 2.25])

        # Because we are using xmlrpclib for serialization instead of
        # pickle this will cause a serialization error.
        self.assertRaises(Exception, queue.put, time.sleep)

        # Make queue finalizer run before the server is stopped
        del queue
        server.shutdown()
1251
class _TestManagerRestart(BaseTestCase):
    """Check that a manager can be restarted on an address right after shutdown."""

    @classmethod
    def _putter(cls, address, authkey):
        """Child side: connect to the manager and put one string on its queue."""
        manager = QueueManager(
            address=address, authkey=authkey, serializer=SERIALIZER)
        manager.connect()
        queue = manager.get_queue()
        queue.put('hello world')

    def test_rapid_restart(self):
        """Start, use and shut down a manager, then start another one on `addr`."""
        authkey = os.urandom(32)
        manager = QueueManager(
            address=('localhost', 0), authkey=authkey, serializer=SERIALIZER)
        srvr = manager.get_server()
        addr = srvr.address
        # Close the connection.Listener socket which gets opened as a part
        # of manager.get_server(). It's not needed for the test.
        srvr.listener.close()
        manager.start()

        p = self.Process(target=self._putter, args=(manager.address, authkey))
        p.start()
        # Reap the child so it does not outlive the test; the item it put
        # stays on the manager's queue.
        p.join()
        queue = manager.get_queue()
        self.assertEqual(queue.get(), 'hello world')
        del queue
        manager.shutdown()
        manager = QueueManager(
            address=addr, authkey=authkey, serializer=SERIALIZER)
        try:
            manager.start()
        except IOError as e:
            if e.errno != errno.EADDRINUSE:
                raise
            # Retry after some time, in case the old socket was lingering
            # (sporadic failure on buildbots)
            time.sleep(1.0)
            manager = QueueManager(
                address=addr, authkey=authkey, serializer=SERIALIZER)
            # The replacement manager has to be started as well: shutdown()
            # only exists on a manager once start() has been called.
            manager.start()
        manager.shutdown()
Jesse Nollerc5d28a02009-03-30 16:37:36 +00001292
Benjamin Petersone711caf2008-06-11 16:44:04 +00001293#
1294#
1295#
1296
# Empty bytes message: _TestConnection._echo stops echoing when it receives it.
SENTINEL = latin('')
1298
class _TestConnection(BaseTestCase):
    # Tests for the connection objects returned by self.Pipe(), mostly run
    # against a child that echoes every bytes message it receives.

    ALLOWED_TYPES = ('processes', 'threads')

    @classmethod
    def _echo(cls, conn):
        # Send each received bytes message straight back until SENTINEL
        # arrives, then close this end of the connection.
        for msg in iter(conn.recv_bytes, SENTINEL):
            conn.send_bytes(msg)
        conn.close()

    def test_connection(self):
        # Round-trip objects, bytes, buffers and a large message through the
        # echo child, and check poll() timing and end-of-file behaviour.
        conn, child_conn = self.Pipe()

        p = self.Process(target=self._echo, args=(child_conn,))
        p.daemon = True
        p.start()

        seq = [1, 2.25, None]
        msg = latin('hello world')
        longmsg = msg * 10
        arr = array.array('i', list(range(4)))

        if self.TYPE == 'processes':
            self.assertEqual(type(conn.fileno()), int)

        # Object round trip: send()/recv().
        self.assertEqual(conn.send(seq), None)
        self.assertEqual(conn.recv(), seq)

        # Raw bytes round trip: send_bytes()/recv_bytes().
        self.assertEqual(conn.send_bytes(msg), None)
        self.assertEqual(conn.recv_bytes(), msg)

        if self.TYPE == 'processes':
            # Receive into a larger buffer: only the leading items change.
            buffer = array.array('i', [0]*10)
            expected = list(arr) + [0] * (10 - len(arr))
            self.assertEqual(conn.send_bytes(arr), None)
            self.assertEqual(conn.recv_bytes_into(buffer),
                             len(arr) * buffer.itemsize)
            self.assertEqual(list(buffer), expected)

            # Same again, but writing at an offset of three items.
            buffer = array.array('i', [0]*10)
            expected = [0] * 3 + list(arr) + [0] * (10 - 3 - len(arr))
            self.assertEqual(conn.send_bytes(arr), None)
            self.assertEqual(conn.recv_bytes_into(buffer, 3 * buffer.itemsize),
                             len(arr) * buffer.itemsize)
            self.assertEqual(list(buffer), expected)

            # A buffer that is too small must raise BufferTooShort, with the
            # complete message available in the exception's args.
            buffer = bytearray(latin(' ' * 40))
            self.assertEqual(conn.send_bytes(longmsg), None)
            try:
                res = conn.recv_bytes_into(buffer)
            except multiprocessing.BufferTooShort as e:
                self.assertEqual(e.args, (longmsg,))
            else:
                self.fail('expected BufferTooShort, got %s' % res)

        poll = TimingWrapper(conn.poll)

        # Nothing pending: poll() returns False at once, and poll(timeout)
        # returns False after about `timeout`.
        self.assertEqual(poll(), False)
        self.assertTimingAlmostEqual(poll.elapsed, 0)

        self.assertEqual(poll(TIMEOUT1), False)
        self.assertTimingAlmostEqual(poll.elapsed, TIMEOUT1)

        conn.send(None)

        # With the echoed reply pending, poll() returns True without waiting.
        self.assertEqual(poll(TIMEOUT1), True)
        self.assertTimingAlmostEqual(poll.elapsed, 0)

        self.assertEqual(conn.recv(), None)

        really_big_msg = latin('X') * (1024 * 1024 * 16)   # 16Mb
        conn.send_bytes(really_big_msg)
        self.assertEqual(conn.recv_bytes(), really_big_msg)

        conn.send_bytes(SENTINEL)                          # tell child to quit
        child_conn.close()

        if self.TYPE == 'processes':
            self.assertEqual(conn.readable, True)
            self.assertEqual(conn.writable, True)
            # The other end is closed now, so reading reports end of file.
            self.assertRaises(EOFError, conn.recv)
            self.assertRaises(EOFError, conn.recv_bytes)

        p.join()

    def test_duplex_false(self):
        # A Pipe(duplex=False) gives a read-only end and a write-only end.
        reader, writer = self.Pipe(duplex=False)
        self.assertEqual(writer.send(1), None)
        self.assertEqual(reader.recv(), 1)
        if self.TYPE == 'processes':
            self.assertEqual(reader.readable, True)
            self.assertEqual(reader.writable, False)
            self.assertEqual(writer.readable, False)
            self.assertEqual(writer.writable, True)
            # Using an end in the wrong direction raises IOError.
            self.assertRaises(IOError, reader.send, 2)
            self.assertRaises(IOError, writer.recv)
            self.assertRaises(IOError, writer.poll)

    def test_spawn_close(self):
        # We test that a pipe connection can be closed by parent
        # process immediately after child is spawned.  On Windows this
        # would have sometimes failed on old versions because
        # child_conn would be closed before the child got a chance to
        # duplicate it.
        conn, child_conn = self.Pipe()

        p = self.Process(target=self._echo, args=(child_conn,))
        p.start()
        child_conn.close()    # this might complete before child initializes

        msg = latin('hello')
        conn.send_bytes(msg)
        self.assertEqual(conn.recv_bytes(), msg)

        conn.send_bytes(SENTINEL)
        conn.close()
        p.join()

    def test_sendbytes(self):
        # Check the optional offset and size arguments of send_bytes().
        if self.TYPE != 'processes':
            return

        msg = latin('abcdefghijklmnopqrstuvwxyz')
        a, b = self.Pipe()

        a.send_bytes(msg)
        self.assertEqual(b.recv_bytes(), msg)

        # Offset only: everything from index 5 on.
        a.send_bytes(msg, 5)
        self.assertEqual(b.recv_bytes(), msg[5:])

        # Offset and size: 8 bytes starting at index 7.
        a.send_bytes(msg, 7, 8)
        self.assertEqual(b.recv_bytes(), msg[7:7+8])

        # An offset equal to the length gives an empty message.
        a.send_bytes(msg, 26)
        self.assertEqual(b.recv_bytes(), latin(''))

        a.send_bytes(msg, 26, 0)
        self.assertEqual(b.recv_bytes(), latin(''))

        # Out-of-range or negative offset/size combinations are rejected.
        self.assertRaises(ValueError, a.send_bytes, msg, 27)

        self.assertRaises(ValueError, a.send_bytes, msg, 22, 5)

        self.assertRaises(ValueError, a.send_bytes, msg, 26, 1)

        self.assertRaises(ValueError, a.send_bytes, msg, -1)

        self.assertRaises(ValueError, a.send_bytes, msg, 4, -1)
1448
class _TestListenerClient(BaseTestCase):
    """Connect a Client to a Listener for every available address family."""

    ALLOWED_TYPES = ('processes', 'threads')

    @classmethod
    def _test(cls, address):
        """Child side: connect to `address`, send one greeting and close."""
        client = cls.connection.Client(address)
        client.send('hello')
        client.close()

    def test_listener_client(self):
        """The listener receives the child's greeting for each family."""
        connection = self.connection
        for family in connection.families:
            listener = connection.Listener(family=family)
            child = self.Process(target=self._test, args=(listener.address,))
            child.daemon = True
            child.start()
            accepted = listener.accept()
            greeting = accepted.recv()
            self.assertEqual(greeting, 'hello')
            child.join()
            listener.close()
Benjamin Petersone711caf2008-06-11 16:44:04 +00001469#
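# Test of sending connection and socket objects between processes.
# NOTE: the _TestPicklingConnections class below is wrapped in a string
# literal, so it is currently disabled and never run.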
1471#
Benjamin Petersonb29cbbc2008-06-16 20:57:14 +00001472"""
Benjamin Petersone711caf2008-06-11 16:44:04 +00001473class _TestPicklingConnections(BaseTestCase):
1474
1475 ALLOWED_TYPES = ('processes',)
1476
1477 def _listener(self, conn, families):
1478 for fam in families:
1479 l = self.connection.Listener(family=fam)
1480 conn.send(l.address)
1481 new_conn = l.accept()
1482 conn.send(new_conn)
1483
1484 if self.TYPE == 'processes':
1485 l = socket.socket()
1486 l.bind(('localhost', 0))
1487 conn.send(l.getsockname())
1488 l.listen(1)
1489 new_conn, addr = l.accept()
1490 conn.send(new_conn)
1491
1492 conn.recv()
1493
1494 def _remote(self, conn):
1495 for (address, msg) in iter(conn.recv, None):
1496 client = self.connection.Client(address)
1497 client.send(msg.upper())
1498 client.close()
1499
1500 if self.TYPE == 'processes':
1501 address, msg = conn.recv()
1502 client = socket.socket()
1503 client.connect(address)
1504 client.sendall(msg.upper())
1505 client.close()
1506
1507 conn.close()
1508
1509 def test_pickling(self):
1510 try:
1511 multiprocessing.allow_connection_pickling()
1512 except ImportError:
1513 return
1514
1515 families = self.connection.families
1516
1517 lconn, lconn0 = self.Pipe()
1518 lp = self.Process(target=self._listener, args=(lconn0, families))
1519 lp.start()
1520 lconn0.close()
1521
1522 rconn, rconn0 = self.Pipe()
1523 rp = self.Process(target=self._remote, args=(rconn0,))
1524 rp.start()
1525 rconn0.close()
1526
1527 for fam in families:
1528 msg = ('This connection uses family %s' % fam).encode('ascii')
1529 address = lconn.recv()
1530 rconn.send((address, msg))
1531 new_conn = lconn.recv()
1532 self.assertEqual(new_conn.recv(), msg.upper())
1533
1534 rconn.send(None)
1535
1536 if self.TYPE == 'processes':
1537 msg = latin('This connection uses a normal socket')
1538 address = lconn.recv()
1539 rconn.send((address, msg))
1540 if hasattr(socket, 'fromfd'):
1541 new_conn = lconn.recv()
1542 self.assertEqual(new_conn.recv(100), msg.upper())
1543 else:
1544 # XXX On Windows with Py2.6 need to backport fromfd()
1545 discard = lconn.recv_bytes()
1546
1547 lconn.send(None)
1548
1549 rconn.close()
1550 lconn.close()
1551
1552 lp.join()
1553 rp.join()
Benjamin Petersonb29cbbc2008-06-16 20:57:14 +00001554"""
Benjamin Petersone711caf2008-06-11 16:44:04 +00001555#
1556#
1557#
1558
class _TestHeap(BaseTestCase):
    """Consistency check of the multiprocessing.heap allocator's bookkeeping."""

    ALLOWED_TYPES = ('processes',)

    def test_heap(self):
        """After heavy churn, free and occupied blocks must tile each arena."""
        iterations = 5000
        maxblocks = 50
        blocks = []

        # create and destroy lots of blocks of different sizes
        for _ in range(iterations):
            size = int(random.lognormvariate(0, 1) * 1000)
            blocks.append(multiprocessing.heap.BufferWrapper(size))
            if len(blocks) > maxblocks:
                # Drop a random one of the older blocks.
                del blocks[random.randrange(maxblocks)]

        # get the heap object
        heap = multiprocessing.heap.BufferWrapper._heap

        # verify the state of the heap: collect every known block as
        # (arena index, start, stop, length, state) ...
        segments = []
        for free_blocks in list(heap._len_to_seq.values()):
            for arena, start, stop in free_blocks:
                segments.append((heap._arenas.index(arena), start, stop,
                                 stop - start, 'free'))
        for arena, start, stop in heap._allocated_blocks:
            segments.append((heap._arenas.index(arena), start, stop,
                             stop - start, 'occupied'))

        # ... order them by arena and position ...
        segments.sort()

        # ... and require each block either to end where the next one starts
        # or to be followed by the first block (offset 0) of another arena.
        for current, following in zip(segments, segments[1:]):
            arena, start, stop = current[:3]
            narena, nstart = following[:2]
            self.assertTrue((arena != narena and nstart == 0) or
                            (stop == nstart))
1598 (stop == nstart))
1599
1600#
1601#
1602#
1603
# Structure with two fields, `x` (c_int) and `y` (c_double), used by
# _TestSharedCTypes both as a shared Value and as the subject of copy().
# NOTE(review): Structure, c_int and c_double are presumably the ctypes names
# imported earlier in the file -- confirm.
class _Foo(Structure):
    _fields_ = [
        ('x', c_int),
        ('y', c_double)
        ]
1609
class _TestSharedCTypes(BaseTestCase):
    """Tests for shared ctypes objects modified by a child process."""

    ALLOWED_TYPES = ('processes',)

    def setUp(self):
        """Skip every test here when sharedctypes could not be imported."""
        if HAS_SHAREDCTYPES:
            return
        self.skipTest("requires multiprocessing.sharedctypes")

    @classmethod
    def _double(cls, x, y, foo, arr, string):
        """Child side: double every value it was handed, in place."""
        x.value *= 2
        y.value *= 2
        foo.x *= 2
        foo.y *= 2
        string.value *= 2
        index = 0
        count = len(arr)
        while index < count:
            arr[index] *= 2
            index += 1

    def test_sharedctypes(self, lock=False):
        """Changes made by the child must be visible in the parent."""
        x = Value('i', 7, lock=lock)
        y = Value(c_double, 1.0/3.0, lock=lock)
        foo = Value(_Foo, 3, 2, lock=lock)
        arr = self.Array('d', list(range(10)), lock=lock)
        string = self.Array('c', 20, lock=lock)
        string.value = latin('hello')

        shared = (x, y, foo, arr, string)
        child = self.Process(target=self._double, args=shared)
        child.start()
        child.join()

        self.assertEqual(x.value, 14)
        self.assertAlmostEqual(y.value, 2.0/3.0)
        self.assertEqual(foo.x, 6)
        self.assertAlmostEqual(foo.y, 4.0)
        for position in range(10):
            self.assertAlmostEqual(arr[position], position*2)
        self.assertEqual(string.value, latin('hellohello'))

    def test_synchronize(self):
        """Same scenario as test_sharedctypes, with lock=True."""
        self.test_sharedctypes(lock=True)

    def test_copy(self):
        """copy() gives an object unaffected by later changes to the source."""
        original = _Foo(2, 5.0)
        duplicate = copy(original)
        original.x = 0
        original.y = 0
        self.assertEqual(duplicate.x, 2)
        self.assertAlmostEqual(duplicate.y, 5.0)
1658
1659#
1660#
1661#
1662
class _TestFinalize(BaseTestCase):
    # Checks which util.Finalize callbacks run, and in which order, when a
    # child process runs multiprocessing's exit function.

    ALLOWED_TYPES = ('processes',)

    @classmethod
    def _test_finalize(cls, conn):
        # Child side: register finalizers that each send a label over `conn`,
        # run the exit function, then leave without normal interpreter
        # shutdown.  The statement order and the lifetime of the locals
        # matter here; test_finalize checks the exact sequence of labels.
        class Foo(object):
            pass

        a = Foo()
        util.Finalize(a, conn.send, args=('a',))
        del a           # triggers callback for a

        b = Foo()
        close_b = util.Finalize(b, conn.send, args=('b',))
        close_b()       # triggers callback for b
        close_b()       # does nothing because callback has already been called
        del b           # does nothing because callback has already been called

        # No exitpriority is given for c, and 'c' is absent from the list
        # that test_finalize expects.
        c = Foo()
        util.Finalize(c, conn.send, args=('c',))

        d10 = Foo()
        util.Finalize(d10, conn.send, args=('d10',), exitpriority=1)

        # Three finalizers with the same exitpriority; the expected list has
        # them in the reverse of their registration order.
        d01 = Foo()
        util.Finalize(d01, conn.send, args=('d01',), exitpriority=0)
        d02 = Foo()
        util.Finalize(d02, conn.send, args=('d02',), exitpriority=0)
        d03 = Foo()
        util.Finalize(d03, conn.send, args=('d03',), exitpriority=0)

        util.Finalize(None, conn.send, args=('e',), exitpriority=-10)

        # Lowest priority of all: 'STOP' is the marker the parent reads up to.
        util.Finalize(None, conn.send, args=('STOP',), exitpriority=-100)

        # call multiprocessing's cleanup function then exit process without
        # garbage collecting locals
        util._exit_function()
        conn.close()
        os._exit(0)

    def test_finalize(self):
        # Run _test_finalize in a child and compare the labels it sent.
        conn, child_conn = self.Pipe()

        p = self.Process(target=self._test_finalize, args=(child_conn,))
        p.start()
        p.join()

        # Read labels until the 'STOP' marker.
        result = [obj for obj in iter(conn.recv, 'STOP')]
        self.assertEqual(result, ['a', 'b', 'd10', 'd03', 'd02', 'd01', 'e'])
1714
1715#
1716# Test that from ... import * works for each module
1717#
1718
class _TestImportStar(BaseTestCase):
    """Every name listed in a submodule's __all__ must really exist."""

    ALLOWED_TYPES = ('processes',)

    def test_import(self):
        """Import each multiprocessing module and verify its __all__."""
        submodules = (
            'connection', 'heap', 'managers', 'pool', 'process',
            'reduction', 'synchronize', 'util',
            )
        modules = ['multiprocessing']
        modules.extend('multiprocessing.' + leaf for leaf in submodules)

        if c_int is not None:
            # This module requires _ctypes
            modules.append('multiprocessing.sharedctypes')

        for name in modules:
            __import__(name)
            mod = sys.modules[name]
            exported = getattr(mod, '__all__', ())

            for attr in exported:
                present = hasattr(mod, attr)
                self.assertTrue(
                    present,
                    '%r does not have attribute %r' % (mod, attr)
                    )
1745
1746#
1747# Quick test that logging works -- does not test logging output
1748#
1749
class _TestLogging(BaseTestCase):
    """Quick checks of the multiprocessing logger (the output is not examined)."""

    ALLOWED_TYPES = ('processes',)

    def test_enable_logging(self):
        """The logger exists and accepts level changes and log calls."""
        logger = multiprocessing.get_logger()
        logger.setLevel(util.SUBWARNING)
        self.assertTrue(logger is not None)
        logger.debug('this will not be printed')
        logger.info('nor will this')
        logger.setLevel(LOG_LEVEL)

    @classmethod
    def _test_level(cls, conn):
        """Child side: report the logger's effective level over `conn`."""
        logger = multiprocessing.get_logger()
        conn.send(logger.getEffectiveLevel())

    def test_level(self):
        """A child sees the level set on the logger, or else on the root logger."""
        LEVEL1 = 32
        LEVEL2 = 37

        logger = multiprocessing.get_logger()
        root_logger = logging.getLogger()
        root_level = root_logger.level

        reader, writer = multiprocessing.Pipe(duplex=False)

        try:
            logger.setLevel(LEVEL1)
            p = self.Process(target=self._test_level, args=(writer,))
            p.start()
            level = reader.recv()
            # Reap the child before asserting so that a failure cannot
            # leave it behind.
            p.join()
            self.assertEqual(LEVEL1, level)

            # With the logger's own level unset, the root level applies.
            logger.setLevel(logging.NOTSET)
            root_logger.setLevel(LEVEL2)
            p = self.Process(target=self._test_level, args=(writer,))
            p.start()
            level = reader.recv()
            p.join()
            self.assertEqual(LEVEL2, level)
        finally:
            # Restore the levels and release the pipe even if a check failed.
            root_logger.setLevel(root_level)
            logger.setLevel(level=LOG_LEVEL)
            reader.close()
            writer.close()
1788
1789#
Jesse Noller6214edd2009-01-19 16:23:53 +00001790# Test to verify handle verification, see issue 3321
1791#
1792
class TestInvalidHandle(unittest.TestCase):
    """Handle verification of _multiprocessing.Connection (see issue 3321)."""

    @unittest.skipIf(WIN32, "skipped on Windows")
    def test_invalid_handles(self):
        """A bogus handle fails on poll(); a negative one fails on creation."""
        bogus = _multiprocessing.Connection(44977608)
        with self.assertRaises(IOError):
            bogus.poll()
        with self.assertRaises(IOError):
            _multiprocessing.Connection(-1)
Florent Xicluna9b0e9182010-03-28 11:42:38 +00001800
Jesse Noller6214edd2009-01-19 16:23:53 +00001801#
Benjamin Petersone711caf2008-06-11 16:44:04 +00001802# Functions used to create test cases from the base ones in this module
1803#
1804
def get_attributes(Source, names):
    """Collect the attributes `names` of `Source` into a dict.

    Plain Python functions are wrapped in staticmethod, so that they are not
    turned into bound methods when the dict is merged into a class body.
    Every other kind of object is stored unchanged.

    Source -- object (module, class or instance) to read the attributes from
    names  -- iterable of attribute names

    Returns a dict mapping each name to the (possibly wrapped) attribute.
    Raises AttributeError if `Source` lacks one of the names.
    """
    function_type = type(get_attributes)
    attributes = {}
    for name in names:
        obj = getattr(Source, name)
        if isinstance(obj, function_type):
            obj = staticmethod(obj)
        attributes[name] = obj
    return attributes
1813
def create_test_cases(Mixin, type):
    """Build concrete TestCase classes from the module's `_Test*` templates.

    For every module global whose name starts with '_Test' and whose
    ALLOWED_TYPES contains `type`, a class deriving from the template,
    unittest.TestCase and `Mixin` is created.  It is named
    'With' + capitalized type + template name without its leading underscore,
    and reports Mixin's module as its own.

    Returns a dict mapping the new class names to the classes.
    """
    prefix = 'With' + type.capitalize()
    namespace = globals()
    generated = {}

    for key in list(namespace):
        if not key.startswith('_Test'):
            continue
        template = namespace[key]
        if type not in template.ALLOWED_TYPES:
            continue

        class Temp(template, unittest.TestCase, Mixin):
            pass

        label = prefix + key[1:]
        Temp.__name__ = label
        Temp.__module__ = Mixin.__module__
        generated[label] = Temp
    return generated
1830
1831#
1832# Create test cases
1833#
1834
# Mixin that makes the `_Test*` templates run against the process-based API.
class ProcessesMixin(object):
    TYPE = 'processes'
    Process = multiprocessing.Process
    # Copy the listed names from the multiprocessing package into this class
    # body; get_attributes wraps plain functions in staticmethod first.
    locals().update(get_attributes(multiprocessing, (
        'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
        'Condition', 'Event', 'Value', 'Array', 'RawValue',
        'RawArray', 'current_process', 'active_children', 'Pipe',
        'connection', 'JoinableQueue'
        )))

# Generate the 'WithProcesses...' test classes and publish them as module
# globals.
testcases_processes = create_test_cases(ProcessesMixin, type='processes')
globals().update(testcases_processes)
1847
1848
# Mixin that makes the `_Test*` templates run against a SyncManager.
class ManagerMixin(object):
    TYPE = 'manager'
    Process = multiprocessing.Process
    # Created with object.__new__, i.e. without running SyncManager.__init__.
    # NOTE(review): presumably initialised and started elsewhere before the
    # tests run -- confirm.
    manager = object.__new__(multiprocessing.managers.SyncManager)
    # Copy the listed attributes, looked up on that manager instance, into
    # this class body.
    locals().update(get_attributes(manager, (
        'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
        'Condition', 'Event', 'Value', 'Array', 'list', 'dict',
        'Namespace', 'JoinableQueue'
        )))

# Generate the 'WithManager...' test classes and publish them as module
# globals.
testcases_manager = create_test_cases(ManagerMixin, type='manager')
globals().update(testcases_manager)
1861
1862
# Mixin that makes the `_Test*` templates run against multiprocessing.dummy.
class ThreadsMixin(object):
    TYPE = 'threads'
    Process = multiprocessing.dummy.Process
    # Copy the listed names from multiprocessing.dummy into this class body;
    # get_attributes wraps plain functions in staticmethod first.
    locals().update(get_attributes(multiprocessing.dummy, (
        'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
        'Condition', 'Event', 'Value', 'Array', 'current_process',
        'active_children', 'Pipe', 'connection', 'dict', 'list',
        'Namespace', 'JoinableQueue'
        )))

# Generate the 'WithThreads...' test classes and publish them as module
# globals.
testcases_threads = create_test_cases(ThreadsMixin, type='threads')
globals().update(testcases_threads)
1875
class OtherTest(unittest.TestCase):
    """Authentication failures in the connection challenge functions."""

    # TODO: add more tests for deliver/answer challenge.
    def test_deliver_challenge_auth_failure(self):
        """deliver_challenge() rejects a peer that answers with garbage."""
        class _FakeConnection(object):
            def send_bytes(self, data):
                pass
            def recv_bytes(self, size):
                return b'something bogus'

        peer = _FakeConnection()
        with self.assertRaises(multiprocessing.AuthenticationError):
            multiprocessing.connection.deliver_challenge(peer, b'abc')

    def test_answer_challenge_auth_failure(self):
        """answer_challenge() fails when the peer's replies are bogus."""
        class _FakeConnection(object):
            def __init__(self):
                self.count = 0
            def send_bytes(self, data):
                pass
            def recv_bytes(self, size):
                # First call: the challenge marker; second call: garbage;
                # every later call: an empty message.
                self.count += 1
                if self.count == 1:
                    return multiprocessing.connection.CHALLENGE
                if self.count == 2:
                    return b'something bogus'
                return b''

        peer = _FakeConnection()
        with self.assertRaises(multiprocessing.AuthenticationError):
            multiprocessing.connection.answer_challenge(peer, b'abc')
1904
Benjamin Petersonf47ed4a2009-04-11 20:45:40 +00001905#
1906# Test Manager.start()/Pool.__init__() initializer feature - see issue 5585
1907#
1908
def initializer(ns):
    """Increment the `test` attribute of `ns` by one."""
    bumped = ns.test + 1
    ns.test = bumped
1911
class TestInitializers(unittest.TestCase):
    """The initializer hooks of Manager.start() and Pool() (see issue 5585)."""

    def setUp(self):
        """Create a manager-backed namespace whose `test` counter starts at 0."""
        manager = multiprocessing.Manager()
        self.mgr = manager
        self.ns = manager.Namespace()
        self.ns.test = 0

    def tearDown(self):
        """Stop the manager created in setUp."""
        self.mgr.shutdown()

    def test_manager_initializer(self):
        """start() rejects a non-callable initializer and runs a callable once."""
        second = multiprocessing.managers.SyncManager()
        with self.assertRaises(TypeError):
            second.start(1)
        second.start(initializer, (self.ns,))
        self.assertEqual(self.ns.test, 1)
        second.shutdown()

    def test_pool_initializer(self):
        """Pool() rejects a non-callable initializer and runs it once per worker."""
        with self.assertRaises(TypeError):
            multiprocessing.Pool(initializer=1)
        pool = multiprocessing.Pool(1, initializer, (self.ns,))
        pool.close()
        pool.join()
        self.assertEqual(self.ns.test, 1)
1934
R. David Murraya44c6b32009-07-29 15:40:30 +00001935#
1936# Issue 5155, 5313, 5331: Test process in processes
1937# Verifies os.close(sys.stdin.fileno) vs. sys.stdin.close() behavior
1938#
1939
def _ThisSubProcess(q):
    """Try one non-blocking read from *q*; an empty queue is not an error."""
    try:
        q.get_nowait()
    except pyqueue.Empty:
        # Nothing was queued; the fetched item is not needed either way.
        pass
1945
def _TestProcess(q):
    """Run _ThisSubProcess in a child process on a fresh Queue and wait for it.

    The *q* argument is accepted but not used by this function.
    """
    child_queue = multiprocessing.Queue()
    child = multiprocessing.Process(
        target=_ThisSubProcess,
        args=(child_queue,),
    )
    child.start()
    child.join()
1951
def _afunc(x):
    """Return *x* multiplied by itself."""
    squared = x * x
    return squared
1954
def pool_in_process():
    """Map _afunc over a short list with a 4-worker Pool, then shut it down.

    The mapped result is discarded; the function only exercises the pool.
    """
    pool = multiprocessing.Pool(processes=4)
    try:
        pool.map(_afunc, [1, 2, 3, 4, 5, 6, 7])
    finally:
        # Always release the workers, even if map() raised.
        pool.close()
        pool.join()
1958
class _file_like(object):
    """Buffering writer in front of *delegate* with a per-process buffer.

    Data passed to write() is collected in a list and handed to
    ``delegate.write`` as one joined string on flush().
    """

    def __init__(self, delegate):
        self._pid = None            # pid that owns the current buffer
        self._delegate = delegate

    @property
    def cache(self):
        """List of pending chunks; replaced by a new list when the pid changes."""
        current_pid = os.getpid()
        if self._pid == current_pid:
            return self._cache
        # There are no race conditions since fork keeps only the running thread
        self._pid = current_pid
        self._cache = []
        return self._cache

    def write(self, data):
        """Append *data* to the pending chunks of the current process."""
        pending = self.cache
        pending.append(data)

    def flush(self):
        """Write all pending chunks to the delegate and empty the buffer."""
        joined = ''.join(self.cache)
        self._delegate.write(joined)
        self._cache = []
1979
class TestStdinBadfiledescriptor(unittest.TestCase):
    """Issues 5155, 5313, 5331: use multiprocessing from inside a child process."""

    def test_queue_in_process(self):
        """A child process can itself create a Queue and spawn a grandchild."""
        queue = multiprocessing.Queue()
        proc = multiprocessing.Process(target=_TestProcess, args=(queue,))
        proc.start()
        proc.join()

    def test_pool_in_process(self):
        """A child process can create and use a Pool."""
        p = multiprocessing.Process(target=pool_in_process)
        p.start()
        p.join()

    def test_flushing(self):
        """flush() forwards the data written so far to the delegate."""
        sio = io.StringIO()
        flike = _file_like(sio)
        flike.write('foo')
        # Only the in-process flush is exercised; flushing from a child
        # process is not covered by this test.
        flike.flush()
        # assertEqual instead of a bare assert: it still runs under -O and
        # reports both values on failure.
        self.assertEqual(sio.getvalue(), 'foo')
2000
# Test cases listed explicitly; test_main() appends them, in this order,
# after the generated test cases.
testcases_other = [
    OtherTest,
    TestInvalidHandle,
    TestInitializers,
    TestStdinBadfiledescriptor,
]
Neal Norwitz5d6415e2008-08-25 01:53:32 +00002003
Benjamin Petersone711caf2008-06-11 16:44:04 +00002004#
2005#
2006#
2007
def test_main(run=None):
    """Set up the shared pools and manager, run every test case, tear down.

    run -- callable invoked with the assembled unittest.TestSuite; when
           None, test.support.run_unittest is used.

    Raises unittest.SkipTest on Linux when creating a multiprocessing.RLock
    fails with OSError (see issue 3111).
    """
    if sys.platform.startswith("linux"):
        try:
            # Only creation matters here; the lock itself is not needed.
            multiprocessing.RLock()
        except OSError:
            raise unittest.SkipTest("OSError raises on RLock creation, see issue 3111!")

    if run is None:
        from test.support import run_unittest as run

    util.get_temp_dir()     # creates temp directory for use by all processes

    multiprocessing.get_logger().setLevel(LOG_LEVEL)

    # Shared resources that the mixin-based test cases read as class
    # attributes.
    ProcessesMixin.pool = multiprocessing.Pool(4)
    ThreadsMixin.pool = multiprocessing.dummy.Pool(4)
    ManagerMixin.manager.__init__()
    ManagerMixin.manager.start()
    ManagerMixin.pool = ManagerMixin.manager.Pool(4)

    try:
        # Generated cases are ordered by class name; the explicit list
        # keeps its own order and comes last.
        testcases = (
            sorted(testcases_processes.values(), key=lambda tc: tc.__name__) +
            sorted(testcases_threads.values(), key=lambda tc: tc.__name__) +
            sorted(testcases_manager.values(), key=lambda tc: tc.__name__) +
            testcases_other
            )

        loadTestsFromTestCase = unittest.defaultTestLoader.loadTestsFromTestCase
        suite = unittest.TestSuite(loadTestsFromTestCase(tc) for tc in testcases)
        run(suite)
    finally:
        # Tear down even when the run is interrupted or raises, so worker
        # processes and the manager are not left behind.
        ThreadsMixin.pool.terminate()
        ProcessesMixin.pool.terminate()
        ManagerMixin.pool.terminate()
        ManagerMixin.manager.shutdown()

        del ProcessesMixin.pool, ThreadsMixin.pool, ManagerMixin.pool
Benjamin Petersone711caf2008-06-11 16:44:04 +00002045
def main():
    """Run the full suite through a TextTestRunner at verbosity 2."""
    runner = unittest.TextTestRunner(verbosity=2)
    test_main(runner.run)
2048
2049if __name__ == '__main__':
2050 main()