blob: a001e935c727563401bfa585113e3ad6b4ac219b [file] [log] [blame]
Georg Brandl86b2fb92008-07-16 03:43:04 +00001#!/usr/bin/env python
2
Benjamin Petersone711caf2008-06-11 16:44:04 +00003#
4# Unit tests for the multiprocessing package
5#
6
7import unittest
8import threading
9import queue as pyqueue
10import time
R. David Murraya44c6b32009-07-29 15:40:30 +000011import io
Benjamin Petersone711caf2008-06-11 16:44:04 +000012import sys
13import os
14import gc
15import signal
16import array
Benjamin Petersone711caf2008-06-11 16:44:04 +000017import socket
18import random
19import logging
R. David Murraya21e4ca2009-03-31 23:16:50 +000020import test.support
Benjamin Petersone711caf2008-06-11 16:44:04 +000021
Benjamin Petersone5384b02008-10-04 22:00:42 +000022
R. David Murraya21e4ca2009-03-31 23:16:50 +000023# Skip tests if _multiprocessing wasn't built.
24_multiprocessing = test.support.import_module('_multiprocessing')
25# Skip tests if sem_open implementation is broken.
26test.support.import_module('multiprocessing.synchronize')
Benjamin Petersone5384b02008-10-04 22:00:42 +000027
Benjamin Petersone711caf2008-06-11 16:44:04 +000028import multiprocessing.dummy
29import multiprocessing.connection
30import multiprocessing.managers
31import multiprocessing.heap
Benjamin Petersone711caf2008-06-11 16:44:04 +000032import multiprocessing.pool
Benjamin Petersone711caf2008-06-11 16:44:04 +000033
34from multiprocessing import util
35
Brian Curtin918616c2010-10-07 02:12:17 +000036try:
37 from multiprocessing.sharedctypes import Value, copy
38 HAS_SHAREDCTYPES = True
39except ImportError:
40 HAS_SHAREDCTYPES = False
41
Benjamin Petersone711caf2008-06-11 16:44:04 +000042#
43#
44#
45
def latin(s):
    """Return the string *s* encoded to bytes with the 'latin' codec."""
    encoded = s.encode('latin')
    return encoded
Benjamin Petersone711caf2008-06-11 16:44:04 +000048
Benjamin Petersone711caf2008-06-11 16:44:04 +000049#
50# Constants
51#
52
LOG_LEVEL = util.SUBWARNING
#LOG_LEVEL = logging.WARNING

DELTA = 0.1

# Setting CHECK_TIMINGS to True makes the tests take a lot longer and can
# sometimes cause some non-serious failures, because some calls block a bit
# longer than expected.
CHECK_TIMINGS = False

# Timeouts handed to blocking calls; short values are used when timings are
# not being checked.
TIMEOUT1, TIMEOUT2, TIMEOUT3 = (
    (0.82, 0.35, 1.4) if CHECK_TIMINGS else (0.1, 0.1, 0.1)
)

# True unless _multiprocessing flags its semaphore get_value() as broken.
_BROKEN_GETVALUE = getattr(_multiprocessing, 'HAVE_BROKEN_SEM_GETVALUE', False)
HAVE_GETVALUE = not _BROKEN_GETVALUE

WIN32 = (sys.platform == "win32")
70
Benjamin Petersone711caf2008-06-11 16:44:04 +000071#
Florent Xicluna9b0e9182010-03-28 11:42:38 +000072# Some tests require ctypes
73#
74
75try:
Florent Xiclunab4efb3d2010-08-14 18:24:40 +000076 from ctypes import Structure, c_int, c_double
Florent Xicluna9b0e9182010-03-28 11:42:38 +000077except ImportError:
78 Structure = object
79 c_int = c_double = None
80
81#
Benjamin Petersone711caf2008-06-11 16:44:04 +000082# Creates a wrapper for a function which records the time it takes to finish
83#
84
class TimingWrapper(object):
    """Callable wrapper that records how long the wrapped call took.

    After each call, ``elapsed`` holds the wall-clock duration (in seconds,
    as measured with ``time.time()``) of the most recent invocation, whether
    it returned normally or raised.  Before the first call it is ``None``.
    """

    def __init__(self, func):
        self.elapsed = None
        self.func = func

    def __call__(self, *args, **kwds):
        started = time.time()
        try:
            outcome = self.func(*args, **kwds)
        finally:
            # Recorded even when the wrapped call raises.
            self.elapsed = time.time() - started
        return outcome
97
98#
99# Base class for test cases
100#
101
class BaseTestCase(object):
    """Shared helpers for the test case classes in this file.

    The assertion helpers call ``self.assertEqual`` / ``self.assertAlmostEqual``,
    which are not defined here and must come from the class this one is
    combined with.
    """

    ALLOWED_TYPES = ('processes', 'manager', 'threads')

    def assertTimingAlmostEqual(self, a, b):
        """Check *a* and *b* agree to one decimal place when CHECK_TIMINGS is on."""
        if not CHECK_TIMINGS:
            return
        self.assertAlmostEqual(a, b, 1)

    def assertReturnsIfImplemented(self, value, func, *args):
        """Assert ``func(*args) == value`` unless it raises NotImplementedError."""
        try:
            actual = func(*args)
        except NotImplementedError:
            # Nothing to check when the operation is not implemented.
            return None
        return self.assertEqual(value, actual)

    # For the sanity of Windows users, rather than crashing or freezing in
    # multiple ways.
    def __reduce__(self, *args):
        raise NotImplementedError("shouldn't try to pickle a test case")

    __reduce_ex__ = __reduce__
124
Benjamin Petersone711caf2008-06-11 16:44:04 +0000125#
126# Return the value of a semaphore
127#
128
def get_value(self):
    """Return the current value of the semaphore-like object *self*.

    Tries, in order, a ``get_value()`` method, then the attributes
    ``_Semaphore__value`` and ``_value``.  Raises NotImplementedError when
    none of them is available.
    """
    try:
        return self.get_value()
    except AttributeError:
        pass
    for attr_name in ('_Semaphore__value', '_value'):
        try:
            return getattr(self, attr_name)
        except AttributeError:
            continue
    raise NotImplementedError
140
141#
142# Testcases
143#
144
class _TestProcess(BaseTestCase):
    """Tests for process objects: attributes, start/join, terminate, recursion.

    Attributes such as TYPE, Process, Queue, Pipe, current_process and
    active_children are not defined in this class; they are expected to be
    supplied by whatever this class is combined with.
    """

    ALLOWED_TYPES = ('processes', 'threads')

    def test_current(self):
        """Check the attributes reported for the current process."""
        if self.TYPE == 'threads':
            return

        current = self.current_process()
        authkey = current.authkey

        self.assertTrue(current.is_alive())
        self.assertFalse(current.daemon)
        self.assertIsInstance(authkey, bytes)
        self.assertTrue(len(authkey) > 0)
        self.assertEqual(current.ident, os.getpid())
        self.assertEqual(current.exitcode, None)

    @classmethod
    def _test(cls, q, *args, **kwds):
        """Child target: report the received arguments and identity on *q*."""
        current = cls.current_process()
        q.put(args)
        q.put(kwds)
        q.put(current.name)
        if cls.TYPE != 'threads':
            q.put(bytes(current.authkey))
            q.put(current.pid)

    def test_process(self):
        """Check process state before start, while running and after join."""
        q = self.Queue(1)
        args = (q, 1, 2)
        kwargs = {'hello':23, 'bye':2.54}
        name = 'SomeProcess'
        p = self.Process(
            target=self._test, args=args, kwargs=kwargs, name=name
            )
        p.daemon = True
        current = self.current_process()

        if self.TYPE != 'threads':
            self.assertEqual(p.authkey, current.authkey)
        self.assertEqual(p.is_alive(), False)
        self.assertEqual(p.daemon, True)
        self.assertNotIn(p, self.active_children())
        self.assertIs(type(self.active_children()), list)
        self.assertEqual(p.exitcode, None)

        p.start()

        self.assertEqual(p.exitcode, None)
        self.assertEqual(p.is_alive(), True)
        self.assertIn(p, self.active_children())

        # The child puts items in this same order (see _test); args[0] is the
        # queue itself and is not echoed back.
        self.assertEqual(q.get(), args[1:])
        self.assertEqual(q.get(), kwargs)
        self.assertEqual(q.get(), p.name)
        if self.TYPE != 'threads':
            self.assertEqual(q.get(), current.authkey)
            self.assertEqual(q.get(), p.pid)

        p.join()

        self.assertEqual(p.exitcode, 0)
        self.assertEqual(p.is_alive(), False)
        self.assertNotIn(p, self.active_children())

    @classmethod
    def _test_terminate(cls):
        """Child target: sleep long enough that it has to be terminated."""
        time.sleep(1000)

    def test_terminate(self):
        """Check that terminate() stops a running child process."""
        if self.TYPE == 'threads':
            return

        p = self.Process(target=self._test_terminate)
        p.daemon = True
        p.start()

        self.assertEqual(p.is_alive(), True)
        self.assertIn(p, self.active_children())
        self.assertEqual(p.exitcode, None)

        p.terminate()

        join = TimingWrapper(p.join)
        self.assertEqual(join(), None)
        self.assertTimingAlmostEqual(join.elapsed, 0.0)

        self.assertEqual(p.is_alive(), False)
        self.assertNotIn(p, self.active_children())

        p.join()

        # XXX sometimes get p.exitcode == 0 on Windows ...
        #self.assertEqual(p.exitcode, -signal.SIGTERM)

    def test_cpu_count(self):
        """Check cpu_count() yields a positive int (or is not implemented)."""
        try:
            cpus = multiprocessing.cpu_count()
        except NotImplementedError:
            cpus = 1
        self.assertIs(type(cpus), int)
        self.assertTrue(cpus >= 1)

    def test_active_children(self):
        """Check a process is listed by active_children() only while running."""
        self.assertEqual(type(self.active_children()), list)

        p = self.Process(target=time.sleep, args=(DELTA,))
        self.assertNotIn(p, self.active_children())

        p.start()
        self.assertIn(p, self.active_children())

        p.join()
        self.assertNotIn(p, self.active_children())

    @classmethod
    def _test_recursion(cls, wconn, id):
        """Send *id* on *wconn*, then recurse in child processes up to depth 2."""
        wconn.send(id)
        if len(id) < 2:
            for i in range(2):
                p = cls.Process(
                    target=cls._test_recursion, args=(wconn, id+[i])
                    )
                p.start()
                # Joining before starting the next child keeps the ids sent
                # on the pipe in depth-first order.
                p.join()

    def test_recursion(self):
        """Check processes started from within child processes all report in."""
        rconn, wconn = self.Pipe(duplex=False)
        self._test_recursion(wconn, [])

        time.sleep(DELTA)
        result = []
        while rconn.poll():
            result.append(rconn.recv())

        expected = [
            [],
              [0],
                [0, 0],
                [0, 1],
              [1],
                [1, 0],
                [1, 1]
            ]
        self.assertEqual(result, expected)
