blob: e9d03293bf7e14fc20e74d68787495690d97887d [file] [log] [blame]
Georg Brandl86b2fb92008-07-16 03:43:04 +00001#!/usr/bin/env python
2
Benjamin Petersone711caf2008-06-11 16:44:04 +00003#
4# Unit tests for the multiprocessing package
5#
6
7import unittest
8import threading
9import queue as pyqueue
10import time
R. David Murraya44c6b32009-07-29 15:40:30 +000011import io
Benjamin Petersone711caf2008-06-11 16:44:04 +000012import sys
13import os
14import gc
Antoine Pitrouc824e9a2011-04-05 18:11:33 +020015import errno
Benjamin Petersone711caf2008-06-11 16:44:04 +000016import signal
17import array
Benjamin Petersone711caf2008-06-11 16:44:04 +000018import socket
19import random
20import logging
R. David Murraya21e4ca2009-03-31 23:16:50 +000021import test.support
Benjamin Petersone711caf2008-06-11 16:44:04 +000022
Benjamin Petersone5384b02008-10-04 22:00:42 +000023
R. David Murraya21e4ca2009-03-31 23:16:50 +000024# Skip tests if _multiprocessing wasn't built.
25_multiprocessing = test.support.import_module('_multiprocessing')
26# Skip tests if sem_open implementation is broken.
27test.support.import_module('multiprocessing.synchronize')
Benjamin Petersone5384b02008-10-04 22:00:42 +000028
Benjamin Petersone711caf2008-06-11 16:44:04 +000029import multiprocessing.dummy
30import multiprocessing.connection
31import multiprocessing.managers
32import multiprocessing.heap
Benjamin Petersone711caf2008-06-11 16:44:04 +000033import multiprocessing.pool
Benjamin Petersone711caf2008-06-11 16:44:04 +000034
35from multiprocessing import util
36
Brian Curtin918616c2010-10-07 02:12:17 +000037try:
38 from multiprocessing.sharedctypes import Value, copy
39 HAS_SHAREDCTYPES = True
40except ImportError:
41 HAS_SHAREDCTYPES = False
42
Benjamin Petersone711caf2008-06-11 16:44:04 +000043#
44#
45#
46
def latin(s):
    """Return the text *s* encoded to bytes with the 'latin' codec."""
    return s.encode('latin')
Benjamin Petersone711caf2008-06-11 16:44:04 +000049
Benjamin Petersone711caf2008-06-11 16:44:04 +000050#
51# Constants
52#
53
LOG_LEVEL = util.SUBWARNING
#LOG_LEVEL = logging.WARNING

# Short pause (in seconds) used by the tests to let other processes/threads
# make progress before state is checked.
DELTA = 0.1
CHECK_TIMINGS = False     # making true makes tests take a lot longer
                          # and can sometimes cause some non-serious
                          # failures because some calls block a bit
                          # longer than expected
# Timeouts (in seconds) passed to blocking calls.  Distinct values are only
# needed when the elapsed time is actually being verified.
if CHECK_TIMINGS:
    TIMEOUT1, TIMEOUT2, TIMEOUT3 = 0.82, 0.35, 1.4
else:
    TIMEOUT1, TIMEOUT2, TIMEOUT3 = 0.1, 0.1, 0.1

# False when the extension module flags its semaphore get_value() as broken.
HAVE_GETVALUE = not getattr(_multiprocessing,
                            'HAVE_BROKEN_SEM_GETVALUE', False)

WIN32 = (sys.platform == "win32")
71
Benjamin Petersone711caf2008-06-11 16:44:04 +000072#
Florent Xicluna9b0e9182010-03-28 11:42:38 +000073# Some tests require ctypes
74#
75
76try:
Florent Xiclunab4efb3d2010-08-14 18:24:40 +000077 from ctypes import Structure, c_int, c_double
Florent Xicluna9b0e9182010-03-28 11:42:38 +000078except ImportError:
79 Structure = object
80 c_int = c_double = None
81
82#
Benjamin Petersone711caf2008-06-11 16:44:04 +000083# Creates a wrapper for a function which records the time it takes to finish
84#
85
class TimingWrapper(object):
    """Callable wrapper that records how long each call to *func* takes.

    ``elapsed`` is ``None`` until the wrapper has been called once; after
    that it holds the duration, in seconds, of the most recent call.
    """

    def __init__(self, func):
        self.func = func
        self.elapsed = None

    def __call__(self, *args, **kwds):
        """Call the wrapped function, returning (or raising) whatever it does."""
        started = time.time()
        try:
            return self.func(*args, **kwds)
        finally:
            # Recorded in ``finally`` so the duration is available even when
            # the wrapped call raises (e.g. queue.Full after a timeout).
            self.elapsed = time.time() - started
98
99#
100# Base class for test cases
101#
102
class BaseTestCase(object):
    """Mixin holding helpers shared by the multiprocessing test classes.

    The helpers call ``self.assertEqual`` / ``self.assertAlmostEqual``, so
    the class is expected to be combined with something that provides them
    (presumably ``unittest.TestCase`` -- the combination is done elsewhere).
    """

    # Flavours a test class may run under; subclasses narrow this.
    ALLOWED_TYPES = ('processes', 'manager', 'threads')

    def assertTimingAlmostEqual(self, a, b):
        # Timings are only verified when CHECK_TIMINGS is enabled; otherwise
        # this is a no-op.
        if CHECK_TIMINGS:
            self.assertAlmostEqual(a, b, 1)

    def assertReturnsIfImplemented(self, value, func, *args):
        # Assert that func(*args) == value, but tolerate the call being
        # unsupported (signalled by NotImplementedError).
        try:
            res = func(*args)
        except NotImplementedError:
            pass
        else:
            return self.assertEqual(value, res)

    # For the sanity of Windows users, rather than crashing or freezing in
    # multiple ways.
    def __reduce__(self, *args):
        raise NotImplementedError("shouldn't try to pickle a test case")

    __reduce_ex__ = __reduce__
125
Benjamin Petersone711caf2008-06-11 16:44:04 +0000126#
127# Return the value of a semaphore
128#
129
def get_value(self):
    """Return the current value of the semaphore-like object *self*.

    Tries, in order, a ``get_value()`` method, then the attributes
    ``_Semaphore__value`` and ``_value``.

    Raises NotImplementedError when none of them is available.
    """
    try:
        return self.get_value()
    except AttributeError:
        pass
    # Fall back to the attribute names; the first one present wins.
    for attr in ('_Semaphore__value', '_value'):
        try:
            return getattr(self, attr)
        except AttributeError:
            continue
    raise NotImplementedError
141
142#
143# Testcases
144#
145
class _TestProcess(BaseTestCase):
    """Tests for process objects: attributes, lifecycle, termination, nesting.

    Relies on ``self.TYPE``, ``self.Process``, ``self.current_process``,
    ``self.active_children``, ``self.Queue`` and ``self.Pipe`` being supplied
    by whatever combines this mixin into a concrete test case.
    """

    ALLOWED_TYPES = ('processes', 'threads')

    def test_current(self):
        if self.TYPE == 'threads':
            self.skipTest('test not appropriate for {}'.format(self.TYPE))

        current = self.current_process()
        authkey = current.authkey

        self.assertTrue(current.is_alive())
        self.assertFalse(current.daemon)
        self.assertIsInstance(authkey, bytes)
        self.assertGreater(len(authkey), 0)
        self.assertEqual(current.ident, os.getpid())
        self.assertEqual(current.exitcode, None)

    @classmethod
    def _test(cls, q, *args, **kwds):
        # Child side of test_process: report back what the child sees, in
        # the exact order the parent reads it.
        current = cls.current_process()
        q.put(args)
        q.put(kwds)
        q.put(current.name)
        if cls.TYPE != 'threads':
            q.put(bytes(current.authkey))
            q.put(current.pid)

    def test_process(self):
        q = self.Queue(1)
        args = (q, 1, 2)
        kwargs = {'hello':23, 'bye':2.54}
        name = 'SomeProcess'
        p = self.Process(
            target=self._test, args=args, kwargs=kwargs, name=name
            )
        p.daemon = True
        current = self.current_process()

        # State before start()
        if self.TYPE != 'threads':
            self.assertEqual(p.authkey, current.authkey)
        self.assertEqual(p.is_alive(), False)
        self.assertEqual(p.daemon, True)
        self.assertNotIn(p, self.active_children())
        self.assertIs(type(self.active_children()), list)
        self.assertEqual(p.exitcode, None)

        p.start()

        # State while running
        self.assertEqual(p.exitcode, None)
        self.assertEqual(p.is_alive(), True)
        self.assertIn(p, self.active_children())

        # q itself is args[0]; the child only receives the rest via *args.
        self.assertEqual(q.get(), args[1:])
        self.assertEqual(q.get(), kwargs)
        self.assertEqual(q.get(), p.name)
        if self.TYPE != 'threads':
            self.assertEqual(q.get(), current.authkey)
            self.assertEqual(q.get(), p.pid)

        p.join()

        # State after join()
        self.assertEqual(p.exitcode, 0)
        self.assertEqual(p.is_alive(), False)
        self.assertNotIn(p, self.active_children())

    @classmethod
    def _test_terminate(cls):
        # Sleep far longer than the test runs; the parent terminates us.
        time.sleep(1000)

    def test_terminate(self):
        if self.TYPE == 'threads':
            self.skipTest('test not appropriate for {}'.format(self.TYPE))

        p = self.Process(target=self._test_terminate)
        p.daemon = True
        p.start()

        self.assertEqual(p.is_alive(), True)
        self.assertIn(p, self.active_children())
        self.assertEqual(p.exitcode, None)

        p.terminate()

        join = TimingWrapper(p.join)
        self.assertEqual(join(), None)
        self.assertTimingAlmostEqual(join.elapsed, 0.0)

        self.assertEqual(p.is_alive(), False)
        self.assertNotIn(p, self.active_children())

        p.join()

        # XXX sometimes get p.exitcode == 0 on Windows ...
        #self.assertEqual(p.exitcode, -signal.SIGTERM)

    def test_cpu_count(self):
        try:
            cpus = multiprocessing.cpu_count()
        except NotImplementedError:
            cpus = 1
        self.assertIs(type(cpus), int)
        self.assertGreaterEqual(cpus, 1)

    def test_active_children(self):
        self.assertEqual(type(self.active_children()), list)

        p = self.Process(target=time.sleep, args=(DELTA,))
        self.assertNotIn(p, self.active_children())

        p.start()
        self.assertIn(p, self.active_children())

        p.join()
        self.assertNotIn(p, self.active_children())

    @classmethod
    def _test_recursion(cls, wconn, id):
        # Send our own id, then spawn two children one level deeper (each
        # joined before the next starts) until ids are two elements long.
        wconn.send(id)
        if len(id) < 2:
            for i in range(2):
                p = cls.Process(
                    target=cls._test_recursion, args=(wconn, id+[i])
                    )
                p.start()
                p.join()

    def test_recursion(self):
        rconn, wconn = self.Pipe(duplex=False)
        self._test_recursion(wconn, [])

        time.sleep(DELTA)
        result = []
        while rconn.poll():
            result.append(rconn.recv())

        # Depth-first order, because every child is joined before its
        # sibling is started.
        expected = [
            [],
              [0],
                [0, 0],
                [0, 1],
              [1],
                [1, 0],
                [1, 1]
            ]
        self.assertEqual(result, expected)