293
294#
295#
296#
297
298class _UpperCaser(multiprocessing.Process):
299
300 def __init__(self):
301 multiprocessing.Process.__init__(self)
302 self.child_conn, self.parent_conn = multiprocessing.Pipe()
303
304 def run(self):
305 self.parent_conn.close()
306 for s in iter(self.child_conn.recv, None):
307 self.child_conn.send(s.upper())
308 self.child_conn.close()
309
310 def submit(self, s):
311 assert type(s) is str
312 self.parent_conn.send(s)
313 return self.parent_conn.recv()
314
315 def stop(self):
316 self.parent_conn.send(None)
317 self.parent_conn.close()
318 self.child_conn.close()
319
class _TestSubclassingProcess(BaseTestCase):
    """Test that a Process subclass (_UpperCaser) can be started and used."""

    ALLOWED_TYPES = ('processes',)

    def test_subclassing(self):
        """Round-trip two strings through a running _UpperCaser."""
        uppercaser = _UpperCaser()
        uppercaser.start()
        for text, expected in (('hello', 'HELLO'), ('world', 'WORLD')):
            self.assertEqual(uppercaser.submit(text), expected)
        uppercaser.stop()
        uppercaser.join()
331
332#
333#
334#
335
def queue_empty(q):
    """Return whether *q* is empty, using qsize() when it has no empty()."""
    if not hasattr(q, 'empty'):
        return q.qsize() == 0
    return q.empty()
341
def queue_full(q, maxsize):
    """Return whether *q* is full, comparing qsize() to *maxsize* as a fallback."""
    if not hasattr(q, 'full'):
        return q.qsize() == maxsize
    return q.full()
347
348
class _TestQueue(BaseTestCase):
    """Tests for queue put/get semantics, forking, qsize and task_done.

    Queue, JoinableQueue, Event and Process are looked up on ``self`` and are
    not defined in this class.
    """

    @classmethod
    def _test_put(cls, queue, child_can_start, parent_can_continue):
        """Child target: once released, drain the six items put by test_put."""
        child_can_start.wait()
        # 6 matches MAXSIZE in test_put.
        for i in range(6):
            queue.get()
        parent_can_continue.set()

    def test_put(self):
        """Check put() variants fill the queue and raise Full once it is full."""
        MAXSIZE = 6
        queue = self.Queue(maxsize=MAXSIZE)
        child_can_start = self.Event()
        parent_can_continue = self.Event()

        proc = self.Process(
            target=self._test_put,
            args=(queue, child_can_start, parent_can_continue)
            )
        proc.daemon = True
        proc.start()

        self.assertEqual(queue_empty(queue), True)
        self.assertEqual(queue_full(queue, MAXSIZE), False)

        queue.put(1)
        queue.put(2, True)
        queue.put(3, True, None)
        queue.put(4, False)
        queue.put(5, False, None)
        queue.put_nowait(6)

        # the values may be in buffer but not yet in pipe so sleep a bit
        time.sleep(DELTA)

        self.assertEqual(queue_empty(queue), False)
        self.assertEqual(queue_full(queue, MAXSIZE), True)

        put = TimingWrapper(queue.put)
        put_nowait = TimingWrapper(queue.put_nowait)

        self.assertRaises(pyqueue.Full, put, 7, False)
        self.assertTimingAlmostEqual(put.elapsed, 0)

        self.assertRaises(pyqueue.Full, put, 7, False, None)
        self.assertTimingAlmostEqual(put.elapsed, 0)

        self.assertRaises(pyqueue.Full, put_nowait, 7)
        self.assertTimingAlmostEqual(put_nowait.elapsed, 0)

        self.assertRaises(pyqueue.Full, put, 7, True, TIMEOUT1)
        self.assertTimingAlmostEqual(put.elapsed, TIMEOUT1)

        self.assertRaises(pyqueue.Full, put, 7, False, TIMEOUT2)
        self.assertTimingAlmostEqual(put.elapsed, 0)

        self.assertRaises(pyqueue.Full, put, 7, True, timeout=TIMEOUT3)
        self.assertTimingAlmostEqual(put.elapsed, TIMEOUT3)

        # Let the child drain the queue, then wait until it has finished.
        child_can_start.set()
        parent_can_continue.wait()

        self.assertEqual(queue_empty(queue), True)
        self.assertEqual(queue_full(queue, MAXSIZE), False)

        proc.join()

    @classmethod
    def _test_get(cls, queue, child_can_start, parent_can_continue):
        """Child target: once released, put the items that test_get reads."""
        child_can_start.wait()
        #queue.put(1)
        queue.put(2)
        queue.put(3)
        queue.put(4)
        queue.put(5)
        parent_can_continue.set()

    def test_get(self):
        """Check get() variants return items in order and raise Empty after."""
        queue = self.Queue()
        child_can_start = self.Event()
        parent_can_continue = self.Event()

        proc = self.Process(
            target=self._test_get,
            args=(queue, child_can_start, parent_can_continue)
            )
        proc.daemon = True
        proc.start()

        self.assertEqual(queue_empty(queue), True)

        child_can_start.set()
        parent_can_continue.wait()

        time.sleep(DELTA)
        self.assertEqual(queue_empty(queue), False)

        # Hangs unexpectedly, remove for now
        #self.assertEqual(queue.get(), 1)
        self.assertEqual(queue.get(True, None), 2)
        self.assertEqual(queue.get(True), 3)
        self.assertEqual(queue.get(timeout=1), 4)
        self.assertEqual(queue.get_nowait(), 5)

        self.assertEqual(queue_empty(queue), True)

        get = TimingWrapper(queue.get)
        get_nowait = TimingWrapper(queue.get_nowait)

        self.assertRaises(pyqueue.Empty, get, False)
        self.assertTimingAlmostEqual(get.elapsed, 0)

        self.assertRaises(pyqueue.Empty, get, False, None)
        self.assertTimingAlmostEqual(get.elapsed, 0)

        self.assertRaises(pyqueue.Empty, get_nowait)
        self.assertTimingAlmostEqual(get_nowait.elapsed, 0)

        self.assertRaises(pyqueue.Empty, get, True, TIMEOUT1)
        self.assertTimingAlmostEqual(get.elapsed, TIMEOUT1)

        self.assertRaises(pyqueue.Empty, get, False, TIMEOUT2)
        self.assertTimingAlmostEqual(get.elapsed, 0)

        self.assertRaises(pyqueue.Empty, get, timeout=TIMEOUT3)
        self.assertTimingAlmostEqual(get.elapsed, TIMEOUT3)

        proc.join()

    @classmethod
    def _test_fork(cls, queue):
        """Child target: put the integers 10..19 on *queue*."""
        for i in range(10, 20):
            queue.put(i)
        # note that at this point the items may only be buffered, so the
        # process cannot shutdown until the feeder thread has finished
        # pushing items onto the pipe.

    def test_fork(self):
        # Old versions of Queue would fail to create a new feeder
        # thread for a forked process if the original process had its
        # own feeder thread.  This test checks that this no longer
        # happens.

        queue = self.Queue()

        # put items on queue so that main process starts a feeder thread
        for i in range(10):
            queue.put(i)

        # wait to make sure thread starts before we fork a new process
        time.sleep(DELTA)

        # fork process
        p = self.Process(target=self._test_fork, args=(queue,))
        p.start()

        # check that all expected items are in the queue
        for i in range(20):
            self.assertEqual(queue.get(), i)
        self.assertRaises(pyqueue.Empty, queue.get, False)

        p.join()

    def test_qsize(self):
        """Check qsize() tracks puts and gets; return early if unimplemented."""
        q = self.Queue()
        try:
            self.assertEqual(q.qsize(), 0)
        except NotImplementedError:
            return
        q.put(1)
        self.assertEqual(q.qsize(), 1)
        q.put(5)
        self.assertEqual(q.qsize(), 2)
        q.get()
        self.assertEqual(q.qsize(), 1)
        q.get()
        self.assertEqual(q.qsize(), 0)

    @classmethod
    def _test_task_done(cls, q):
        """Worker target: acknowledge each item from *q* until ``None`` arrives."""
        for obj in iter(q.get, None):
            time.sleep(DELTA)
            q.task_done()

    def test_task_done(self):
        """Check JoinableQueue.join() returns once workers ack every item."""
        queue = self.JoinableQueue()

        workers = [self.Process(target=self._test_task_done, args=(queue,))
                   for i in range(4)]

        for p in workers:
            p.start()

        for i in range(10):
            queue.put(i)

        queue.join()

        # One ``None`` sentinel per worker so that each of them exits.
        for p in workers:
            queue.put(None)

        for p in workers:
            p.join()
556
557#
558#
559#
560
class _TestLock(BaseTestCase):
    """Tests for the Lock and RLock objects obtained from ``self``."""

    def test_lock(self):
        """A held Lock refuses a non-blocking acquire; over-release raises."""
        lock = self.Lock()
        first = lock.acquire()
        self.assertEqual(first, True)
        second = lock.acquire(False)
        self.assertEqual(second, False)
        self.assertEqual(lock.release(), None)
        self.assertRaises((ValueError, threading.ThreadError), lock.release)

    def test_rlock(self):
        """An RLock can be acquired three times and released three times."""
        lock = self.RLock()
        for _ in range(3):
            self.assertEqual(lock.acquire(), True)
        for _ in range(3):
            self.assertEqual(lock.release(), None)
        self.assertRaises((AssertionError, RuntimeError), lock.release)

    def test_lock_context(self):
        """A Lock can be used as a context manager."""
        lock = self.Lock()
        with lock:
            pass
583
Benjamin Petersone711caf2008-06-11 16:44:04 +0000584
class _TestSemaphore(BaseTestCase):
    """Tests for Semaphore and BoundedSemaphore objects obtained from ``self``."""

    def _test_semaphore(self, sem):
        """Acquire and release *sem* (initial value 2), checking its value."""
        check_value = self.assertReturnsIfImplemented

        check_value(2, get_value, sem)
        self.assertEqual(sem.acquire(), True)
        check_value(1, get_value, sem)
        self.assertEqual(sem.acquire(), True)
        check_value(0, get_value, sem)
        # Exhausted: a non-blocking acquire fails and leaves the value alone.
        self.assertEqual(sem.acquire(False), False)
        check_value(0, get_value, sem)
        self.assertEqual(sem.release(), None)
        check_value(1, get_value, sem)
        self.assertEqual(sem.release(), None)
        check_value(2, get_value, sem)

    def test_semaphore(self):
        """A plain Semaphore may be released beyond its initial value."""
        sem = self.Semaphore(2)
        self._test_semaphore(sem)
        for expected in (3, 4):
            self.assertEqual(sem.release(), None)
            self.assertReturnsIfImplemented(expected, get_value, sem)

    def test_bounded_semaphore(self):
        """Run the common acquire/release checks on a BoundedSemaphore."""
        sem = self.BoundedSemaphore(2)
        self._test_semaphore(sem)
        # Currently fails on OS/X
        #if HAVE_GETVALUE:
        #    self.assertRaises(ValueError, sem.release)
        #    self.assertReturnsIfImplemented(2, get_value, sem)

    def test_timeout(self):
        """Check acquire() on an empty semaphore fails for each timeout form."""
        if self.TYPE != 'processes':
            return

        sem = self.Semaphore(0)
        acquire = TimingWrapper(sem.acquire)

        # (positional args, keyword args, expected elapsed time)
        attempts = [
            ((False,), {}, 0.0),
            ((False, None), {}, 0.0),
            ((False, TIMEOUT1), {}, 0),
            ((True, TIMEOUT2), {}, TIMEOUT2),
            ((), {'timeout': TIMEOUT3}, TIMEOUT3),
        ]
        for args, kwargs, expected_elapsed in attempts:
            self.assertEqual(acquire(*args, **kwargs), False)
            self.assertTimingAlmostEqual(acquire.elapsed, expected_elapsed)
637
638
class _TestCondition(BaseTestCase):
    """Tests for Condition objects: notify, notify_all and wait timeouts.

    Condition, Semaphore and Process are looked up on ``self`` and are not
    defined in this class.
    """

    @classmethod
    def f(cls, cond, sleeping, woken, timeout=None):
        """Waiter target: signal *sleeping*, wait on *cond*, signal *woken*."""
        cond.acquire()
        sleeping.release()
        cond.wait(timeout)
        woken.release()
        cond.release()

    def check_invariant(self, cond):
        # this is only supposed to succeed when there are no sleepers
        if self.TYPE == 'processes':
            try:
                sleepers = (cond._sleeping_count.get_value() -
                            cond._woken_count.get_value())
                self.assertEqual(sleepers, 0)
                self.assertEqual(cond._wait_semaphore.get_value(), 0)
            except NotImplementedError:
                pass

    def test_notify(self):
        """Check that each notify() wakes exactly one waiter."""
        cond = self.Condition()
        sleeping = self.Semaphore(0)
        woken = self.Semaphore(0)

        # Keep separate handles for the process and the thread so that both
        # can be joined at the end of the test.
        proc = self.Process(target=self.f, args=(cond, sleeping, woken))
        proc.daemon = True
        proc.start()

        thread = threading.Thread(target=self.f, args=(cond, sleeping, woken))
        thread.daemon = True
        thread.start()

        # wait for both children to start sleeping
        sleeping.acquire()
        sleeping.acquire()

        # check no process/thread has woken up
        time.sleep(DELTA)
        self.assertReturnsIfImplemented(0, get_value, woken)

        # wake up one process/thread
        cond.acquire()
        cond.notify()
        cond.release()

        # check one process/thread has woken up
        time.sleep(DELTA)
        self.assertReturnsIfImplemented(1, get_value, woken)

        # wake up another
        cond.acquire()
        cond.notify()
        cond.release()

        # check other has woken up
        time.sleep(DELTA)
        self.assertReturnsIfImplemented(2, get_value, woken)

        # check state is not mucked up
        self.check_invariant(cond)
        thread.join()
        proc.join()

    def test_notify_all(self):
        """Check waiters time out on their own and notify_all() wakes the rest."""
        cond = self.Condition()
        sleeping = self.Semaphore(0)
        woken = self.Semaphore(0)

        # start some threads/processes which will timeout
        for i in range(3):
            p = self.Process(target=self.f,
                             args=(cond, sleeping, woken, TIMEOUT1))
            p.daemon = True
            p.start()

            t = threading.Thread(target=self.f,
                                 args=(cond, sleeping, woken, TIMEOUT1))
            t.daemon = True
            t.start()

        # wait for them all to sleep
        for i in range(6):
            sleeping.acquire()

        # check they have all timed out
        for i in range(6):
            woken.acquire()
        self.assertReturnsIfImplemented(0, get_value, woken)

        # check state is not mucked up
        self.check_invariant(cond)

        # start some more threads/processes
        for i in range(3):
            p = self.Process(target=self.f, args=(cond, sleeping, woken))
            p.daemon = True
            p.start()

            t = threading.Thread(target=self.f, args=(cond, sleeping, woken))
            t.daemon = True
            t.start()

        # wait for them to all sleep
        for i in range(6):
            sleeping.acquire()

        # check no process/thread has woken up
        time.sleep(DELTA)
        self.assertReturnsIfImplemented(0, get_value, woken)

        # wake them all up
        cond.acquire()
        cond.notify_all()
        cond.release()

        # check they have all woken
        time.sleep(DELTA)
        self.assertReturnsIfImplemented(6, get_value, woken)

        # check state is not mucked up
        self.check_invariant(cond)

    def test_timeout(self):
        """Check wait(TIMEOUT1) with no notifier returns after the timeout."""
        cond = self.Condition()
        wait = TimingWrapper(cond.wait)
        cond.acquire()
        res = wait(TIMEOUT1)
        cond.release()
        self.assertEqual(res, None)
        self.assertTimingAlmostEqual(wait.elapsed, TIMEOUT1)
770
771
class _TestEvent(BaseTestCase):
    """Tests for Event objects: is_set(), wait(), set() and clear().

    Event and Process are looked up on ``self`` and are not defined in this
    class.
    """

    @classmethod
    def _test_event(cls, event):
        """Child target: set *event* after sleeping for TIMEOUT2."""
        time.sleep(TIMEOUT2)
        event.set()

    def test_event(self):
        """Check wait() results while the event is clear, set, and set remotely."""
        event = self.Event()
        wait = TimingWrapper(event.wait)

        # A new event starts out clear.
        self.assertEqual(event.is_set(), False)

        # While the event is clear, wait() with a timeout returns False.
        self.assertEqual(wait(0.0), False)
        self.assertTimingAlmostEqual(wait.elapsed, 0.0)
        self.assertEqual(wait(TIMEOUT1), False)
        self.assertTimingAlmostEqual(wait.elapsed, TIMEOUT1)

        event.set()

        # Once the event is set, wait() returns True without blocking.
        self.assertEqual(event.is_set(), True)
        self.assertEqual(wait(), True)
        self.assertTimingAlmostEqual(wait.elapsed, 0.0)
        self.assertEqual(wait(TIMEOUT1), True)
        self.assertTimingAlmostEqual(wait.elapsed, 0.0)

        event.clear()

        # Keep a handle on the child so it can be joined once it has set the
        # event, instead of being left behind when the test returns.
        p = self.Process(target=self._test_event, args=(event,))
        p.start()
        self.assertEqual(wait(), True)
        p.join()
Benjamin Petersone711caf2008-06-11 16:44:04 +0000810
811#
812#
813#
814
@unittest.skipUnless(HAS_SHAREDCTYPES,
                     "requires multiprocessing.sharedctypes")
class _TestValue(BaseTestCase):
    """Tests for shared Value / RawValue objects obtained from ``self``."""

    ALLOWED_TYPES = ('processes',)

    # (typecode, initial value, value assigned by the child process)
    codes_values = [
        ('i', 4343, 24234),
        ('d', 3.625, -4.25),
        ('h', -232, 234),
        ('c', latin('x'), latin('y'))
        ]

    @classmethod
    def _test(cls, values):
        """Child target: assign the third codes_values entry to each value."""
        for shared, spec in zip(values, cls.codes_values):
            shared.value = spec[2]

    def test_value(self, raw=False):
        """Check that values assigned in a child process are seen by the parent."""
        factory = self.RawValue if raw else self.Value
        values = [factory(code, initial)
                  for code, initial, _ in self.codes_values]

        for shared, spec in zip(values, self.codes_values):
            self.assertEqual(shared.value, spec[1])

        proc = self.Process(target=self._test, args=(values,))
        proc.start()
        proc.join()

        for shared, spec in zip(values, self.codes_values):
            self.assertEqual(shared.value, spec[2])

    def test_rawvalue(self):
        """Run test_value using RawValue."""
        self.test_value(raw=True)

    def test_getobj_getlock(self):
        """Check get_lock()/get_obj() for each form of the ``lock`` argument."""
        default_locked = self.Value('i', 5)
        default_locked.get_lock()
        default_locked.get_obj()

        none_locked = self.Value('i', 5, lock=None)
        none_locked.get_lock()
        none_locked.get_obj()

        lock = self.Lock()
        explicit_locked = self.Value('i', 5, lock=lock)
        returned_lock = explicit_locked.get_lock()
        explicit_locked.get_obj()
        self.assertEqual(lock, returned_lock)

        # With lock=False, and with RawValue, neither accessor is present.
        unlocked = self.Value('i', 5, lock=False)
        for accessor in ('get_lock', 'get_obj'):
            self.assertFalse(hasattr(unlocked, accessor))

        self.assertRaises(AttributeError, self.Value, 'i', 5, lock='navalue')

        raw = self.RawValue('i', 5)
        for accessor in ('get_lock', 'get_obj'):
            self.assertFalse(hasattr(raw, accessor))