294
295#
296#
297#
298
299class _UpperCaser(multiprocessing.Process):
300
301 def __init__(self):
302 multiprocessing.Process.__init__(self)
303 self.child_conn, self.parent_conn = multiprocessing.Pipe()
304
305 def run(self):
306 self.parent_conn.close()
307 for s in iter(self.child_conn.recv, None):
308 self.child_conn.send(s.upper())
309 self.child_conn.close()
310
311 def submit(self, s):
312 assert type(s) is str
313 self.parent_conn.send(s)
314 return self.parent_conn.recv()
315
316 def stop(self):
317 self.parent_conn.send(None)
318 self.parent_conn.close()
319 self.child_conn.close()
320
class _TestSubclassingProcess(BaseTestCase):
    """Check that a user-defined Process subclass can be started and used."""

    ALLOWED_TYPES = ('processes',)

    def test_subclassing(self):
        uppercaser = _UpperCaser()
        uppercaser.start()
        self.assertEqual(uppercaser.submit('hello'), 'HELLO')
        self.assertEqual(uppercaser.submit('world'), 'WORLD')
        uppercaser.stop()
        uppercaser.join()
332
333#
334#
335#
336
def queue_empty(q):
    """Return whether queue *q* is empty.

    Uses ``q.empty()`` when the queue has one, otherwise compares
    ``q.qsize()`` with zero.
    """
    if hasattr(q, 'empty'):
        return q.empty()
    return q.qsize() == 0
342
def queue_full(q, maxsize):
    """Return whether queue *q* is full.

    Uses ``q.full()`` when the queue has one, otherwise compares
    ``q.qsize()`` with *maxsize*.
    """
    if hasattr(q, 'full'):
        return q.full()
    return q.qsize() == maxsize
348
349
class _TestQueue(BaseTestCase):
    """Tests for Queue / JoinableQueue: put, get, qsize, fork and task_done.

    Relies on ``self.Queue``, ``self.JoinableQueue``, ``self.Event`` and
    ``self.Process`` being supplied by the concrete test case.
    """

    @classmethod
    def _test_put(cls, queue, child_can_start, parent_can_continue):
        # Child side of test_put: once released, drain the six items the
        # parent queued, then let the parent carry on.
        child_can_start.wait()
        for i in range(6):
            queue.get()
        parent_can_continue.set()

    def test_put(self):
        MAXSIZE = 6
        queue = self.Queue(maxsize=MAXSIZE)
        child_can_start = self.Event()
        parent_can_continue = self.Event()

        proc = self.Process(
            target=self._test_put,
            args=(queue, child_can_start, parent_can_continue)
            )
        proc.daemon = True
        proc.start()

        self.assertEqual(queue_empty(queue), True)
        self.assertEqual(queue_full(queue, MAXSIZE), False)

        # Fill the queue using every accepted calling convention.
        queue.put(1)
        queue.put(2, True)
        queue.put(3, True, None)
        queue.put(4, False)
        queue.put(5, False, None)
        queue.put_nowait(6)

        # the values may be in buffer but not yet in pipe so sleep a bit
        time.sleep(DELTA)

        self.assertEqual(queue_empty(queue), False)
        self.assertEqual(queue_full(queue, MAXSIZE), True)

        put = TimingWrapper(queue.put)
        put_nowait = TimingWrapper(queue.put_nowait)

        # Non-blocking puts on a full queue fail immediately ...
        self.assertRaises(pyqueue.Full, put, 7, False)
        self.assertTimingAlmostEqual(put.elapsed, 0)

        self.assertRaises(pyqueue.Full, put, 7, False, None)
        self.assertTimingAlmostEqual(put.elapsed, 0)

        self.assertRaises(pyqueue.Full, put_nowait, 7)
        self.assertTimingAlmostEqual(put_nowait.elapsed, 0)

        # ... blocking puts fail after the timeout, and the timeout is
        # ignored when block is False.
        self.assertRaises(pyqueue.Full, put, 7, True, TIMEOUT1)
        self.assertTimingAlmostEqual(put.elapsed, TIMEOUT1)

        self.assertRaises(pyqueue.Full, put, 7, False, TIMEOUT2)
        self.assertTimingAlmostEqual(put.elapsed, 0)

        self.assertRaises(pyqueue.Full, put, 7, True, timeout=TIMEOUT3)
        self.assertTimingAlmostEqual(put.elapsed, TIMEOUT3)

        child_can_start.set()
        parent_can_continue.wait()

        self.assertEqual(queue_empty(queue), True)
        self.assertEqual(queue_full(queue, MAXSIZE), False)

        proc.join()

    @classmethod
    def _test_get(cls, queue, child_can_start, parent_can_continue):
        # Child side of test_get: once released, queue four items and then
        # let the parent carry on.
        child_can_start.wait()
        #queue.put(1)
        queue.put(2)
        queue.put(3)
        queue.put(4)
        queue.put(5)
        parent_can_continue.set()

    def test_get(self):
        queue = self.Queue()
        child_can_start = self.Event()
        parent_can_continue = self.Event()

        proc = self.Process(
            target=self._test_get,
            args=(queue, child_can_start, parent_can_continue)
            )
        proc.daemon = True
        proc.start()

        self.assertEqual(queue_empty(queue), True)

        child_can_start.set()
        parent_can_continue.wait()

        time.sleep(DELTA)
        self.assertEqual(queue_empty(queue), False)

        # Hangs unexpectedly, remove for now
        #self.assertEqual(queue.get(), 1)
        self.assertEqual(queue.get(True, None), 2)
        self.assertEqual(queue.get(True), 3)
        self.assertEqual(queue.get(timeout=1), 4)
        self.assertEqual(queue.get_nowait(), 5)

        self.assertEqual(queue_empty(queue), True)

        get = TimingWrapper(queue.get)
        get_nowait = TimingWrapper(queue.get_nowait)

        # Non-blocking gets on an empty queue fail immediately ...
        self.assertRaises(pyqueue.Empty, get, False)
        self.assertTimingAlmostEqual(get.elapsed, 0)

        self.assertRaises(pyqueue.Empty, get, False, None)
        self.assertTimingAlmostEqual(get.elapsed, 0)

        self.assertRaises(pyqueue.Empty, get_nowait)
        self.assertTimingAlmostEqual(get_nowait.elapsed, 0)

        # ... blocking gets fail after the timeout, and the timeout is
        # ignored when block is False.
        self.assertRaises(pyqueue.Empty, get, True, TIMEOUT1)
        self.assertTimingAlmostEqual(get.elapsed, TIMEOUT1)

        self.assertRaises(pyqueue.Empty, get, False, TIMEOUT2)
        self.assertTimingAlmostEqual(get.elapsed, 0)

        self.assertRaises(pyqueue.Empty, get, timeout=TIMEOUT3)
        self.assertTimingAlmostEqual(get.elapsed, TIMEOUT3)

        proc.join()

    @classmethod
    def _test_fork(cls, queue):
        for i in range(10, 20):
            queue.put(i)
        # note that at this point the items may only be buffered, so the
        # process cannot shutdown until the feeder thread has finished
        # pushing items onto the pipe.

    def test_fork(self):
        # Old versions of Queue would fail to create a new feeder
        # thread for a forked process if the original process had its
        # own feeder thread.  This test checks that this no longer
        # happens.

        queue = self.Queue()

        # put items on queue so that main process starts a feeder thread
        for i in range(10):
            queue.put(i)

        # wait to make sure thread starts before we fork a new process
        time.sleep(DELTA)

        # fork process
        p = self.Process(target=self._test_fork, args=(queue,))
        p.start()

        # check that all expected items are in the queue
        for i in range(20):
            self.assertEqual(queue.get(), i)
        self.assertRaises(pyqueue.Empty, queue.get, False)

        p.join()

    def test_qsize(self):
        q = self.Queue()
        try:
            self.assertEqual(q.qsize(), 0)
        except NotImplementedError:
            # qsize() is not supported here; nothing more to check.
            return
        q.put(1)
        self.assertEqual(q.qsize(), 1)
        q.put(5)
        self.assertEqual(q.qsize(), 2)
        q.get()
        self.assertEqual(q.qsize(), 1)
        q.get()
        self.assertEqual(q.qsize(), 0)

    @classmethod
    def _test_task_done(cls, q):
        # Worker: acknowledge every item until the None sentinel arrives.
        for obj in iter(q.get, None):
            time.sleep(DELTA)
            q.task_done()

    def test_task_done(self):
        queue = self.JoinableQueue()

        workers = [self.Process(target=self._test_task_done, args=(queue,))
                   for i in range(4)]

        for p in workers:
            p.start()

        for i in range(10):
            queue.put(i)

        # Returns only once task_done() has been called for every item.
        queue.join()

        # One sentinel per worker so that each of them exits its loop.
        for p in workers:
            queue.put(None)

        for p in workers:
            p.join()