879
Benjamin Petersone711caf2008-06-11 16:44:04 +0000880
class _TestArray(BaseTestCase):
    """Tests for shared Array / RawArray objects obtained from ``self``."""

    ALLOWED_TYPES = ('processes',)

    @classmethod
    def f(cls, seq):
        """Replace *seq* in place with its running sums."""
        for idx in range(1, len(seq)):
            seq[idx] += seq[idx - 1]

    @unittest.skipIf(c_int is None, "requires _ctypes")
    def test_array(self, raw=False):
        """Check indexing, slicing and modification of an array by a child."""
        seq = [680, 626, 934, 821, 150, 233, 548, 982, 714, 831]
        arr = self.RawArray('i', seq) if raw else self.Array('i', seq)

        self.assertEqual(len(arr), len(seq))
        self.assertEqual(arr[3], seq[3])
        self.assertEqual(list(arr[2:7]), list(seq[2:7]))

        # Apply the same slice assignment to the shared array and the list.
        patch = array.array('i', [1, 2, 3, 4])
        arr[4:8] = patch
        seq[4:8] = patch

        self.assertEqual(list(arr[:]), seq)

        # Update the list here and the shared array in a child process; the
        # two are then expected to match.
        self.f(seq)

        p = self.Process(target=self.f, args=(arr,))
        p.start()
        p.join()

        self.assertEqual(list(arr[:]), seq)

    @unittest.skipIf(c_int is None, "requires _ctypes")
    def test_rawarray(self):
        """Run test_array using RawArray."""
        self.test_array(raw=True)

    @unittest.skipIf(c_int is None, "requires _ctypes")
    def test_getobj_getlock_obj(self):
        """Check get_lock()/get_obj() for each form of the ``lock`` argument."""
        default_locked = self.Array('i', list(range(10)))
        default_locked.get_lock()
        default_locked.get_obj()

        none_locked = self.Array('i', list(range(10)), lock=None)
        none_locked.get_lock()
        none_locked.get_obj()

        lock = self.Lock()
        explicit_locked = self.Array('i', list(range(10)), lock=lock)
        returned_lock = explicit_locked.get_lock()
        explicit_locked.get_obj()
        self.assertEqual(lock, returned_lock)

        # With lock=False, and with RawArray, neither accessor is present.
        unlocked = self.Array('i', range(10), lock=False)
        for accessor in ('get_lock', 'get_obj'):
            self.assertFalse(hasattr(unlocked, accessor))
        self.assertRaises(AttributeError,
                          self.Array, 'i', range(10), lock='notalock')

        raw = self.RawArray('i', range(10))
        for accessor in ('get_lock', 'get_obj'):
            self.assertFalse(hasattr(raw, accessor))
Benjamin Petersone711caf2008-06-11 16:44:04 +0000943
944#
945#
946#
947
class _TestContainers(BaseTestCase):
    """Tests for the list, dict and Namespace objects obtained from ``self``."""

    ALLOWED_TYPES = ('manager',)

    def test_list(self):
        """Check slicing, extending, repeating, concatenating and nesting."""
        first = self.list(list(range(10)))
        self.assertEqual(first[:], list(range(10)))

        second = self.list()
        self.assertEqual(second[:], [])

        second.extend(list(range(5)))
        self.assertEqual(second[:], list(range(5)))

        self.assertEqual(second[2], 2)
        self.assertEqual(second[2:10], [2,3,4])

        second *= 2
        self.assertEqual(second[:], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4])

        self.assertEqual(second + [5, 6],
                         [0, 1, 2, 3, 4, 0, 1, 2, 3, 4, 5, 6])

        self.assertEqual(first[:], list(range(10)))

        pair = [first, second]
        nested = self.list(pair)
        self.assertEqual(
            nested[:],
            [[0, 1, 2, 3, 4, 5, 6, 7, 8, 9], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4]]
            )

        # The append made after ``wrapper`` was created is expected to show
        # up when reading through ``wrapper``.
        wrapper = self.list([first])
        first.append('hello')
        self.assertEqual(wrapper[:],
                         [[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 'hello']])

    def test_dict(self):
        """Check copy(), keys(), values() and items() of a populated dict."""
        d = self.dict()
        indices = list(range(65, 70))
        for code in indices:
            d[code] = chr(code)
        letters = [chr(code) for code in indices]
        self.assertEqual(d.copy(), {code: chr(code) for code in indices})
        self.assertEqual(sorted(d.keys()), indices)
        self.assertEqual(sorted(d.values()), letters)
        self.assertEqual(sorted(d.items()), list(zip(indices, letters)))

    def test_namespace(self):
        """Check attribute set/delete and the string form of a Namespace."""
        n = self.Namespace()
        n.name = 'Bob'
        n.job = 'Builder'
        n._hidden = 'hidden'
        self.assertEqual((n.name, n.job), ('Bob', 'Builder'))
        del n.job
        self.assertEqual(str(n), "Namespace(name='Bob')")
        self.assertTrue(hasattr(n, 'name'))
        self.assertFalse(hasattr(n, 'job'))
1003
1004#
1005#
1006#
1007
def sqr(x, wait=0.0):
    """Return ``x * x`` after sleeping for ``wait`` seconds."""
    time.sleep(wait)
    squared = x * x
    return squared
class _TestPool(BaseTestCase):
    """Tests for the worker pool available to the test case as ``self.pool``."""

    def test_apply(self):
        papply = self.pool.apply
        self.assertEqual(papply(sqr, (5,)), sqr(5))
        self.assertEqual(papply(sqr, (), {'x': 3}), sqr(x=3))

    def test_map(self):
        pmap = self.pool.map
        self.assertEqual(pmap(sqr, list(range(10))),
                         list(map(sqr, list(range(10)))))
        self.assertEqual(pmap(sqr, list(range(100)), chunksize=20),
                         list(map(sqr, list(range(100)))))

    def test_map_chunksize(self):
        # An empty iterable with an explicit chunksize must complete
        # instead of waiting forever.
        try:
            self.pool.map_async(sqr, [], chunksize=1).get(timeout=TIMEOUT1)
        except multiprocessing.TimeoutError:
            self.fail("pool.map_async with chunksize stalled on null list")

    def test_async(self):
        # The worker sleeps for TIMEOUT1, so get() should block that long.
        res = self.pool.apply_async(sqr, (7, TIMEOUT1,))
        get = TimingWrapper(res.get)
        self.assertEqual(get(), 49)
        self.assertTimingAlmostEqual(get.elapsed, TIMEOUT1)

    def test_async_timeout(self):
        # The worker sleeps longer than the timeout handed to get().
        res = self.pool.apply_async(sqr, (6, TIMEOUT2 + 0.2))
        get = TimingWrapper(res.get)
        self.assertRaises(multiprocessing.TimeoutError, get, timeout=TIMEOUT2)
        self.assertTimingAlmostEqual(get.elapsed, TIMEOUT2)

    def test_imap(self):
        it = self.pool.imap(sqr, list(range(10)))
        self.assertEqual(list(it), list(map(sqr, list(range(10)))))

        it = self.pool.imap(sqr, list(range(10)))
        for i in range(10):
            self.assertEqual(next(it), i*i)
        self.assertRaises(StopIteration, it.__next__)

        it = self.pool.imap(sqr, list(range(1000)), chunksize=100)
        for i in range(1000):
            self.assertEqual(next(it), i*i)
        self.assertRaises(StopIteration, it.__next__)

    def test_imap_unordered(self):
        # Results may arrive in any order, so compare after sorting.
        it = self.pool.imap_unordered(sqr, list(range(1000)))
        self.assertEqual(sorted(it), list(map(sqr, list(range(1000)))))

        it = self.pool.imap_unordered(sqr, list(range(1000)), chunksize=53)
        self.assertEqual(sorted(it), list(map(sqr, list(range(1000)))))

    def test_make_pool(self):
        p = multiprocessing.Pool(3)
        # Close and join the pool even when the assertion fails, so a
        # failing run does not leave worker processes behind.
        try:
            self.assertEqual(3, len(p._pool))
        finally:
            p.close()
            p.join()

    def test_terminate(self):
        if self.TYPE == 'manager':
            # On Unix a forked process increfs each shared object to
            # which its parent process held a reference.  If the
            # forked process gets terminated then there is likely to
            # be a reference leak.  So to prevent
            # _TestZZZNumberOfObjects from failing we skip this test
            # when using a manager.
            return

        result = self.pool.map_async(
            time.sleep, [0.1 for i in range(10000)], chunksize=1
            )
        self.pool.terminate()
        join = TimingWrapper(self.pool.join)
        join()
        # assertLess reports both values when the check fails.
        self.assertLess(join.elapsed, 0.2)
Benjamin Petersone711caf2008-06-11 16:44:04 +00001086#
1087# Test that manager has expected number of shared objects left
1088#
1089
class _TestZZZNumberOfObjects(BaseTestCase):
    # Test cases are sorted alphabetically, so the "ZZZ" in the name makes
    # this one run after all the other tests for the manager.  It checks
    # that the manager's shared objects have not suffered any "reference
    # leaks".  Note the comment in _TestPool.test_terminate().
    ALLOWED_TYPES = ('manager',)

    def test_number_of_objects(self):
        expected = 1                        # the pool object is still alive
        multiprocessing.active_children()   # discard dead process objs
        gc.collect()                        # do garbage collection
        live = self.manager._number_of_objects()
        snapshot = self.manager._debug_info()
        if live != expected:
            # Print a freshly fetched report and then the one that was
            # fetched right after counting.
            print(self.manager._debug_info())
            print(snapshot)

        self.assertEqual(live, expected)
1108
1109#
1110# Test of creating a customized manager class
1111#
1112
1113from multiprocessing.managers import BaseManager, BaseProxy, RemoteError
1114
class FooBar(object):
    """Plain class with two methods that return a string and one that raises."""

    def f(self):
        """Return the string ``'f()'``."""
        return 'f()'

    def g(self):
        """Always raise ValueError."""
        raise ValueError

    def _h(self):
        """Return the string ``'_h()'``."""
        return '_h()'
1122
def baz():
    """Generate the squares of 0 through 9 in increasing order."""
    n = 0
    while n < 10:
        yield n * n
        n += 1
1126
class IteratorProxy(BaseProxy):
    """Proxy that behaves as an iterator by forwarding ``__next__``."""

    _exposed_ = ('__next__',)

    def __iter__(self):
        # The proxy is its own iterator.
        return self

    def __next__(self):
        # Forward the call to the referent through _callmethod().
        return self._callmethod('__next__')
1133
# Manager subclass used by _TestMyManager; it adds nothing itself and only
# serves as the class the three types below are registered on.
class MyManager(BaseManager):
    pass

# 'Foo' and 'Bar' are both backed by FooBar; 'Bar' passes an explicit
# ``exposed`` tuple naming f and _h.  'baz' is backed by the baz()
# generator function and uses IteratorProxy as its proxy type.
MyManager.register('Foo', callable=FooBar)
MyManager.register('Bar', callable=FooBar, exposed=('f', '_h'))
MyManager.register('baz', callable=baz, proxytype=IteratorProxy)
1140
1141
class _TestMyManager(BaseTestCase):
    """Test of a customized manager class (MyManager) and its registered types."""

    ALLOWED_TYPES = ('manager',)

    def test_mymanager(self):
        manager = MyManager()
        manager.start()
        # Shut the manager down even when an assertion fails, so a failing
        # run does not leave the server process behind.
        try:
            foo = manager.Foo()
            bar = manager.Bar()
            baz = manager.baz()

            # Which of the three FooBar methods each proxy offers.
            foo_methods = [name for name in ('f', 'g', '_h') if hasattr(foo, name)]
            bar_methods = [name for name in ('f', 'g', '_h') if hasattr(bar, name)]

            self.assertEqual(foo_methods, ['f', 'g'])
            self.assertEqual(bar_methods, ['f', '_h'])

            self.assertEqual(foo.f(), 'f()')
            self.assertRaises(ValueError, foo.g)
            self.assertEqual(foo._callmethod('f'), 'f()')
            # _h is not offered by the Foo proxy, so calling it by name fails.
            self.assertRaises(RemoteError, foo._callmethod, '_h')

            self.assertEqual(bar.f(), 'f()')
            self.assertEqual(bar._h(), '_h()')
            self.assertEqual(bar._callmethod('f'), 'f()')
            self.assertEqual(bar._callmethod('_h'), '_h()')

            self.assertEqual(list(baz), [i*i for i in range(10)])
        finally:
            manager.shutdown()
1173
1174#
1175# Test of connecting to a remote server and using xmlrpclib for serialization
1176#
1177
# Single module-level queue; get_queue() always hands out this same object.
_queue = pyqueue.Queue()
def get_queue():
    # Registered as the 'get_queue' callable on QueueManager.
    return _queue
1181
class QueueManager(BaseManager):
    '''manager class used by server process'''
# Registered with a callable, so this class can serve the queue itself.
QueueManager.register('get_queue', callable=get_queue)
1185
class QueueManager2(BaseManager):
    '''manager class which specifies the same interface as QueueManager'''
# Registered by name only (no callable); the tests use this class only to
# connect() to a server started elsewhere.
QueueManager2.register('get_queue')
1189
1190
# Serializer name passed as ``serializer=`` to every QueueManager and
# QueueManager2 created by the remote-manager tests.
SERIALIZER = 'xmlrpclib'
1192
class _TestRemoteManager(BaseTestCase):
    """Test of connecting to a manager server from a second manager object."""

    ALLOWED_TYPES = ('manager',)

    @classmethod
    def _putter(cls, address, authkey):
        # Runs in the child process: connect to the server and put one
        # tuple on its queue.
        manager = QueueManager2(
            address=address, authkey=authkey, serializer=SERIALIZER
            )
        manager.connect()
        queue = manager.get_queue()
        queue.put(('hello world', None, True, 2.25))

    def test_remote(self):
        authkey = os.urandom(32)

        manager = QueueManager(
            address=('localhost', 0), authkey=authkey, serializer=SERIALIZER
            )
        manager.start()
        # Shut the server down even when an assertion fails.
        try:
            p = self.Process(target=self._putter, args=(manager.address, authkey))
            p.start()

            manager2 = QueueManager2(
                address=manager.address, authkey=authkey, serializer=SERIALIZER
                )
            manager2.connect()
            queue = manager2.get_queue()

            # Note that xmlrpclib will deserialize object as a list not a tuple
            self.assertEqual(queue.get(), ['hello world', None, True, 2.25])

            # Because we are using xmlrpclib for serialization instead of
            # pickle this will cause a serialization error.
            self.assertRaises(Exception, queue.put, time.sleep)

            # Make queue finalizer run before the server is stopped
            del queue

            # The item has been received, so the putter has done its work;
            # reap it while the server it talks to is still running.
            p.join()
        finally:
            manager.shutdown()
1233
class _TestManagerRestart(BaseTestCase):
    """Test that a manager can be restarted on the address it just released."""

    @classmethod
    def _putter(cls, address, authkey):
        # Runs in the child process: connect to the server and put one
        # string on its queue.
        manager = QueueManager(
            address=address, authkey=authkey, serializer=SERIALIZER)
        manager.connect()
        queue = manager.get_queue()
        queue.put('hello world')

    def test_rapid_restart(self):
        authkey = os.urandom(32)
        manager = QueueManager(
            address=('localhost', 0), authkey=authkey, serializer=SERIALIZER)
        srvr = manager.get_server()
        addr = srvr.address
        # Close the connection.Listener socket which gets opened as a part
        # of manager.get_server(). It's not needed for the test.
        srvr.listener.close()
        manager.start()
        # Shut the first server down even when an assertion fails.
        try:
            p = self.Process(target=self._putter, args=(manager.address, authkey))
            p.start()
            queue = manager.get_queue()
            self.assertEqual(queue.get(), 'hello world')
            del queue
            # The item has been received, so the putter has done its work;
            # reap it while the server it talks to is still running.
            p.join()
        finally:
            manager.shutdown()

        # Start a second server on the address recorded above.
        manager = QueueManager(
            address=addr, authkey=authkey, serializer=SERIALIZER)
        manager.start()
        manager.shutdown()