557
558#
559#
560#
561
class _TestLock(BaseTestCase):
    """Tests for Lock and RLock acquire/release semantics."""

    def test_lock(self):
        lock = self.Lock()
        self.assertEqual(lock.acquire(), True)
        # A plain lock cannot be taken twice, so a non-blocking retry fails.
        self.assertEqual(lock.acquire(False), False)
        self.assertEqual(lock.release(), None)
        # Releasing an unlocked lock is an error.
        self.assertRaises((ValueError, threading.ThreadError), lock.release)

    def test_rlock(self):
        lock = self.RLock()
        # Re-entrant: the holder may acquire repeatedly ...
        self.assertEqual(lock.acquire(), True)
        self.assertEqual(lock.acquire(), True)
        self.assertEqual(lock.acquire(), True)
        # ... and must release once per acquire.
        self.assertEqual(lock.release(), None)
        self.assertEqual(lock.release(), None)
        self.assertEqual(lock.release(), None)
        self.assertRaises((AssertionError, RuntimeError), lock.release)

    def test_lock_context(self):
        with self.Lock():
            pass
584
Benjamin Petersone711caf2008-06-11 16:44:04 +0000585
class _TestSemaphore(BaseTestCase):
    """Tests for Semaphore and BoundedSemaphore counting and timeouts."""

    def _test_semaphore(self, sem):
        # Shared scenario for a semaphore created with an initial value of 2:
        # take it down to zero, fail a non-blocking acquire, then restore it.
        self.assertReturnsIfImplemented(2, get_value, sem)
        self.assertEqual(sem.acquire(), True)
        self.assertReturnsIfImplemented(1, get_value, sem)
        self.assertEqual(sem.acquire(), True)
        self.assertReturnsIfImplemented(0, get_value, sem)
        self.assertEqual(sem.acquire(False), False)
        self.assertReturnsIfImplemented(0, get_value, sem)
        self.assertEqual(sem.release(), None)
        self.assertReturnsIfImplemented(1, get_value, sem)
        self.assertEqual(sem.release(), None)
        self.assertReturnsIfImplemented(2, get_value, sem)

    def test_semaphore(self):
        sem = self.Semaphore(2)
        self._test_semaphore(sem)
        # An unbounded semaphore may be released past its initial value.
        self.assertEqual(sem.release(), None)
        self.assertReturnsIfImplemented(3, get_value, sem)
        self.assertEqual(sem.release(), None)
        self.assertReturnsIfImplemented(4, get_value, sem)

    def test_bounded_semaphore(self):
        sem = self.BoundedSemaphore(2)
        self._test_semaphore(sem)
        # Currently fails on OS/X
        #if HAVE_GETVALUE:
        #    self.assertRaises(ValueError, sem.release)
        #    self.assertReturnsIfImplemented(2, get_value, sem)

    def test_timeout(self):
        if self.TYPE != 'processes':
            self.skipTest('test not appropriate for {}'.format(self.TYPE))

        sem = self.Semaphore(0)
        acquire = TimingWrapper(sem.acquire)

        # Non-blocking acquires return at once, whatever timeout is given.
        self.assertEqual(acquire(False), False)
        self.assertTimingAlmostEqual(acquire.elapsed, 0.0)

        self.assertEqual(acquire(False, None), False)
        self.assertTimingAlmostEqual(acquire.elapsed, 0.0)

        self.assertEqual(acquire(False, TIMEOUT1), False)
        self.assertTimingAlmostEqual(acquire.elapsed, 0)

        # Blocking acquires give up after the timeout.
        self.assertEqual(acquire(True, TIMEOUT2), False)
        self.assertTimingAlmostEqual(acquire.elapsed, TIMEOUT2)

        self.assertEqual(acquire(timeout=TIMEOUT3), False)
        self.assertTimingAlmostEqual(acquire.elapsed, TIMEOUT3)
638
639
class _TestCondition(BaseTestCase):
    """Tests for Condition: notify, notify_all and wait timeouts.

    Each scenario runs waiters both as ``self.Process`` objects and as
    plain ``threading.Thread`` objects against the same condition.
    """

    @classmethod
    def f(cls, cond, sleeping, woken, timeout=None):
        # Waiter body: announce that we are about to sleep, wait on the
        # condition, then announce that we woke up (notified or timed out).
        cond.acquire()
        sleeping.release()
        cond.wait(timeout)
        woken.release()
        cond.release()

    def check_invariant(self, cond):
        # this is only supposed to succeed when there are no sleepers
        if self.TYPE == 'processes':
            try:
                sleepers = (cond._sleeping_count.get_value() -
                            cond._woken_count.get_value())
                self.assertEqual(sleepers, 0)
                self.assertEqual(cond._wait_semaphore.get_value(), 0)
            except NotImplementedError:
                pass

    def test_notify(self):
        cond = self.Condition()
        sleeping = self.Semaphore(0)
        woken = self.Semaphore(0)

        proc = self.Process(target=self.f, args=(cond, sleeping, woken))
        proc.daemon = True
        proc.start()

        thread = threading.Thread(target=self.f, args=(cond, sleeping, woken))
        thread.daemon = True
        thread.start()

        # wait for both children to start sleeping
        sleeping.acquire()
        sleeping.acquire()

        # check no process/thread has woken up
        time.sleep(DELTA)
        self.assertReturnsIfImplemented(0, get_value, woken)

        # wake up one process/thread
        cond.acquire()
        cond.notify()
        cond.release()

        # check one process/thread has woken up
        time.sleep(DELTA)
        self.assertReturnsIfImplemented(1, get_value, woken)

        # wake up another
        cond.acquire()
        cond.notify()
        cond.release()

        # check other has woken up
        time.sleep(DELTA)
        self.assertReturnsIfImplemented(2, get_value, woken)

        # check state is not mucked up
        self.check_invariant(cond)

        # Both waiters have been notified by now; reap each of them.
        thread.join()
        proc.join()

    def test_notify_all(self):
        cond = self.Condition()
        sleeping = self.Semaphore(0)
        woken = self.Semaphore(0)

        # start some threads/processes which will timeout
        for i in range(3):
            p = self.Process(target=self.f,
                             args=(cond, sleeping, woken, TIMEOUT1))
            p.daemon = True
            p.start()

            t = threading.Thread(target=self.f,
                                 args=(cond, sleeping, woken, TIMEOUT1))
            t.daemon = True
            t.start()

        # wait for them all to sleep
        for i in range(6):
            sleeping.acquire()

        # check they have all timed out
        for i in range(6):
            woken.acquire()
        self.assertReturnsIfImplemented(0, get_value, woken)

        # check state is not mucked up
        self.check_invariant(cond)

        # start some more threads/processes
        for i in range(3):
            p = self.Process(target=self.f, args=(cond, sleeping, woken))
            p.daemon = True
            p.start()

            t = threading.Thread(target=self.f, args=(cond, sleeping, woken))
            t.daemon = True
            t.start()

        # wait for them to all sleep
        for i in range(6):
            sleeping.acquire()

        # check no process/thread has woken up
        time.sleep(DELTA)
        self.assertReturnsIfImplemented(0, get_value, woken)

        # wake them all up
        cond.acquire()
        cond.notify_all()
        cond.release()

        # check they have all woken
        time.sleep(DELTA)
        self.assertReturnsIfImplemented(6, get_value, woken)

        # check state is not mucked up
        self.check_invariant(cond)

    def test_timeout(self):
        cond = self.Condition()
        wait = TimingWrapper(cond.wait)
        cond.acquire()
        res = wait(TIMEOUT1)
        cond.release()
        self.assertEqual(res, None)
        self.assertTimingAlmostEqual(wait.elapsed, TIMEOUT1)
771
772
class _TestEvent(BaseTestCase):
    """Tests for Event: is_set, wait (with and without timeout), set, clear."""

    @classmethod
    def _test_event(cls, event):
        # Helper run in another process/thread: set the event after a delay.
        time.sleep(TIMEOUT2)
        event.set()

    def test_event(self):
        event = self.Event()
        wait = TimingWrapper(event.wait)

        # A new event starts out cleared.
        self.assertEqual(event.is_set(), False)

        # While the event is cleared, wait() returns False once the timeout
        # has expired.
        self.assertEqual(wait(0.0), False)
        self.assertTimingAlmostEqual(wait.elapsed, 0.0)
        self.assertEqual(wait(TIMEOUT1), False)
        self.assertTimingAlmostEqual(wait.elapsed, TIMEOUT1)

        event.set()

        # Once the event is set, wait() returns True without blocking.
        self.assertEqual(event.is_set(), True)
        self.assertEqual(wait(), True)
        self.assertTimingAlmostEqual(wait.elapsed, 0.0)
        self.assertEqual(wait(TIMEOUT1), True)
        self.assertTimingAlmostEqual(wait.elapsed, 0.0)
        # self.assertEqual(event.is_set(), True)

        event.clear()

        #self.assertEqual(event.is_set(), False)

        # wait() without a timeout blocks until the helper sets the event.
        p = self.Process(target=self._test_event, args=(event,))
        p.start()
        self.assertEqual(wait(), True)
        # The helper has nothing left to do after set(); reap it.
        p.join()
Benjamin Petersone711caf2008-06-11 16:44:04 +0000811
812#
813#
814#
815
class _TestValue(BaseTestCase):
    """Tests for shared Value / RawValue objects."""

    ALLOWED_TYPES = ('processes',)

    # (typecode, initial value, value written by the child process)
    codes_values = [
        ('i', 4343, 24234),
        ('d', 3.625, -4.25),
        ('h', -232, 234),
        ('c', latin('x'), latin('y'))
        ]

    def setUp(self):
        if not HAS_SHAREDCTYPES:
            self.skipTest("requires multiprocessing.sharedctypes")

    @classmethod
    def _test(cls, values):
        # Child side of test_value: overwrite each shared value with the
        # third element of its codes_values entry.
        for sv, cv in zip(values, cls.codes_values):
            sv.value = cv[2]

    def test_value(self, raw=False):
        if raw:
            values = [self.RawValue(code, value)
                      for code, value, _ in self.codes_values]
        else:
            values = [self.Value(code, value)
                      for code, value, _ in self.codes_values]

        for sv, cv in zip(values, self.codes_values):
            self.assertEqual(sv.value, cv[1])

        proc = self.Process(target=self._test, args=(values,))
        proc.start()
        proc.join()

        # The child's writes must be visible in this process.
        for sv, cv in zip(values, self.codes_values):
            self.assertEqual(sv.value, cv[2])

    def test_rawvalue(self):
        self.test_value(raw=True)

    def test_getobj_getlock(self):
        # Default lock, and lock=None: both accessors must be callable.
        val1 = self.Value('i', 5)
        val1.get_lock()
        val1.get_obj()

        val2 = self.Value('i', 5, lock=None)
        val2.get_lock()
        val2.get_obj()

        # An explicitly supplied lock is the one handed back.
        lock = self.Lock()
        val3 = self.Value('i', 5, lock=lock)
        lock3 = val3.get_lock()
        val3.get_obj()
        self.assertEqual(lock, lock3)

        # lock=False yields an object without the accessors.
        val4 = self.Value('i', 5, lock=False)
        self.assertFalse(hasattr(val4, 'get_lock'))
        self.assertFalse(hasattr(val4, 'get_obj'))

        # Anything that is not a lock is rejected.
        self.assertRaises(AttributeError, self.Value, 'i', 5, lock='navalue')

        val5 = self.RawValue('i', 5)
        self.assertFalse(hasattr(val5, 'get_lock'))
        self.assertFalse(hasattr(val5, 'get_obj'))