Jesse Nollerc5d28a02009-03-30 16:37:36 +00001265
Benjamin Petersone711caf2008-06-11 16:44:04 +00001266#
1267#
1268#
1269
# Empty bytes message: _TestConnection._echo stops echoing when it receives
# this, and the tests send it to tell the child to quit.
SENTINEL = latin('')
1271
class _TestConnection(BaseTestCase):
    # Tests for the connection objects returned by self.Pipe(), talking to
    # a child that echoes every bytes message back.

    ALLOWED_TYPES = ('processes', 'threads')

    @classmethod
    def _echo(cls, conn):
        # Echo each bytes message back until SENTINEL arrives, then close.
        for msg in iter(conn.recv_bytes, SENTINEL):
            conn.send_bytes(msg)
        conn.close()

    def test_connection(self):
        conn, child_conn = self.Pipe()

        p = self.Process(target=self._echo, args=(child_conn,))
        p.daemon = True
        p.start()

        seq = [1, 2.25, None]
        msg = latin('hello world')
        longmsg = msg * 10
        arr = array.array('i', list(range(4)))

        if self.TYPE == 'processes':
            self.assertEqual(type(conn.fileno()), int)

        # Object round trip through the echoing child.
        self.assertEqual(conn.send(seq), None)
        self.assertEqual(conn.recv(), seq)

        # Bytes round trip through the echoing child.
        self.assertEqual(conn.send_bytes(msg), None)
        self.assertEqual(conn.recv_bytes(), msg)

        if self.TYPE == 'processes':
            # Receive into a larger buffer: the data lands at the start and
            # the remaining items stay zero.
            buffer = array.array('i', [0]*10)
            expected = list(arr) + [0] * (10 - len(arr))
            self.assertEqual(conn.send_bytes(arr), None)
            self.assertEqual(conn.recv_bytes_into(buffer),
                             len(arr) * buffer.itemsize)
            self.assertEqual(list(buffer), expected)

            # Same again with a byte offset of three items into the buffer.
            buffer = array.array('i', [0]*10)
            expected = [0] * 3 + list(arr) + [0] * (10 - 3 - len(arr))
            self.assertEqual(conn.send_bytes(arr), None)
            self.assertEqual(conn.recv_bytes_into(buffer, 3 * buffer.itemsize),
                             len(arr) * buffer.itemsize)
            self.assertEqual(list(buffer), expected)

            # A 40 byte buffer is too small for longmsg; the exception is
            # expected to carry the complete message in its args.
            buffer = bytearray(latin(' ' * 40))
            self.assertEqual(conn.send_bytes(longmsg), None)
            try:
                res = conn.recv_bytes_into(buffer)
            except multiprocessing.BufferTooShort as e:
                self.assertEqual(e.args, (longmsg,))
            else:
                self.fail('expected BufferTooShort, got %s' % res)

        poll = TimingWrapper(conn.poll)

        # Nothing is pending: poll() returns False at once ...
        self.assertEqual(poll(), False)
        self.assertTimingAlmostEqual(poll.elapsed, 0)

        # ... and poll(TIMEOUT1) returns False after waiting TIMEOUT1.
        self.assertEqual(poll(TIMEOUT1), False)
        self.assertTimingAlmostEqual(poll.elapsed, TIMEOUT1)

        conn.send(None)

        # With data on its way back, poll(TIMEOUT1) returns True without
        # using up the timeout.
        self.assertEqual(poll(TIMEOUT1), True)
        self.assertTimingAlmostEqual(poll.elapsed, 0)

        self.assertEqual(conn.recv(), None)

        really_big_msg = latin('X') * (1024 * 1024 * 16)   # 16Mb
        conn.send_bytes(really_big_msg)
        self.assertEqual(conn.recv_bytes(), really_big_msg)

        conn.send_bytes(SENTINEL)                          # tell child to quit
        child_conn.close()

        if self.TYPE == 'processes':
            self.assertEqual(conn.readable, True)
            self.assertEqual(conn.writable, True)
            # The child closed its end after SENTINEL, so reads hit EOF.
            self.assertRaises(EOFError, conn.recv)
            self.assertRaises(EOFError, conn.recv_bytes)

        p.join()

    def test_duplex_false(self):
        # With duplex=False the first end is used for reading and the
        # second for writing.
        reader, writer = self.Pipe(duplex=False)
        self.assertEqual(writer.send(1), None)
        self.assertEqual(reader.recv(), 1)
        if self.TYPE == 'processes':
            self.assertEqual(reader.readable, True)
            self.assertEqual(reader.writable, False)
            self.assertEqual(writer.readable, False)
            self.assertEqual(writer.writable, True)
            # Using an end in the wrong direction raises IOError.
            self.assertRaises(IOError, reader.send, 2)
            self.assertRaises(IOError, writer.recv)
            self.assertRaises(IOError, writer.poll)

    def test_spawn_close(self):
        # We test that a pipe connection can be closed by parent
        # process immediately after child is spawned.  On Windows this
        # would have sometimes failed on old versions because
        # child_conn would be closed before the child got a chance to
        # duplicate it.
        conn, child_conn = self.Pipe()

        p = self.Process(target=self._echo, args=(child_conn,))
        p.start()
        child_conn.close()    # this might complete before child initializes

        msg = latin('hello')
        conn.send_bytes(msg)
        self.assertEqual(conn.recv_bytes(), msg)

        conn.send_bytes(SENTINEL)
        conn.close()
        p.join()

    def test_sendbytes(self):
        # Only exercised for the 'processes' type.
        if self.TYPE != 'processes':
            return

        msg = latin('abcdefghijklmnopqrstuvwxyz')
        a, b = self.Pipe()

        a.send_bytes(msg)
        self.assertEqual(b.recv_bytes(), msg)

        # Second argument: offset into msg.
        a.send_bytes(msg, 5)
        self.assertEqual(b.recv_bytes(), msg[5:])

        # Third argument: number of bytes to send from that offset.
        a.send_bytes(msg, 7, 8)
        self.assertEqual(b.recv_bytes(), msg[7:7+8])

        # An offset equal to len(msg) sends an empty message.
        a.send_bytes(msg, 26)
        self.assertEqual(b.recv_bytes(), latin(''))

        a.send_bytes(msg, 26, 0)
        self.assertEqual(b.recv_bytes(), latin(''))

        # Offsets or sizes outside the message, or negative, are rejected.
        self.assertRaises(ValueError, a.send_bytes, msg, 27)

        self.assertRaises(ValueError, a.send_bytes, msg, 22, 5)

        self.assertRaises(ValueError, a.send_bytes, msg, 26, 1)

        self.assertRaises(ValueError, a.send_bytes, msg, -1)

        self.assertRaises(ValueError, a.send_bytes, msg, 4, -1)
1421
class _TestListenerClient(BaseTestCase):
    """Test Listener/Client pairs for every family in ``self.connection.families``."""

    ALLOWED_TYPES = ('processes', 'threads')

    @classmethod
    def _test(cls, address):
        # Runs in the child: connect to the listener, send one greeting
        # and close the connection.
        conn = cls.connection.Client(address)
        conn.send('hello')
        conn.close()

    def test_listener_client(self):
        for family in self.connection.families:
            listener = self.connection.Listener(family=family)
            # Close the listener even when the assertion fails, so each
            # family starts clean.
            try:
                p = self.Process(target=self._test, args=(listener.address,))
                p.daemon = True
                p.start()
                conn = listener.accept()
                # The accepted connection is a resource of its own and
                # must be closed explicitly.
                try:
                    self.assertEqual(conn.recv(), 'hello')
                finally:
                    conn.close()
                p.join()
            finally:
                listener.close()
Benjamin Petersone711caf2008-06-11 16:44:04 +00001442#
1443# Test of sending connection and socket objects between processes
1444#
Benjamin Petersonb29cbbc2008-06-16 20:57:14 +00001445"""
Benjamin Petersone711caf2008-06-11 16:44:04 +00001446class _TestPicklingConnections(BaseTestCase):
1447
1448 ALLOWED_TYPES = ('processes',)
1449
1450 def _listener(self, conn, families):
1451 for fam in families:
1452 l = self.connection.Listener(family=fam)
1453 conn.send(l.address)
1454 new_conn = l.accept()
1455 conn.send(new_conn)
1456
1457 if self.TYPE == 'processes':
1458 l = socket.socket()
1459 l.bind(('localhost', 0))
1460 conn.send(l.getsockname())
1461 l.listen(1)
1462 new_conn, addr = l.accept()
1463 conn.send(new_conn)
1464
1465 conn.recv()
1466
1467 def _remote(self, conn):
1468 for (address, msg) in iter(conn.recv, None):
1469 client = self.connection.Client(address)
1470 client.send(msg.upper())
1471 client.close()
1472
1473 if self.TYPE == 'processes':
1474 address, msg = conn.recv()
1475 client = socket.socket()
1476 client.connect(address)
1477 client.sendall(msg.upper())
1478 client.close()
1479
1480 conn.close()
1481
1482 def test_pickling(self):
1483 try:
1484 multiprocessing.allow_connection_pickling()
1485 except ImportError:
1486 return
1487
1488 families = self.connection.families
1489
1490 lconn, lconn0 = self.Pipe()
1491 lp = self.Process(target=self._listener, args=(lconn0, families))
1492 lp.start()
1493 lconn0.close()
1494
1495 rconn, rconn0 = self.Pipe()
1496 rp = self.Process(target=self._remote, args=(rconn0,))
1497 rp.start()
1498 rconn0.close()
1499
1500 for fam in families:
1501 msg = ('This connection uses family %s' % fam).encode('ascii')
1502 address = lconn.recv()
1503 rconn.send((address, msg))
1504 new_conn = lconn.recv()
1505 self.assertEqual(new_conn.recv(), msg.upper())
1506
1507 rconn.send(None)
1508
1509 if self.TYPE == 'processes':
1510 msg = latin('This connection uses a normal socket')
1511 address = lconn.recv()
1512 rconn.send((address, msg))
1513 if hasattr(socket, 'fromfd'):
1514 new_conn = lconn.recv()
1515 self.assertEqual(new_conn.recv(100), msg.upper())
1516 else:
1517 # XXX On Windows with Py2.6 need to backport fromfd()
1518 discard = lconn.recv_bytes()
1519
1520 lconn.send(None)
1521
1522 rconn.close()
1523 lconn.close()
1524
1525 lp.join()
1526 rp.join()
Benjamin Petersonb29cbbc2008-06-16 20:57:14 +00001527"""
Benjamin Petersone711caf2008-06-11 16:44:04 +00001528#
1529#
1530#
1531
class _TestHeap(BaseTestCase):
    """Consistency check of the multiprocessing.heap block bookkeeping."""

    ALLOWED_TYPES = ('processes',)

    def test_heap(self):
        iterations = 5000
        maxblocks = 50
        live = []

        # create and destroy lots of blocks of different sizes
        for _ in range(iterations):
            size = int(random.lognormvariate(0, 1) * 1000)
            live.append(multiprocessing.heap.BufferWrapper(size))
            if len(live) > maxblocks:
                del live[random.randrange(maxblocks)]

        # get the heap object
        heap = multiprocessing.heap.BufferWrapper._heap

        # verify the state of the heap: collect every free and every
        # allocated block as (arena index, start, stop, length, state)
        layout = []
        for free_list in list(heap._len_to_seq.values()):
            for arena, start, stop in free_list:
                layout.append((heap._arenas.index(arena), start, stop,
                               stop - start, 'free'))
        for arena, start, stop in heap._allocated_blocks:
            layout.append((heap._arenas.index(arena), start, stop,
                           stop - start, 'occupied'))

        layout.sort()

        # Each block must either end where the next one starts or be
        # followed by the first block (start 0) of a different arena.
        for current, following in zip(layout, layout[1:]):
            arena, start, stop = current[:3]
            narena, nstart, nstop = following[:3]
            self.assertTrue((arena != narena and nstart == 0) or
                            (stop == nstart))
1572
1573#
1574#
1575#
1576
class _Foo(Structure):
    """ctypes structure with a ``c_int`` field x and a ``c_double`` field y."""

    _fields_ = [('x', c_int), ('y', c_double)]
1582
@unittest.skipUnless(HAS_SHAREDCTYPES,
                     "requires multiprocessing.sharedctypes")
class _TestSharedCTypes(BaseTestCase):
    """Tests that shared ctypes objects modified by a child are seen by the parent."""

    ALLOWED_TYPES = ('processes',)

    @classmethod
    def _double(cls, x, y, foo, arr, string):
        # Runs in the child: double every value in place.
        for scalar in (x, y):
            scalar.value *= 2
        foo.x *= 2
        foo.y *= 2
        string.value *= 2
        for index in range(len(arr)):
            arr[index] *= 2

    def test_sharedctypes(self, lock=False):
        x = Value('i', 7, lock=lock)
        y = Value(c_double, 1.0/3.0, lock=lock)
        foo = Value(_Foo, 3, 2, lock=lock)
        arr = self.Array('d', list(range(10)), lock=lock)
        string = self.Array('c', 20, lock=lock)
        string.value = latin('hello')

        shared = (x, y, foo, arr, string)
        worker = self.Process(target=self._double, args=shared)
        worker.start()
        worker.join()

        # Every value must now be twice what the parent stored.
        self.assertEqual(x.value, 14)
        self.assertAlmostEqual(y.value, 2.0/3.0)
        self.assertEqual(foo.x, 6)
        self.assertAlmostEqual(foo.y, 4.0)
        for i in range(10):
            self.assertAlmostEqual(arr[i], i*2)
        self.assertEqual(string.value, latin('hellohello'))

    def test_synchronize(self):
        # Same scenario with lock=True.
        self.test_sharedctypes(lock=True)

    def test_copy(self):
        original = _Foo(2, 5.0)
        duplicate = copy(original)
        # Changing the original afterwards must not affect the copy.
        original.x = 0
        original.y = 0
        self.assertEqual(duplicate.x, 2)
        self.assertAlmostEqual(duplicate.y, 5.0)
1629
1630#
1631#
1632#
1633
class _TestFinalize(BaseTestCase):
    # Tests the order in which util.Finalize callbacks run, using a child
    # process that reports each callback over a pipe.

    ALLOWED_TYPES = ('processes',)

    @classmethod
    def _test_finalize(cls, conn):
        # Runs in the child.  Each callback sends a short tag over conn so
        # that the parent can check which callbacks ran and in what order.
        class Foo(object):
            pass

        a = Foo()
        util.Finalize(a, conn.send, args=('a',))
        del a           # triggers callback for a

        b = Foo()
        close_b = util.Finalize(b, conn.send, args=('b',))
        close_b()       # triggers callback for b
        close_b()       # does nothing because callback has already been called
        del b           # does nothing because callback has already been called

        # c is registered without an exitpriority and stays alive; the
        # parent expects no 'c' in the result.
        c = Foo()
        util.Finalize(c, conn.send, args=('c',))

        d10 = Foo()
        util.Finalize(d10, conn.send, args=('d10',), exitpriority=1)

        # Three finalizers sharing exitpriority 0; the parent expects them
        # in reverse order of registration.
        d01 = Foo()
        util.Finalize(d01, conn.send, args=('d01',), exitpriority=0)
        d02 = Foo()
        util.Finalize(d02, conn.send, args=('d02',), exitpriority=0)
        d03 = Foo()
        util.Finalize(d03, conn.send, args=('d03',), exitpriority=0)

        util.Finalize(None, conn.send, args=('e',), exitpriority=-10)

        # Lowest priority: 'STOP' marks the end of the stream for the parent.
        util.Finalize(None, conn.send, args=('STOP',), exitpriority=-100)

        # call multiprocessing's cleanup function then exit process without
        # garbage collecting locals
        util._exit_function()
        conn.close()
        os._exit(0)

    def test_finalize(self):
        conn, child_conn = self.Pipe()

        p = self.Process(target=self._test_finalize, args=(child_conn,))
        p.start()
        p.join()

        # Read tags until 'STOP'; the expected list has the higher
        # exitpriority values first.
        result = [obj for obj in iter(conn.recv, 'STOP')]
        self.assertEqual(result, ['a', 'b', 'd10', 'd03', 'd02', 'd01', 'e'])
1685
1686#
1687# Test that from ... import * works for each module
1688#
1689
class _TestImportStar(BaseTestCase):
    """Test that every name listed in a module's ``__all__`` really exists."""

    ALLOWED_TYPES = ('processes',)

    def test_import(self):
        module_names = [
            'multiprocessing', 'multiprocessing.connection',
            'multiprocessing.heap', 'multiprocessing.managers',
            'multiprocessing.pool', 'multiprocessing.process',
            'multiprocessing.reduction',
            'multiprocessing.synchronize', 'multiprocessing.util',
        ]

        if c_int is not None:
            # This module requires _ctypes
            module_names.append('multiprocessing.sharedctypes')

        for module_name in module_names:
            __import__(module_name)
            module = sys.modules[module_name]
            exported = getattr(module, '__all__', ())

            for attr in exported:
                self.assertTrue(
                    hasattr(module, attr),
                    '%r does not have attribute %r' % (module, attr)
                    )
1716
1717#
1718# Quick test that logging works -- does not test logging output
1719#
1720
class _TestLogging(BaseTestCase):
    """Quick test that logging works -- does not test logging output."""

    ALLOWED_TYPES = ('processes',)

    def test_enable_logging(self):
        logger = multiprocessing.get_logger()
        logger.setLevel(util.SUBWARNING)
        self.assertTrue(logger is not None)
        logger.debug('this will not be printed')
        logger.info('nor will this')
        logger.setLevel(LOG_LEVEL)

    @classmethod
    def _test_level(cls, conn):
        # Runs in the child: report the effective level of the
        # multiprocessing logger to the parent.
        logger = multiprocessing.get_logger()
        conn.send(logger.getEffectiveLevel())

    def test_level(self):
        LEVEL1 = 32
        LEVEL2 = 37

        logger = multiprocessing.get_logger()
        root_logger = logging.getLogger()
        root_level = root_logger.level

        reader, writer = multiprocessing.Pipe(duplex=False)

        # Restore the logger levels and close the pipe even when an
        # assertion fails, so later tests are not affected.
        try:
            # A level set on the multiprocessing logger is seen by the child.
            logger.setLevel(LEVEL1)
            p = self.Process(target=self._test_level, args=(writer,))
            p.start()
            self.assertEqual(LEVEL1, reader.recv())
            p.join()    # reap the child once it has reported

            # With NOTSET the child reports the root logger's level.
            logger.setLevel(logging.NOTSET)
            root_logger.setLevel(LEVEL2)
            p = self.Process(target=self._test_level, args=(writer,))
            p.start()
            self.assertEqual(LEVEL2, reader.recv())
            p.join()    # reap the child once it has reported
        finally:
            root_logger.setLevel(root_level)
            logger.setLevel(level=LOG_LEVEL)
            reader.close()
            writer.close()
1759
1760#
Jesse Noller6214edd2009-01-19 16:23:53 +00001761# Test to verify handle verification, see issue 3321
1762#
1763
class TestInvalidHandle(unittest.TestCase):
    """Test to verify handle verification, see issue 3321."""

    @unittest.skipIf(WIN32, "skipped on Windows")
    def test_invalid_handles(self):
        # Presumably an arbitrary number that is not an open handle.
        bogus_handle = 44977608
        conn = _multiprocessing.Connection(bogus_handle)
        self.assertRaises(IOError, conn.poll)

        # A negative handle is rejected when the Connection is created.
        self.assertRaises(IOError, _multiprocessing.Connection, -1)
Florent Xicluna9b0e9182010-03-28 11:42:38 +00001771
Jesse Noller6214edd2009-01-19 16:23:53 +00001772#
Benjamin Petersone711caf2008-06-11 16:44:04 +00001773# Functions used to create test cases from the base ones in this module
1774#
1775
def get_attributes(Source, names):
    """Return a dict mapping each name in ``names`` to ``getattr(Source, name)``.

    Values whose type is exactly the plain Python function type are
    wrapped in ``staticmethod``; every other value is stored unchanged.
    """
    function_type = type(get_attributes)
    attributes = {}
    for name in names:
        value = getattr(Source, name)
        if type(value) == function_type:
            value = staticmethod(value)
        attributes[name] = value
    return attributes
1784
def create_test_cases(Mixin, type):
    """Build concrete TestCase classes from the ``_Test*`` globals.

    Every module global whose name starts with ``_Test`` is looked at, so
    each of them must have an ``ALLOWED_TYPES`` attribute.  For each one
    that lists ``type``, a class deriving from it, ``unittest.TestCase``
    and ``Mixin`` is created and named ``'With' + type.capitalize()``
    followed by the base name without its leading underscore.

    Returns a dict mapping the new class names to the new classes.
    """
    namespace = globals()
    prefix = 'With' + type.capitalize()
    created = {}

    # Iterate over a snapshot of the names, not the live globals dict.
    for name in list(namespace):
        if not name.startswith('_Test'):
            continue
        base = namespace[name]
        if type not in base.ALLOWED_TYPES:
            continue
        newname = prefix + name[1:]

        class Temp(base, unittest.TestCase, Mixin):
            pass

        # Make the class report under its generated name and under the
        # module that the mixin belongs to.
        Temp.__name__ = newname
        Temp.__module__ = Mixin.__module__
        created[newname] = Temp
    return created
1801
1802#
1803# Create test cases
1804#
1805
class ProcessesMixin(object):
    # Mixin for the 'processes' flavour of the tests: the names listed
    # below become class attributes taken from the multiprocessing package.
    TYPE = 'processes'
    Process = multiprocessing.Process
    # get_attributes() returns a name -> object dict (plain functions are
    # wrapped in staticmethod); updating locals() inside the class body
    # turns each entry into a class attribute.
    # NOTE(review): writing through locals() in a class body relies on
    # interpreter behaviour -- confirm before porting this pattern.
    locals().update(get_attributes(multiprocessing, (
        'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
        'Condition', 'Event', 'Value', 'Array', 'RawValue',
        'RawArray', 'current_process', 'active_children', 'Pipe',
        'connection', 'JoinableQueue'
        )))

# Build the 'processes' test classes and publish them as module globals.
testcases_processes = create_test_cases(ProcessesMixin, type='processes')
globals().update(testcases_processes)
1818
1819
class ManagerMixin(object):
    # Mixin for the 'manager' flavour of the tests: the names listed below
    # become class attributes taken from a SyncManager instance.
    TYPE = 'manager'
    Process = multiprocessing.Process
    # Instance created with object.__new__, i.e. without running
    # SyncManager.__init__ here.
    # NOTE(review): presumably it is initialised and started later by
    # test_main -- confirm there.
    manager = object.__new__(multiprocessing.managers.SyncManager)
    # Same locals().update() technique as in ProcessesMixin, with the
    # attributes looked up on the manager instance.
    locals().update(get_attributes(manager, (
        'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
        'Condition', 'Event', 'Value', 'Array', 'list', 'dict',
        'Namespace', 'JoinableQueue'
        )))