882
Benjamin Petersone711caf2008-06-11 16:44:04 +0000883
class _TestArray(BaseTestCase):
    """Tests for shared Array / RawArray objects."""

    ALLOWED_TYPES = ('processes',)

    @classmethod
    def f(cls, seq):
        # Replace seq, in place, by its running (prefix) sums.
        for i in range(1, len(seq)):
            seq[i] += seq[i-1]

    @unittest.skipIf(c_int is None, "requires _ctypes")
    def test_array(self, raw=False):
        seq = [680, 626, 934, 821, 150, 233, 548, 982, 714, 831]
        if raw:
            arr = self.RawArray('i', seq)
        else:
            arr = self.Array('i', seq)

        self.assertEqual(len(arr), len(seq))
        self.assertEqual(arr[3], seq[3])
        self.assertEqual(list(arr[2:7]), list(seq[2:7]))

        arr[4:8] = seq[4:8] = array.array('i', [1, 2, 3, 4])

        self.assertEqual(list(arr[:]), seq)

        # Apply f to the plain list here and to the shared array in a child
        # process; both must end up with the same contents.
        self.f(seq)

        p = self.Process(target=self.f, args=(arr,))
        p.start()
        p.join()

        self.assertEqual(list(arr[:]), seq)

    @unittest.skipIf(c_int is None, "requires _ctypes")
    def test_array_from_size(self):
        size = 10
        # Test for zeroing (see issue #11675).
        # The repetition below strengthens the test by increasing the chances
        # of previously allocated non-zero memory being used for the new array
        # on the 2nd and 3rd loops.
        for _ in range(3):
            arr = self.Array('i', size)
            self.assertEqual(len(arr), size)
            self.assertEqual(list(arr), [0] * size)
            arr[:] = range(10)
            self.assertEqual(list(arr), list(range(10)))
            del arr

    @unittest.skipIf(c_int is None, "requires _ctypes")
    def test_rawarray(self):
        self.test_array(raw=True)

    @unittest.skipIf(c_int is None, "requires _ctypes")
    def test_getobj_getlock_obj(self):
        # Default lock, and lock=None: both accessors must be callable.
        arr1 = self.Array('i', list(range(10)))
        arr1.get_lock()
        arr1.get_obj()

        arr2 = self.Array('i', list(range(10)), lock=None)
        arr2.get_lock()
        arr2.get_obj()

        # An explicitly supplied lock is the one handed back.
        lock = self.Lock()
        arr3 = self.Array('i', list(range(10)), lock=lock)
        lock3 = arr3.get_lock()
        arr3.get_obj()
        self.assertEqual(lock, lock3)

        # lock=False yields an object without the accessors.
        arr4 = self.Array('i', range(10), lock=False)
        self.assertFalse(hasattr(arr4, 'get_lock'))
        self.assertFalse(hasattr(arr4, 'get_obj'))
        # Anything that is not a lock is rejected.
        self.assertRaises(AttributeError,
                          self.Array, 'i', range(10), lock='notalock')

        arr5 = self.RawArray('i', range(10))
        self.assertFalse(hasattr(arr5, 'get_lock'))
        self.assertFalse(hasattr(arr5, 'get_obj'))
Benjamin Petersone711caf2008-06-11 16:44:04 +0000961
962#
963#
964#
965
class _TestContainers(BaseTestCase):
    # Checks for the list, dict and Namespace containers supplied by the
    # mixin; only the manager flavour is allowed.

    ALLOWED_TYPES = ('manager',)

    def test_list(self):
        digits = list(range(10))
        doubled = [0, 1, 2, 3, 4, 0, 1, 2, 3, 4]

        a = self.list(list(range(10)))
        self.assertEqual(a[:], digits)

        b = self.list()
        self.assertEqual(b[:], [])

        b.extend(list(range(5)))
        self.assertEqual(b[:], [0, 1, 2, 3, 4])

        self.assertEqual(b[2], 2)
        # A slice reaching past the end is clipped.
        self.assertEqual(b[2:10], [2, 3, 4])

        b *= 2
        self.assertEqual(b[:], doubled)

        self.assertEqual(b + [5, 6], doubled + [5, 6])

        # ``a`` is unaffected by everything done to ``b``.
        self.assertEqual(a[:], digits)

        # A shared list built from other shared lists reads back as plain
        # nested lists.
        nested = self.list([a, b])
        self.assertEqual(nested[:], [digits, doubled])

        # A change made to ``a`` afterwards is visible through ``f``.
        f = self.list([a])
        a.append('hello')
        self.assertEqual(f[:], [digits + ['hello']])

    def test_dict(self):
        codes = list(range(65, 70))
        letters = [chr(code) for code in codes]

        d = self.dict()
        for code in codes:
            d[code] = chr(code)

        self.assertEqual(d.copy(), {code: chr(code) for code in codes})
        self.assertEqual(sorted(d.keys()), codes)
        self.assertEqual(sorted(d.values()), letters)
        self.assertEqual(sorted(d.items()), list(zip(codes, letters)))

    def test_namespace(self):
        ns = self.Namespace()
        ns.name = 'Bob'
        ns.job = 'Builder'
        ns._hidden = 'hidden'
        self.assertEqual((ns.name, ns.job), ('Bob', 'Builder'))
        del ns.job
        # The underscore-prefixed attribute does not show up in str().
        self.assertEqual(str(ns), "Namespace(name='Bob')")
        self.assertTrue(hasattr(ns, 'name'))
        self.assertFalse(hasattr(ns, 'job'))
1021
1022#
1023#
1024#
1025
def sqr(x, wait=0.0):
    """Return ``x * x`` after sleeping for ``wait`` seconds.

    The optional delay lets callers simulate a slow task.
    """
    time.sleep(wait)
    squared = x * x
    return squared
class _TestPool(BaseTestCase):
    # Exercises the Pool API through ``self.pool`` (not created in this
    # class).

    def test_apply(self):
        pool_apply = self.pool.apply
        self.assertEqual(pool_apply(sqr, (5,)), sqr(5))
        self.assertEqual(pool_apply(sqr, (), {'x': 3}), sqr(x=3))

    def test_map(self):
        small = list(range(10))
        large = list(range(100))
        self.assertEqual(self.pool.map(sqr, small),
                         [sqr(i) for i in small])
        self.assertEqual(self.pool.map(sqr, large, chunksize=20),
                         [sqr(i) for i in large])

    def test_map_chunksize(self):
        # An empty input combined with an explicit chunksize must finish
        # within TIMEOUT1 rather than stall.
        try:
            pending = self.pool.map_async(sqr, [], chunksize=1)
            pending.get(timeout=TIMEOUT1)
        except multiprocessing.TimeoutError:
            self.fail("pool.map_async with chunksize stalled on null list")

    def test_async(self):
        # The task sleeps TIMEOUT1 seconds, so get() should block for
        # about that long.
        async_result = self.pool.apply_async(sqr, (7, TIMEOUT1,))
        timed_get = TimingWrapper(async_result.get)
        self.assertEqual(timed_get(), 49)
        self.assertTimingAlmostEqual(timed_get.elapsed, TIMEOUT1)

    def test_async_timeout(self):
        # The task outlasts the timeout handed to get().
        async_result = self.pool.apply_async(sqr, (6, TIMEOUT2 + 0.2))
        timed_get = TimingWrapper(async_result.get)
        with self.assertRaises(multiprocessing.TimeoutError):
            timed_get(timeout=TIMEOUT2)
        self.assertTimingAlmostEqual(timed_get.elapsed, TIMEOUT2)

    def test_imap(self):
        it = self.pool.imap(sqr, list(range(10)))
        self.assertEqual(list(it), [sqr(i) for i in range(10)])

        # Step through by hand, with and without an explicit chunksize.
        for count, options in ((10, {}), (1000, {'chunksize': 100})):
            it = self.pool.imap(sqr, list(range(count)), **options)
            for i in range(count):
                self.assertEqual(next(it), i * i)
            self.assertRaises(StopIteration, it.__next__)

    def test_imap_unordered(self):
        expected = [sqr(i) for i in range(1000)]
        # Results may arrive in any order, hence the sort.
        for options in ({}, {'chunksize': 53}):
            it = self.pool.imap_unordered(sqr, list(range(1000)), **options)
            self.assertEqual(sorted(it), expected)

    def test_make_pool(self):
        pool = multiprocessing.Pool(3)
        self.assertEqual(3, len(pool._pool))
        pool.close()
        pool.join()

    def test_terminate(self):
        if self.TYPE == 'manager':
            # On Unix a forked process increfs each shared object to
            # which its parent process held a reference.  If the
            # forked process gets terminated then there is likely to
            # be a reference leak.  So to prevent
            # _TestZZZNumberOfObjects from failing we skip this test
            # when using a manager.
            return

        # Queue far more work than could finish, then check that
        # terminate() lets join() return almost immediately.
        pending = self.pool.map_async(time.sleep, [0.1] * 10000, chunksize=1)
        self.pool.terminate()
        timed_join = TimingWrapper(self.pool.join)
        timed_join()
        self.assertLess(timed_join.elapsed, 0.2)
Benjamin Petersone711caf2008-06-11 16:44:04 +00001104#
1105# Test that manager has expected number of shared objects left
1106#
1107
class _TestZZZNumberOfObjects(BaseTestCase):
    # Because test cases are sorted alphabetically, this one will get
    # run after all the other tests for the manager.  It tests that
    # there have been no "reference leaks" for the manager's shared
    # objects.  Note the comment in _TestPool.test_terminate().
    ALLOWED_TYPES = ('manager',)

    def test_number_of_objects(self):
        EXPECTED_NUMBER = 1                # the pool object is still alive
        multiprocessing.active_children()  # discard dead process objs
        gc.collect()                       # do garbage collection
        refs = self.manager._number_of_objects()
        # Take the snapshot right after the count so that the diagnostic
        # describes the same state as ``refs``.
        debug_info = self.manager._debug_info()
        if refs != EXPECTED_NUMBER:
            # Print the snapshot once; a second _debug_info() call would
            # only add a round trip and duplicate the output.
            print(debug_info)

        self.assertEqual(refs, EXPECTED_NUMBER)
1126
1127#
1128# Test of creating a customized manager class
1129#
1130
1131from multiprocessing.managers import BaseManager, BaseProxy, RemoteError
1132
class FooBar(object):
    """Plain class with two public methods and one underscore method."""

    def f(self):
        """Return the marker string ``'f()'``."""
        return 'f()'

    def g(self):
        """Always raise ValueError."""
        raise ValueError

    def _h(self):
        """Return the marker string ``'_h()'``."""
        return '_h()'
1140
def baz():
    """Generate the squares of 0 through 9, in increasing order."""
    for n in range(10):
        square = n * n
        yield square
1144
class IteratorProxy(BaseProxy):
    """Proxy that can be iterated locally while the referent does the work."""

    _exposed_ = ('__next__',)

    def __iter__(self):
        # The proxy acts as its own iterator.
        return self

    def __next__(self):
        # Each step is forwarded to the referent's __next__.
        return self._callmethod('__next__')
1151
class MyManager(BaseManager):
    """Manager subclass that carries the registrations made below."""


# 'Foo' has no explicit ``exposed`` list, 'Bar' exposes only f and _h, and
# the results of 'baz' are handed out through IteratorProxy.
MyManager.register(
    'Foo',
    callable=FooBar,
)
MyManager.register(
    'Bar',
    callable=FooBar,
    exposed=('f', '_h'),
)
MyManager.register(
    'baz',
    callable=baz,
    proxytype=IteratorProxy,
)
1158
1159
class _TestMyManager(BaseTestCase):
    # End-to-end check of the types registered on MyManager.

    ALLOWED_TYPES = ('manager',)

    def test_mymanager(self):
        manager = MyManager()
        manager.start()

        foo = manager.Foo()
        bar = manager.Bar()
        squares = manager.baz()

        # Which of the three FooBar methods each proxy offers.
        candidates = ('f', 'g', '_h')
        foo_methods = [name for name in candidates if hasattr(foo, name)]
        bar_methods = [name for name in candidates if hasattr(bar, name)]

        self.assertEqual(foo_methods, ['f', 'g'])
        self.assertEqual(bar_methods, ['f', '_h'])

        # Foo: f works directly and through _callmethod, g's ValueError
        # reaches the caller, and _h is refused with RemoteError.
        self.assertEqual(foo.f(), 'f()')
        self.assertRaises(ValueError, foo.g)
        self.assertEqual(foo._callmethod('f'), 'f()')
        self.assertRaises(RemoteError, foo._callmethod, '_h')

        # Bar: both exposed methods work either way.
        self.assertEqual(bar.f(), 'f()')
        self.assertEqual(bar._h(), '_h()')
        self.assertEqual(bar._callmethod('f'), 'f()')
        self.assertEqual(bar._callmethod('_h'), '_h()')

        self.assertEqual(list(squares), [i * i for i in range(10)])

        manager.shutdown()
1191
1192#
1193# Test of connecting to a remote server and using xmlrpclib for serialization
1194#
1195
# Single queue instance handed out by get_queue().
_queue = pyqueue.Queue()


def get_queue():
    """Return the module-level queue (the same object on every call)."""
    return _queue
1199
class QueueManager(BaseManager):
    """Manager class used by the server process; serves get_queue()."""


QueueManager.register(
    'get_queue',
    callable=get_queue,
)
1203
class QueueManager2(BaseManager):
    """Manager class declaring the same interface as QueueManager.

    'get_queue' is registered without a callable.
    """


QueueManager2.register('get_queue')


# Serializer name passed to the managers created in the tests below.
SERIALIZER = 'xmlrpclib'
1210
class _TestRemoteManager(BaseTestCase):
    # A server manager and two client managers (one of them in a child
    # process) exchange data through a queue, using SERIALIZER instead of
    # the default serializer.

    ALLOWED_TYPES = ('manager',)

    @classmethod
    def _putter(cls, address, authkey):
        # Runs in the child: connect as a client and put one tuple on the
        # shared queue.
        manager = QueueManager2(
            address=address, authkey=authkey, serializer=SERIALIZER
            )
        manager.connect()
        queue = manager.get_queue()
        queue.put(('hello world', None, True, 2.25))

    def test_remote(self):
        authkey = os.urandom(32)

        manager = QueueManager(
            address=('localhost', 0), authkey=authkey, serializer=SERIALIZER
            )
        manager.start()

        p = self.Process(target=self._putter, args=(manager.address, authkey))
        p.start()

        manager2 = QueueManager2(
            address=manager.address, authkey=authkey, serializer=SERIALIZER
            )
        manager2.connect()
        queue = manager2.get_queue()

        # Note that xmlrpclib will deserialize object as a list not a tuple
        self.assertEqual(queue.get(), ['hello world', None, True, 2.25])

        # The item has arrived, so the child has done its job.  Join it
        # while the server is still up instead of leaving it unreaped.
        p.join()

        # Because we are using xmlrpclib for serialization instead of
        # pickle this will cause a serialization error.
        self.assertRaises(Exception, queue.put, time.sleep)

        # Make queue finalizer run before the server is stopped
        del queue
        manager.shutdown()
1251
class _TestManagerRestart(BaseTestCase):
    # Starts a manager, uses it, shuts it down and then starts a second
    # manager on an address whose listener socket was closed earlier in
    # the test.

    @classmethod
    def _putter(cls, address, authkey):
        # Runs in the child: connect as a client and put one string on the
        # shared queue.
        manager = QueueManager(
            address=address, authkey=authkey, serializer=SERIALIZER)
        manager.connect()
        queue = manager.get_queue()
        queue.put('hello world')

    def test_rapid_restart(self):
        authkey = os.urandom(32)
        manager = QueueManager(
            address=('localhost', 0), authkey=authkey, serializer=SERIALIZER)
        srvr = manager.get_server()
        addr = srvr.address
        # Close the connection.Listener socket which gets opened as a part
        # of manager.get_server(). It's not needed for the test.
        srvr.listener.close()
        manager.start()

        p = self.Process(target=self._putter, args=(manager.address, authkey))
        p.start()
        queue = manager.get_queue()
        self.assertEqual(queue.get(), 'hello world')
        # The item has arrived, so the child is done: join it rather than
        # leaving it unreaped.
        p.join()
        del queue
        manager.shutdown()

        manager = QueueManager(
            address=addr, authkey=authkey, serializer=SERIALIZER)
        try:
            manager.start()
        except IOError as e:
            if e.errno != errno.EADDRINUSE:
                raise
            # Retry after some time, in case the old socket was lingering
            # (sporadic failure on buildbots)
            time.sleep(1.0)
            manager = QueueManager(
                address=addr, authkey=authkey, serializer=SERIALIZER)
            # The replacement manager has to be started as well, otherwise
            # nothing is retried and the shutdown() below is called on a
            # manager that was never started.
            manager.start()
        manager.shutdown()
Jesse Nollerc5d28a02009-03-30 16:37:36 +00001292
Benjamin Petersone711caf2008-06-11 16:44:04 +00001293#
1294#
1295#
1296
# Empty message that tells the _echo() child to stop.
SENTINEL = latin('')