# Build the 'manager' test classes and publish them as module globals.
testcases_manager = create_test_cases(ManagerMixin, type='manager')
globals().update(testcases_manager)
1832
1833
class ThreadsMixin(object):
    # Mixin for the 'threads' flavour of the tests: the names listed below
    # become class attributes taken from multiprocessing.dummy.
    TYPE = 'threads'
    Process = multiprocessing.dummy.Process
    # Same locals().update() technique as in ProcessesMixin, with the
    # attributes looked up on the multiprocessing.dummy module.
    locals().update(get_attributes(multiprocessing.dummy, (
        'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
        'Condition', 'Event', 'Value', 'Array', 'current_process',
        'active_children', 'Pipe', 'connection', 'dict', 'list',
        'Namespace', 'JoinableQueue'
        )))

# Build the 'threads' test classes and publish them as module globals.
testcases_threads = create_test_cases(ThreadsMixin, type='threads')
globals().update(testcases_threads)
1846
class OtherTest(unittest.TestCase):
    # TODO: add more tests for deliver/answer challenge.
    def test_deliver_challenge_auth_failure(self):
        # The fake peer accepts anything that is sent and always replies
        # with the same bogus bytes.
        class _FakeConnection(object):
            def send_bytes(self, data):
                pass
            def recv_bytes(self, size):
                return b'something bogus'

        bogus_peer = _FakeConnection()
        self.assertRaises(multiprocessing.AuthenticationError,
                          multiprocessing.connection.deliver_challenge,
                          bogus_peer, b'abc')

    def test_answer_challenge_auth_failure(self):
        # The fake peer replies with CHALLENGE on the first read, bogus
        # bytes on the second read and empty bytes on any later read.
        class _FakeConnection(object):
            def __init__(self):
                self.count = 0
            def send_bytes(self, data):
                pass
            def recv_bytes(self, size):
                self.count += 1
                if self.count == 1:
                    return multiprocessing.connection.CHALLENGE
                if self.count == 2:
                    return b'something bogus'
                return b''

        bogus_peer = _FakeConnection()
        self.assertRaises(multiprocessing.AuthenticationError,
                          multiprocessing.connection.answer_challenge,
                          bogus_peer, b'abc')
1875
Benjamin Petersonf47ed4a2009-04-11 20:45:40 +00001876#
1877# Test Manager.start()/Pool.__init__() initializer feature - see issue 5585
1878#
1879
def initializer(ns):
    """Increment ``ns.test`` by one; used as a Manager/Pool initializer."""
    ns.test += 1
1882
class TestInitializers(unittest.TestCase):
    """Test Manager.start()/Pool.__init__() initializer feature - see issue 5585."""

    def setUp(self):
        # Shared namespace whose ``test`` counter the initializer increments.
        self.mgr = multiprocessing.Manager()
        self.ns = self.mgr.Namespace()
        self.ns.test = 0

    def tearDown(self):
        self.mgr.shutdown()

    def test_manager_initializer(self):
        m = multiprocessing.managers.SyncManager()
        # A non-callable initializer is rejected.
        self.assertRaises(TypeError, m.start, 1)
        m.start(initializer, (self.ns,))
        # Shut the second manager down even when the assertion fails.
        try:
            self.assertEqual(self.ns.test, 1)
        finally:
            m.shutdown()

    def test_pool_initializer(self):
        # A non-callable initializer is rejected.
        self.assertRaises(TypeError, multiprocessing.Pool, initializer=1)
        p = multiprocessing.Pool(1, initializer, (self.ns,))
        p.close()
        p.join()
        self.assertEqual(self.ns.test, 1)
1905
R. David Murraya44c6b32009-07-29 15:40:30 +00001906#
1907# Issue 5155, 5313, 5331: Test process in processes
1908# Verifies os.close(sys.stdin.fileno) vs. sys.stdin.close() behavior
1909#
1910
def _ThisSubProcess(q):
    """Try one non-blocking ``get`` on ``q``; an empty queue is not an error."""
    try:
        q.get(block=False)
    except pyqueue.Empty:
        pass
1916
def _TestProcess(q):
    """Start a child process running _ThisSubProcess on a new Queue and join it.

    The ``q`` argument is accepted but not used.
    """
    inner_queue = multiprocessing.Queue()
    child = multiprocessing.Process(target=_ThisSubProcess,
                                    args=(inner_queue,))
    child.start()
    child.join()
1922
def _afunc(x):
    """Return ``x`` multiplied by itself."""
    product = x * x
    return product
1925
def pool_in_process():
    """Create a four-worker pool, run one ``map`` over it and shut it down."""
    pool = multiprocessing.Pool(processes=4)
    # Close and join the pool so its workers are released explicitly,
    # whether or not map() succeeds.
    try:
        pool.map(_afunc, [1, 2, 3, 4, 5, 6, 7])
    finally:
        pool.close()
        pool.join()
1929
class _file_like(object):
    """Minimal buffering writer whose buffer is private to each process.

    Written data is collected in a list and only passed on to *delegate*
    when ``flush()`` is called.  The buffer is discarded whenever the
    current pid differs from the pid that last touched it.
    """

    def __init__(self, delegate):
        self._pid = None
        self._delegate = delegate

    @property
    def cache(self):
        """Return the buffer for the current process, resetting it on a pid change."""
        current = os.getpid()
        # There are no race conditions since fork keeps only the running thread
        if self._pid == current:
            return self._cache
        self._pid = current
        self._cache = []
        return self._cache

    def write(self, data):
        """Append *data* to the current process's buffer."""
        buffered = self.cache
        buffered.append(data)

    def flush(self):
        """Write the concatenated buffer to the delegate and empty the buffer."""
        emit = self._delegate.write
        emit(''.join(self.cache))
        self._cache = []
1950
class TestStdinBadfiledescriptor(unittest.TestCase):
    """Regression tests for issues 5155, 5313 and 5331.

    Each test starts multiprocessing machinery (a queue or a pool) from
    inside a child process, or exercises the ``_file_like`` buffer.
    """

    def test_queue_in_process(self):
        queue = multiprocessing.Queue()
        proc = multiprocessing.Process(target=_TestProcess, args=(queue,))
        proc.start()
        proc.join()

    def test_pool_in_process(self):
        p = multiprocessing.Process(target=pool_in_process)
        p.start()
        p.join()

    def test_flushing(self):
        sio = io.StringIO()
        flike = _file_like(sio)
        flike.write('foo')
        # NOTE(review): this process is constructed but never started, so
        # only the flush in the current process is actually checked.
        proc = multiprocessing.Process(target=lambda: flike.flush())
        flike.flush()
        # assertEqual instead of a bare assert: it still runs under -O and
        # reports both values on failure.
        self.assertEqual(sio.getvalue(), 'foo')
1971
# Test case classes that test_main() appends after the processes, threads
# and manager test cases.
testcases_other = [
    OtherTest,
    TestInvalidHandle,
    TestInitializers,
    TestStdinBadfiledescriptor,
]
Neal Norwitz5d6415e2008-08-25 01:53:32 +00001974
Benjamin Petersone711caf2008-06-11 16:44:04 +00001975#
1976#
1977#
1978
1979def test_main(run=None):
Jesse Nollerd00df3c2008-06-18 14:22:48 +00001980 if sys.platform.startswith("linux"):
1981 try:
1982 lock = multiprocessing.RLock()
1983 except OSError:
Benjamin Petersone549ead2009-03-28 21:42:05 +00001984 raise unittest.SkipTest("OSError raises on RLock creation, see issue 3111!")
Benjamin Peterson3c0dd062008-06-17 22:43:48 +00001985
Benjamin Petersone711caf2008-06-11 16:44:04 +00001986 if run is None:
1987 from test.support import run_unittest as run
1988
1989 util.get_temp_dir() # creates temp directory for use by all processes
1990
1991 multiprocessing.get_logger().setLevel(LOG_LEVEL)
1992
Benjamin Peterson41181742008-07-02 20:22:54 +00001993 ProcessesMixin.pool = multiprocessing.Pool(4)
1994 ThreadsMixin.pool = multiprocessing.dummy.Pool(4)
1995 ManagerMixin.manager.__init__()
1996 ManagerMixin.manager.start()
1997 ManagerMixin.pool = ManagerMixin.manager.Pool(4)
Benjamin Petersone711caf2008-06-11 16:44:04 +00001998
1999 testcases = (
Benjamin Peterson41181742008-07-02 20:22:54 +00002000 sorted(testcases_processes.values(), key=lambda tc:tc.__name__) +
2001 sorted(testcases_threads.values(), key=lambda tc:tc.__name__) +
Neal Norwitz5d6415e2008-08-25 01:53:32 +00002002 sorted(testcases_manager.values(), key=lambda tc:tc.__name__) +
2003 testcases_other
Benjamin Petersone711caf2008-06-11 16:44:04 +00002004 )
2005
2006 loadTestsFromTestCase = unittest.defaultTestLoader.loadTestsFromTestCase
2007 suite = unittest.TestSuite(loadTestsFromTestCase(tc) for tc in testcases)
2008 run(suite)
2009
Benjamin Peterson41181742008-07-02 20:22:54 +00002010 ThreadsMixin.pool.terminate()
2011 ProcessesMixin.pool.terminate()
2012 ManagerMixin.pool.terminate()
2013 ManagerMixin.manager.shutdown()
Benjamin Petersone711caf2008-06-11 16:44:04 +00002014
Benjamin Peterson41181742008-07-02 20:22:54 +00002015 del ProcessesMixin.pool, ThreadsMixin.pool, ManagerMixin.pool
Benjamin Petersone711caf2008-06-11 16:44:04 +00002016
def main():
    """Run the whole suite through a verbose ``TextTestRunner``."""
    runner = unittest.TextTestRunner(verbosity=2)
    test_main(runner.run)


if __name__ == '__main__':
    main()