class _TestConnection(BaseTestCase):
    # Tests for the connection objects returned by Pipe().

    ALLOWED_TYPES = ('processes', 'threads')

    @classmethod
    def _echo(cls, conn):
        # Child side: send every message straight back until SENTINEL
        # arrives, then close the connection.
        while True:
            data = conn.recv_bytes()
            if SENTINEL == data:
                break
            conn.send_bytes(data)
        conn.close()

    def test_connection(self):
        conn, child_conn = self.Pipe()

        echo = self.Process(target=self._echo, args=(child_conn,))
        echo.daemon = True
        echo.start()

        seq = [1, 2.25, None]
        msg = latin('hello world')
        longmsg = msg * 10
        arr = array.array('i', list(range(4)))
        with_processes = self.TYPE == 'processes'

        if with_processes:
            self.assertEqual(type(conn.fileno()), int)

        # Objects and byte strings both come back unchanged.
        self.assertEqual(conn.send(seq), None)
        self.assertEqual(conn.recv(), seq)

        self.assertEqual(conn.send_bytes(msg), None)
        self.assertEqual(conn.recv_bytes(), msg)

        if with_processes:
            # Receive into the start of a zeroed buffer.
            buffer = array.array('i', [0]*10)
            padding = 10 - len(arr)
            expected = list(arr) + [0] * padding
            self.assertEqual(conn.send_bytes(arr), None)
            received = conn.recv_bytes_into(buffer)
            self.assertEqual(received, len(arr) * buffer.itemsize)
            self.assertEqual(list(buffer), expected)

            # Receive at an offset of three items.
            buffer = array.array('i', [0]*10)
            padding = 10 - 3 - len(arr)
            expected = [0] * 3 + list(arr) + [0] * padding
            self.assertEqual(conn.send_bytes(arr), None)
            received = conn.recv_bytes_into(buffer, 3 * buffer.itemsize)
            self.assertEqual(received, len(arr) * buffer.itemsize)
            self.assertEqual(list(buffer), expected)

            # A buffer that is too small: the whole message is expected in
            # the exception's args.
            buffer = bytearray(latin(' ' * 40))
            self.assertEqual(conn.send_bytes(longmsg), None)
            try:
                res = conn.recv_bytes_into(buffer)
            except multiprocessing.BufferTooShort as e:
                self.assertEqual(e.args, (longmsg,))
            else:
                self.fail('expected BufferTooShort, got %s' % res)

        poll = TimingWrapper(conn.poll)

        # Nothing pending: poll() returns at once, poll(TIMEOUT1) waits
        # for the whole timeout.
        self.assertEqual(poll(), False)
        self.assertTimingAlmostEqual(poll.elapsed, 0)

        self.assertEqual(poll(TIMEOUT1), False)
        self.assertTimingAlmostEqual(poll.elapsed, TIMEOUT1)

        conn.send(None)

        # The echoed message is pending: poll() no longer waits.
        self.assertEqual(poll(TIMEOUT1), True)
        self.assertTimingAlmostEqual(poll.elapsed, 0)

        self.assertEqual(conn.recv(), None)

        really_big_msg = latin('X') * (1024 * 1024 * 16)   # 16 MiB
        conn.send_bytes(really_big_msg)
        self.assertEqual(conn.recv_bytes(), really_big_msg)

        conn.send_bytes(SENTINEL)                          # tell child to quit
        child_conn.close()

        if with_processes:
            self.assertEqual(conn.readable, True)
            self.assertEqual(conn.writable, True)
            # Reading now ends in EOFError.
            self.assertRaises(EOFError, conn.recv)
            self.assertRaises(EOFError, conn.recv_bytes)

        echo.join()

    def test_duplex_false(self):
        reader, writer = self.Pipe(duplex=False)
        self.assertEqual(writer.send(1), None)
        self.assertEqual(reader.recv(), 1)
        if self.TYPE == 'processes':
            # Each end works in one direction only.
            self.assertEqual(reader.readable, True)
            self.assertEqual(reader.writable, False)
            self.assertEqual(writer.readable, False)
            self.assertEqual(writer.writable, True)
            self.assertRaises(IOError, reader.send, 2)
            self.assertRaises(IOError, writer.recv)
            self.assertRaises(IOError, writer.poll)

    def test_spawn_close(self):
        # We test that a pipe connection can be closed by parent
        # process immediately after child is spawned.  On Windows this
        # would have sometimes failed on old versions because
        # child_conn would be closed before the child got a chance to
        # duplicate it.
        conn, child_conn = self.Pipe()

        echo = self.Process(target=self._echo, args=(child_conn,))
        echo.start()
        child_conn.close()    # this might complete before child initializes

        greeting = latin('hello')
        conn.send_bytes(greeting)
        self.assertEqual(conn.recv_bytes(), greeting)

        conn.send_bytes(SENTINEL)
        conn.close()
        echo.join()

    def test_sendbytes(self):
        if self.TYPE != 'processes':
            return

        msg = latin('abcdefghijklmnopqrstuvwxyz')
        a, b = self.Pipe()

        # (extra send_bytes() arguments, bytes expected at the other end)
        accepted = (
            ((), msg),
            ((5,), msg[5:]),
            ((7, 8), msg[7:7+8]),
            ((26,), latin('')),
            ((26, 0), latin('')),
        )
        for extra, expected in accepted:
            a.send_bytes(msg, *extra)
            self.assertEqual(b.recv_bytes(), expected)

        # Offset/size combinations that send_bytes() must reject.
        rejected = ((27,), (22, 5), (26, 1), (-1,), (4, -1))
        for extra in rejected:
            self.assertRaises(ValueError, a.send_bytes, msg, *extra)
1448
class _TestListenerClient(BaseTestCase):
    # Listener/Client round trip for every address family offered by the
    # mixin's connection module.

    ALLOWED_TYPES = ('processes', 'threads')

    @classmethod
    def _test(cls, address):
        # Child side: connect, send one greeting, hang up.
        client = cls.connection.Client(address)
        client.send('hello')
        client.close()

    def test_listener_client(self):
        for family in self.connection.families:
            listener = self.connection.Listener(family=family)
            child = self.Process(target=self._test,
                                 args=(listener.address,))
            child.daemon = True
            child.start()
            accepted = listener.accept()
            self.assertEqual(accepted.recv(), 'hello')
            child.join()
            listener.close()
Benjamin Petersone711caf2008-06-11 16:44:04 +00001469#
1470# Test of sending connection and socket objects between processes
1471#
Benjamin Petersonb29cbbc2008-06-16 20:57:14 +00001472"""
Benjamin Petersone711caf2008-06-11 16:44:04 +00001473class _TestPicklingConnections(BaseTestCase):
1474
1475 ALLOWED_TYPES = ('processes',)
1476
1477 def _listener(self, conn, families):
1478 for fam in families:
1479 l = self.connection.Listener(family=fam)
1480 conn.send(l.address)
1481 new_conn = l.accept()
1482 conn.send(new_conn)
1483
1484 if self.TYPE == 'processes':
1485 l = socket.socket()
1486 l.bind(('localhost', 0))
1487 conn.send(l.getsockname())
1488 l.listen(1)
1489 new_conn, addr = l.accept()
1490 conn.send(new_conn)
1491
1492 conn.recv()
1493
1494 def _remote(self, conn):
1495 for (address, msg) in iter(conn.recv, None):
1496 client = self.connection.Client(address)
1497 client.send(msg.upper())
1498 client.close()
1499
1500 if self.TYPE == 'processes':
1501 address, msg = conn.recv()
1502 client = socket.socket()
1503 client.connect(address)
1504 client.sendall(msg.upper())
1505 client.close()
1506
1507 conn.close()
1508
1509 def test_pickling(self):
1510 try:
1511 multiprocessing.allow_connection_pickling()
1512 except ImportError:
1513 return
1514
1515 families = self.connection.families
1516
1517 lconn, lconn0 = self.Pipe()
1518 lp = self.Process(target=self._listener, args=(lconn0, families))
1519 lp.start()
1520 lconn0.close()
1521
1522 rconn, rconn0 = self.Pipe()
1523 rp = self.Process(target=self._remote, args=(rconn0,))
1524 rp.start()
1525 rconn0.close()
1526
1527 for fam in families:
1528 msg = ('This connection uses family %s' % fam).encode('ascii')
1529 address = lconn.recv()
1530 rconn.send((address, msg))
1531 new_conn = lconn.recv()
1532 self.assertEqual(new_conn.recv(), msg.upper())
1533
1534 rconn.send(None)
1535
1536 if self.TYPE == 'processes':
1537 msg = latin('This connection uses a normal socket')
1538 address = lconn.recv()
1539 rconn.send((address, msg))
1540 if hasattr(socket, 'fromfd'):
1541 new_conn = lconn.recv()
1542 self.assertEqual(new_conn.recv(100), msg.upper())
1543 else:
1544 # XXX On Windows with Py2.6 need to backport fromfd()
1545 discard = lconn.recv_bytes()
1546
1547 lconn.send(None)
1548
1549 rconn.close()
1550 lconn.close()
1551
1552 lp.join()
1553 rp.join()
Benjamin Petersonb29cbbc2008-06-16 20:57:14 +00001554"""
Benjamin Petersone711caf2008-06-11 16:44:04 +00001555#
1556#
1557#
1558
class _TestHeap(BaseTestCase):
    # Consistency checks for the allocator behind
    # multiprocessing.heap.BufferWrapper.

    ALLOWED_TYPES = ('processes',)

    def test_heap(self):
        iterations = 5000
        maxblocks = 50
        blocks = []

        # Churn the heap: keep allocating blocks of varying size and, once
        # more than maxblocks are alive, drop a randomly chosen one.
        for _round in range(iterations):
            size = int(random.lognormvariate(0, 1) * 1000)
            blocks.append(multiprocessing.heap.BufferWrapper(size))
            if len(blocks) > maxblocks:
                del blocks[random.randrange(maxblocks)]

        # get the heap object
        heap = multiprocessing.heap.BufferWrapper._heap

        # Gather every free and every occupied block as
        # (arena index, start, stop, length, state).  The heap lock is
        # taken here and released by a cleanup.
        layout = []
        occupied = 0    # bytes in occupied blocks; tallied, not asserted
        heap._lock.acquire()
        self.addCleanup(heap._lock.release)
        for free_blocks in list(heap._len_to_seq.values()):
            for arena, start, stop in free_blocks:
                arena_index = heap._arenas.index(arena)
                layout.append((arena_index, start, stop,
                               stop - start, 'free'))
        for arena, start, stop in heap._allocated_blocks:
            arena_index = heap._arenas.index(arena)
            layout.append((arena_index, start, stop,
                           stop - start, 'occupied'))
            occupied += stop - start

        layout.sort()

        # Neighbouring entries must either touch, or the second one must
        # open a different arena at offset 0.
        for current, following in zip(layout, layout[1:]):
            arena, start, stop = current[:3]
            narena, nstart = following[:2]
            opens_new_arena = arena != narena and nstart == 0
            contiguous = stop == nstart
            self.assertTrue(opens_new_arena or contiguous)

    def test_free_from_gc(self):
        # Check that freeing of blocks by the garbage collector doesn't
        # deadlock (issue #12352).
        # Make sure the GC is enabled, and set lower collection thresholds
        # to make collections more frequent (and increase the probability
        # of deadlock).
        if not gc.isenabled():
            gc.enable()
            self.addCleanup(gc.disable)
        saved_thresholds = gc.get_threshold()
        self.addCleanup(gc.set_threshold, *saved_thresholds)
        gc.set_threshold(10)

        # perform numerous block allocations, with cyclic references to
        # make sure objects are collected asynchronously by the gc
        for _round in range(5000):
            first = multiprocessing.heap.BufferWrapper(1)
            second = multiprocessing.heap.BufferWrapper(1)
            # circular references
            first.buddy = second
            second.buddy = first
1623
Benjamin Petersone711caf2008-06-11 16:44:04 +00001624#
1625#
1626#
1627
class _Foo(Structure):
    # Two-field structure: an int ``x`` followed by a double ``y``.
    _fields_ = [
        ('x', c_int),
        ('y', c_double),
    ]
1633
class _TestSharedCTypes(BaseTestCase):
    # Shared ctypes values, structures and arrays modified by a child
    # process must show the new contents in the parent.

    ALLOWED_TYPES = ('processes',)

    def setUp(self):
        if not HAS_SHAREDCTYPES:
            self.skipTest("requires multiprocessing.sharedctypes")

    @classmethod
    def _double(cls, x, y, foo, arr, string):
        # Child side: double every shared object in place.
        x.value = x.value * 2
        y.value = y.value * 2
        foo.x = foo.x * 2
        foo.y = foo.y * 2
        string.value = string.value * 2
        for index in range(len(arr)):
            arr[index] = arr[index] * 2

    def test_sharedctypes(self, lock=False):
        x = Value('i', 7, lock=lock)
        y = Value(c_double, 1.0/3.0, lock=lock)
        foo = Value(_Foo, 3, 2, lock=lock)
        arr = self.Array('d', list(range(10)), lock=lock)
        string = self.Array('c', 20, lock=lock)
        string.value = latin('hello')

        shared = (x, y, foo, arr, string)
        child = self.Process(target=self._double, args=shared)
        child.start()
        child.join()

        # Everything has been doubled by the child.
        self.assertEqual(x.value, 14)
        self.assertAlmostEqual(y.value, 2.0/3.0)
        self.assertEqual(foo.x, 6)
        self.assertAlmostEqual(foo.y, 4.0)
        for index in range(10):
            self.assertAlmostEqual(arr[index], index * 2)
        self.assertEqual(string.value, latin('hellohello'))

    def test_synchronize(self):
        # Same scenario, with lock=True.
        self.test_sharedctypes(lock=True)

    def test_copy(self):
        original = _Foo(2, 5.0)
        duplicate = copy(original)
        # Changing the original afterwards must not touch the copy.
        original.x = 0
        original.y = 0
        self.assertEqual(duplicate.x, 2)
        self.assertAlmostEqual(duplicate.y, 5.0)
1682
1683#
1684#
1685#
1686
class _TestFinalize(BaseTestCase):
    # Checks when, and in which order, util.Finalize callbacks run.

    ALLOWED_TYPES = ('processes',)

    @classmethod
    def _test_finalize(cls, conn):
        # Child side.  Every callback sends its label through ``conn``, so
        # the parent can read back the order in which they ran.
        class Token(object):
            pass

        collected = Token()
        util.Finalize(collected, conn.send, args=('a',))
        del collected   # triggers callback for a

        called = Token()
        close_b = util.Finalize(called, conn.send, args=('b',))
        close_b()       # triggers callback for b
        close_b()       # does nothing because callback has already been called
        del called      # does nothing because callback has already been called

        # No exit priority given; 'c' is absent from the order expected
        # by test_finalize().
        no_priority = Token()
        util.Finalize(no_priority, conn.send, args=('c',))

        priority_one = Token()
        util.Finalize(priority_one, conn.send, args=('d10',), exitpriority=1)

        # Three objects sharing priority 0, registered in label order and
        # all kept alive by the list.
        priority_zero = [Token(), Token(), Token()]
        for obj, label in zip(priority_zero, ('d01', 'd02', 'd03')):
            util.Finalize(obj, conn.send, args=(label,), exitpriority=0)

        util.Finalize(None, conn.send, args=('e',), exitpriority=-10)

        util.Finalize(None, conn.send, args=('STOP',), exitpriority=-100)

        # call multiprocessing's cleanup function then exit process without
        # garbage collecting locals
        util._exit_function()
        conn.close()
        os._exit(0)

    def test_finalize(self):
        conn, child_conn = self.Pipe()

        child = self.Process(target=self._test_finalize, args=(child_conn,))
        child.start()
        child.join()

        # Read labels until the 'STOP' marker.
        result = list(iter(conn.recv, 'STOP'))
        self.assertEqual(result, ['a', 'b', 'd10', 'd03', 'd02', 'd01', 'e'])
1738
1739#
1740# Test that from ... import * works for each module
1741#
1742
class _TestImportStar(BaseTestCase):
    # Every name a multiprocessing module lists in __all__ must exist in
    # that module.

    ALLOWED_TYPES = ('processes',)

    def test_import(self):
        submodules = (
            'connection', 'heap', 'managers', 'pool', 'process',
            'reduction', 'synchronize', 'util',
        )
        modules = ['multiprocessing']
        modules.extend('multiprocessing.' + sub for sub in submodules)

        if c_int is not None:
            # This module requires _ctypes
            modules.append('multiprocessing.sharedctypes')

        for name in modules:
            __import__(name)
            mod = sys.modules[name]

            # A module without __all__ has nothing to check.
            for attr in getattr(mod, '__all__', ()):
                message = '%r does not have attribute %r' % (mod, attr)
                self.assertTrue(hasattr(mod, attr), message)
1769
1770#
1771# Quick test that logging works -- does not test logging output
1772#
1773
class _TestLogging(BaseTestCase):
    # Quick checks that multiprocessing's logger can be used and that a
    # child process reports the expected effective level.  Logging output
    # itself is not tested.

    ALLOWED_TYPES = ('processes',)

    def test_enable_logging(self):
        logger = multiprocessing.get_logger()
        # Check the logger before it is used.
        self.assertIsNotNone(logger)
        logger.setLevel(util.SUBWARNING)
        try:
            logger.debug('this will not be printed')
            logger.info('nor will this')
        finally:
            # Put the level back even if one of the calls above fails.
            logger.setLevel(LOG_LEVEL)

    @classmethod
    def _test_level(cls, conn):
        # Child side: report the effective level of multiprocessing's
        # logger.
        logger = multiprocessing.get_logger()
        conn.send(logger.getEffectiveLevel())

    def _child_effective_level(self, reader, writer):
        # Run _test_level() in a child, return the level it reports and
        # join the child so that it is not left behind.
        child = self.Process(target=self._test_level, args=(writer,))
        child.start()
        try:
            return reader.recv()
        finally:
            child.join()

    def test_level(self):
        LEVEL1 = 32
        LEVEL2 = 37

        logger = multiprocessing.get_logger()
        root_logger = logging.getLogger()
        root_level = root_logger.level

        reader, writer = multiprocessing.Pipe(duplex=False)
        self.addCleanup(reader.close)
        self.addCleanup(writer.close)

        try:
            # A level set on multiprocessing's logger is what the child
            # reports.
            logger.setLevel(LEVEL1)
            self.assertEqual(
                LEVEL1, self._child_effective_level(reader, writer))

            # With that level unset, the root logger's level is reported.
            logger.setLevel(logging.NOTSET)
            root_logger.setLevel(LEVEL2)
            self.assertEqual(
                LEVEL2, self._child_effective_level(reader, writer))
        finally:
            # Restore both loggers even when an assertion fails, so that
            # the changed levels do not leak into later tests.
            root_logger.setLevel(root_level)
            logger.setLevel(level=LOG_LEVEL)
1812
1813#
Jesse Noller6214edd2009-01-19 16:23:53 +00001814# Test to verify handle verification, see issue 3321
1815#
1816
class TestInvalidHandle(unittest.TestCase):
    # Connection must verify the handle it is given (see issue 3321).

    @unittest.skipIf(WIN32, "skipped on Windows")
    def test_invalid_handles(self):
        # An arbitrary handle number (presumably not an open descriptor)
        # must fail on use; a negative one is refused at construction.
        bogus = _multiprocessing.Connection(44977608)
        with self.assertRaises(IOError):
            bogus.poll()
        with self.assertRaises(IOError):
            _multiprocessing.Connection(-1)
Florent Xicluna9b0e9182010-03-28 11:42:38 +00001824
Jesse Noller6214edd2009-01-19 16:23:53 +00001825#
Benjamin Petersone711caf2008-06-11 16:44:04 +00001826# Functions used to create test cases from the base ones in this module
1827#
1828
def get_attributes(Source, names):
    """Collect the attributes ``names`` of ``Source`` into a dict.

    Attributes that are plain Python functions are wrapped in
    ``staticmethod``; everything else is stored unchanged.
    """
    function_type = type(get_attributes)
    attributes = {}
    for name in names:
        value = getattr(Source, name)
        if isinstance(value, function_type):
            value = staticmethod(value)
        attributes[name] = value
    return attributes
1837
def create_test_cases(Mixin, type):
    """Build the concrete TestCase classes for one flavour of the tests.

    Every module global whose name starts with ``_Test`` and whose
    ``ALLOWED_TYPES`` contains ``type`` is combined with
    ``unittest.TestCase`` and ``Mixin``.  The new class is named
    ``'With' + type.capitalize() + <global name without its leading
    underscore>`` and gets ``Mixin``'s ``__module__``.

    Returns a dict mapping the new names to the new classes.
    """
    generated = {}
    flavour = type.capitalize()

    # Iterate over a snapshot of the module namespace.
    for name, base in list(globals().items()):
        if not name.startswith('_Test'):
            continue
        if type not in base.ALLOWED_TYPES:
            continue

        class Temp(base, unittest.TestCase, Mixin):
            pass

        newname = 'With' + flavour + name[1:]
        Temp.__name__ = newname
        Temp.__module__ = Mixin.__module__
        generated[newname] = Temp
    return generated
1854
1855#
1856# Create test cases
1857#
1858
class ProcessesMixin(object):
    # Binds the generic tests to the multiprocessing package itself.
    TYPE = 'processes'
    Process = multiprocessing.Process
    # The names below are copied from the multiprocessing package straight
    # into this class namespace.
    locals().update(get_attributes(multiprocessing, (
        'Queue',
        'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
        'Condition', 'Event',
        'Value', 'Array', 'RawValue', 'RawArray',
        'current_process', 'active_children',
        'Pipe', 'connection',
        'JoinableQueue'
        )))


# Generate the 'processes' test cases and publish them as module globals.
testcases_processes = create_test_cases(ProcessesMixin, type='processes')
globals().update(testcases_processes)
1871
1872
class ManagerMixin(object):
    # Binds the generic tests to the objects handed out by a SyncManager.
    TYPE = 'manager'
    Process = multiprocessing.Process
    # object.__new__ creates the manager without running its __init__;
    # presumably it is initialised before the tests run (confirm where).
    manager = object.__new__(multiprocessing.managers.SyncManager)
    # The names below are looked up on that manager instance and copied
    # into this class namespace.
    locals().update(get_attributes(manager, (
        'Queue',
        'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
        'Condition', 'Event',
        'Value', 'Array',
        'list', 'dict', 'Namespace',
        'JoinableQueue'
        )))


# Generate the 'manager' test cases and publish them as module globals.
testcases_manager = create_test_cases(ManagerMixin, type='manager')
globals().update(testcases_manager)
1885
1886
class ThreadsMixin(object):
    # Binds the generic tests to multiprocessing.dummy.
    TYPE = 'threads'
    Process = multiprocessing.dummy.Process
    # The names below are copied from multiprocessing.dummy straight into
    # this class namespace.
    locals().update(get_attributes(multiprocessing.dummy, (
        'Queue',
        'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
        'Condition', 'Event',
        'Value', 'Array',
        'current_process', 'active_children',
        'Pipe', 'connection',
        'dict', 'list', 'Namespace',
        'JoinableQueue'
        )))


# Generate the 'threads' test cases and publish them as module globals.
testcases_threads = create_test_cases(ThreadsMixin, type='threads')
globals().update(testcases_threads)
1899
class OtherTest(unittest.TestCase):
    # TODO: add more tests for deliver/answer challenge.

    def test_deliver_challenge_auth_failure(self):
        class BogusPeer(object):
            # Swallows whatever is sent and always replies with garbage.
            def recv_bytes(self, size):
                return b'something bogus'

            def send_bytes(self, data):
                pass

        with self.assertRaises(multiprocessing.AuthenticationError):
            multiprocessing.connection.deliver_challenge(BogusPeer(), b'abc')

    def test_answer_challenge_auth_failure(self):
        class BogusPeer(object):
            # The first read returns the CHALLENGE marker, the second one
            # garbage, every later one an empty message.
            def __init__(self):
                self.count = 0

            def recv_bytes(self, size):
                self.count += 1
                if self.count == 1:
                    return multiprocessing.connection.CHALLENGE
                if self.count == 2:
                    return b'something bogus'
                return b''

            def send_bytes(self, data):
                pass

        with self.assertRaises(multiprocessing.AuthenticationError):
            multiprocessing.connection.answer_challenge(BogusPeer(), b'abc')
1928
Benjamin Petersonf47ed4a2009-04-11 20:45:40 +00001929#
1930# Test Manager.start()/Pool.__init__() initializer feature - see issue 5585
1931#
1932
def initializer(ns):
    """Increment the ``test`` attribute of *ns* by one.

    TestInitializers passes this function as the ``initializer`` callable to
    SyncManager.start() and multiprocessing.Pool(), with a manager Namespace
    as the single argument, and then checks the resulting counter value.
    """
    counter = ns.test
    counter += 1
    ns.test = counter
1935
class TestInitializers(unittest.TestCase):
    # Exercises the initializer/initargs feature of SyncManager.start() and
    # Pool() (issue 5585).  Each test hands the module-level initializer() a
    # shared manager Namespace and checks that its counter ends up at 1.

    def setUp(self):
        # A separate manager hosts the Namespace that initializer() mutates.
        self.mgr = multiprocessing.Manager()
        self.ns = self.mgr.Namespace()
        self.ns.test = 0

    def tearDown(self):
        self.mgr.shutdown()

    def test_manager_initializer(self):
        m = multiprocessing.managers.SyncManager()
        # A non-callable initializer must be rejected.
        self.assertRaises(TypeError, m.start, 1)
        m.start(initializer, (self.ns,))
        try:
            self.assertEqual(self.ns.test, 1)
        finally:
            # Shut the freshly started manager down even when the assertion
            # fails, so a failing test does not leave its process running.
            m.shutdown()

    def test_pool_initializer(self):
        # A non-callable initializer must be rejected.
        self.assertRaises(TypeError, multiprocessing.Pool, initializer=1)
        # Single-worker pool; the counter is checked only after the pool has
        # been closed and joined.
        p = multiprocessing.Pool(1, initializer, (self.ns,))
        p.close()
        p.join()
        self.assertEqual(self.ns.test, 1)
1958
R. David Murraya44c6b32009-07-29 15:40:30 +00001959#
1960# Issue 5155, 5313, 5331: Test process in processes
1961# Verifies os.close(sys.stdin.fileno) vs. sys.stdin.close() behavior
1962#
1963
def _ThisSubProcess(q):
    """Make one non-blocking get() on *q*, ignoring an empty queue.

    Used as the target of the innermost process started by _TestProcess();
    any item that happens to be fetched is discarded.
    """
    try:
        q.get(block=False)
    except pyqueue.Empty:
        # Nothing queued: that is the expected case and not an error.
        pass
1969
def _TestProcess(q):
    """Start a nested process that polls a fresh queue, and wait for it.

    Runs as the target of the process created in
    TestStdinBadfiledescriptor.test_queue_in_process.  The *q* argument is
    accepted but not used; the child receives a queue created here.
    """
    inner_queue = multiprocessing.Queue()
    child = multiprocessing.Process(
        target=_ThisSubProcess,
        args=(inner_queue,),
    )
    child.start()
    child.join()
1975
def _afunc(x):
    """Return *x* multiplied by itself (the work item for pool_in_process)."""
    squared = x * x
    return squared
1978
def pool_in_process():
    """Create a four-worker Pool, map _afunc over a short list, and clean up.

    Runs as the target of the process created in
    TestStdinBadfiledescriptor.test_pool_in_process, i.e. the pool is created
    from inside a child process.  The mapped results are discarded.
    """
    pool = multiprocessing.Pool(processes=4)
    try:
        pool.map(_afunc, [1, 2, 3, 4, 5, 6, 7])
    finally:
        # Release the workers explicitly instead of leaving them to
        # exit-time finalizers.
        pool.close()
        pool.join()
1982
class _file_like(object):
    """Tiny buffering writer that forwards to *delegate* on flush().

    write() appends to a list; flush() joins the list, writes the result to
    the delegate and starts a new empty list.  The list is tied to the pid
    that last accessed it: when ``cache`` is read under a different pid the
    old contents are dropped and a new list is started.
    """

    def __init__(self, delegate):
        self._delegate = delegate
        # pid that owns self._cache; None until ``cache`` is first accessed.
        self._pid = None

    @property
    def cache(self):
        """Return the buffer list for the current pid, resetting it if stale."""
        owner = os.getpid()
        # There are no race conditions since fork keeps only the running thread
        if owner == self._pid:
            return self._cache
        self._pid = owner
        self._cache = []
        return self._cache

    def write(self, data):
        """Append *data* to the current process's buffer."""
        self.cache.append(data)

    def flush(self):
        """Write the concatenated buffer to the delegate and empty it."""
        buffered = ''.join(self.cache)
        self._delegate.write(buffered)
        self._cache = []
2003
class TestStdinBadfiledescriptor(unittest.TestCase):
    # Regression tests for issues 5155, 5313 and 5331: starting processes,
    # queues and pools from inside a child process.

    def test_queue_in_process(self):
        queue = multiprocessing.Queue()
        proc = multiprocessing.Process(target=_TestProcess, args=(queue,))
        proc.start()
        proc.join()
        # Without this check a child that died with an exception would still
        # let the test pass.
        self.assertEqual(proc.exitcode, 0)

    def test_pool_in_process(self):
        p = multiprocessing.Process(target=pool_in_process)
        p.start()
        p.join()
        # Same reasoning as above: surface a crash in the child.
        self.assertEqual(p.exitcode, 0)

    def test_flushing(self):
        sio = io.StringIO()
        flike = _file_like(sio)
        flike.write('foo')
        # NOTE(review): this Process object is constructed but never started,
        # so only the flush() in the current process is exercised -- confirm
        # whether that is intentional.
        proc = multiprocessing.Process(target=lambda: flike.flush())
        flike.flush()
        # assertEqual instead of a bare assert: it survives ``python -O`` and
        # reports both values on failure.
        self.assertEqual(sio.getvalue(), 'foo')
2024
# Hand-written test case classes that are not generated from the mixins.
# test_main() appends this list after the sorted generated test cases.
testcases_other = [OtherTest, TestInvalidHandle, TestInitializers,
                   TestStdinBadfiledescriptor]
Neal Norwitz5d6415e2008-08-25 01:53:32 +00002027
Benjamin Petersone711caf2008-06-11 16:44:04 +00002028#
2029#
2030#
2031
def test_main(run=None):
    """Build the complete multiprocessing test suite and run it.

    *run* is a callable that is passed the assembled unittest.TestSuite.  When
    it is None, test.support.run_unittest is used.

    Before running, the shared pools and the shared manager are created and
    attached to ProcessesMixin, ThreadsMixin and ManagerMixin; they are torn
    down again afterwards, also when *run* raises.

    Raises unittest.SkipTest on Linux if creating a multiprocessing.RLock
    fails with OSError (see issue 3111).
    """
    if sys.platform.startswith("linux"):
        try:
            # Only probing whether an RLock can be created at all.
            multiprocessing.RLock()
        except OSError:
            raise unittest.SkipTest("OSError raises on RLock creation, see issue 3111!")

    if run is None:
        from test.support import run_unittest as run

    util.get_temp_dir()     # creates temp directory for use by all processes

    multiprocessing.get_logger().setLevel(LOG_LEVEL)

    ProcessesMixin.pool = multiprocessing.Pool(4)
    ThreadsMixin.pool = multiprocessing.dummy.Pool(4)
    # ManagerMixin.manager was allocated without __init__; initialise and
    # start it now.
    ManagerMixin.manager.__init__()
    ManagerMixin.manager.start()
    ManagerMixin.pool = ManagerMixin.manager.Pool(4)

    try:
        # Generated cases are ordered by class name within each flavour; the
        # hand-written cases follow in their listed order.
        testcases = (
            sorted(testcases_processes.values(), key=lambda tc:tc.__name__) +
            sorted(testcases_threads.values(), key=lambda tc:tc.__name__) +
            sorted(testcases_manager.values(), key=lambda tc:tc.__name__) +
            testcases_other
            )

        loadTestsFromTestCase = unittest.defaultTestLoader.loadTestsFromTestCase
        suite = unittest.TestSuite(loadTestsFromTestCase(tc) for tc in testcases)
        run(suite)
    finally:
        # Tear the shared resources down even if run() raised, so they do
        # not outlive this call.
        ThreadsMixin.pool.terminate()
        ProcessesMixin.pool.terminate()
        ManagerMixin.pool.terminate()
        ManagerMixin.manager.shutdown()

        del ProcessesMixin.pool, ThreadsMixin.pool, ManagerMixin.pool
Benjamin Petersone711caf2008-06-11 16:44:04 +00002069
def main():
    """Run the whole suite through a verbose (verbosity=2) TextTestRunner."""
    runner = unittest.TextTestRunner(verbosity=2)
    test_main(runner.run)
2072
# Run the suite only when this file is executed as a script, not when it is
# imported.
if __name__ == '__main__':
    main()