blob: 1136ab2920edf8e710fe6dc3f8c8653ec7deca07 [file] [log] [blame]
Georg Brandl86b2fb92008-07-16 03:43:04 +00001#!/usr/bin/env python
2
Benjamin Petersone711caf2008-06-11 16:44:04 +00003#
4# Unit tests for the multiprocessing package
5#
6
7import unittest
8import threading
9import queue as pyqueue
10import time
R. David Murraya44c6b32009-07-29 15:40:30 +000011import io
Benjamin Petersone711caf2008-06-11 16:44:04 +000012import sys
13import os
14import gc
15import signal
16import array
Benjamin Petersone711caf2008-06-11 16:44:04 +000017import socket
18import random
19import logging
R. David Murraya21e4ca2009-03-31 23:16:50 +000020import test.support
Benjamin Petersone711caf2008-06-11 16:44:04 +000021
Benjamin Petersone5384b02008-10-04 22:00:42 +000022
R. David Murraya21e4ca2009-03-31 23:16:50 +000023# Skip tests if _multiprocessing wasn't built.
24_multiprocessing = test.support.import_module('_multiprocessing')
25# Skip tests if sem_open implementation is broken.
26test.support.import_module('multiprocessing.synchronize')
Benjamin Petersone5384b02008-10-04 22:00:42 +000027
Benjamin Petersone711caf2008-06-11 16:44:04 +000028import multiprocessing.dummy
29import multiprocessing.connection
30import multiprocessing.managers
31import multiprocessing.heap
Benjamin Petersone711caf2008-06-11 16:44:04 +000032import multiprocessing.pool
Benjamin Petersone711caf2008-06-11 16:44:04 +000033
34from multiprocessing import util
35
Brian Curtin918616c2010-10-07 02:12:17 +000036try:
37 from multiprocessing.sharedctypes import Value, copy
38 HAS_SHAREDCTYPES = True
39except ImportError:
40 HAS_SHAREDCTYPES = False
41
Benjamin Petersone711caf2008-06-11 16:44:04 +000042#
43#
44#
45
Benjamin Peterson2bc91df2008-07-13 18:45:30 +000046def latin(s):
47 return s.encode('latin')
Benjamin Petersone711caf2008-06-11 16:44:04 +000048
Benjamin Petersone711caf2008-06-11 16:44:04 +000049#
50# Constants
51#
52
53LOG_LEVEL = util.SUBWARNING
54#LOG_LEVEL = logging.WARNING
55
56DELTA = 0.1
57CHECK_TIMINGS = False # making true makes tests take a lot longer
58 # and can sometimes cause some non-serious
59 # failures because some calls block a bit
60 # longer than expected
61if CHECK_TIMINGS:
62 TIMEOUT1, TIMEOUT2, TIMEOUT3 = 0.82, 0.35, 1.4
63else:
64 TIMEOUT1, TIMEOUT2, TIMEOUT3 = 0.1, 0.1, 0.1
65
66HAVE_GETVALUE = not getattr(_multiprocessing,
67 'HAVE_BROKEN_SEM_GETVALUE', False)
68
Jesse Noller6214edd2009-01-19 16:23:53 +000069WIN32 = (sys.platform == "win32")
70
Benjamin Petersone711caf2008-06-11 16:44:04 +000071#
Florent Xicluna9b0e9182010-03-28 11:42:38 +000072# Some tests require ctypes
73#
74
75try:
Florent Xiclunab4efb3d2010-08-14 18:24:40 +000076 from ctypes import Structure, c_int, c_double
Florent Xicluna9b0e9182010-03-28 11:42:38 +000077except ImportError:
78 Structure = object
79 c_int = c_double = None
80
81#
Benjamin Petersone711caf2008-06-11 16:44:04 +000082# Creates a wrapper for a function which records the time it takes to finish
83#
84
85class TimingWrapper(object):
86
87 def __init__(self, func):
88 self.func = func
89 self.elapsed = None
90
91 def __call__(self, *args, **kwds):
92 t = time.time()
93 try:
94 return self.func(*args, **kwds)
95 finally:
96 self.elapsed = time.time() - t
97
98#
99# Base class for test cases
100#
101
102class BaseTestCase(object):
103
104 ALLOWED_TYPES = ('processes', 'manager', 'threads')
105
106 def assertTimingAlmostEqual(self, a, b):
107 if CHECK_TIMINGS:
108 self.assertAlmostEqual(a, b, 1)
109
110 def assertReturnsIfImplemented(self, value, func, *args):
111 try:
112 res = func(*args)
113 except NotImplementedError:
114 pass
115 else:
116 return self.assertEqual(value, res)
117
Antoine Pitrou26899f42010-11-02 23:52:49 +0000118 # For the sanity of Windows users, rather than crashing or freezing in
119 # multiple ways.
120 def __reduce__(self, *args):
121 raise NotImplementedError("shouldn't try to pickle a test case")
122
123 __reduce_ex__ = __reduce__
124
Benjamin Petersone711caf2008-06-11 16:44:04 +0000125#
126# Return the value of a semaphore
127#
128
129def get_value(self):
130 try:
131 return self.get_value()
132 except AttributeError:
133 try:
134 return self._Semaphore__value
135 except AttributeError:
136 try:
137 return self._value
138 except AttributeError:
139 raise NotImplementedError
140
141#
142# Testcases
143#
144
145class _TestProcess(BaseTestCase):
146
147 ALLOWED_TYPES = ('processes', 'threads')
148
149 def test_current(self):
150 if self.TYPE == 'threads':
151 return
152
153 current = self.current_process()
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000154 authkey = current.authkey
Benjamin Petersone711caf2008-06-11 16:44:04 +0000155
156 self.assertTrue(current.is_alive())
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000157 self.assertTrue(not current.daemon)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000158 self.assertTrue(isinstance(authkey, bytes))
159 self.assertTrue(len(authkey) > 0)
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000160 self.assertEqual(current.ident, os.getpid())
161 self.assertEqual(current.exitcode, None)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000162
Antoine Pitrou26899f42010-11-02 23:52:49 +0000163 @classmethod
164 def _test(cls, q, *args, **kwds):
165 current = cls.current_process()
Benjamin Petersone711caf2008-06-11 16:44:04 +0000166 q.put(args)
167 q.put(kwds)
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000168 q.put(current.name)
Antoine Pitrou26899f42010-11-02 23:52:49 +0000169 if cls.TYPE != 'threads':
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000170 q.put(bytes(current.authkey))
Benjamin Petersone711caf2008-06-11 16:44:04 +0000171 q.put(current.pid)
172
173 def test_process(self):
174 q = self.Queue(1)
175 e = self.Event()
176 args = (q, 1, 2)
177 kwargs = {'hello':23, 'bye':2.54}
178 name = 'SomeProcess'
179 p = self.Process(
180 target=self._test, args=args, kwargs=kwargs, name=name
181 )
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000182 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000183 current = self.current_process()
184
185 if self.TYPE != 'threads':
Ezio Melotti19f2aeb2010-11-21 01:30:29 +0000186 self.assertEqual(p.authkey, current.authkey)
187 self.assertEqual(p.is_alive(), False)
188 self.assertEqual(p.daemon, True)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000189 self.assertTrue(p not in self.active_children())
190 self.assertTrue(type(self.active_children()) is list)
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000191 self.assertEqual(p.exitcode, None)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000192
193 p.start()
194
Ezio Melotti19f2aeb2010-11-21 01:30:29 +0000195 self.assertEqual(p.exitcode, None)
196 self.assertEqual(p.is_alive(), True)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000197 self.assertTrue(p in self.active_children())
198
Ezio Melotti19f2aeb2010-11-21 01:30:29 +0000199 self.assertEqual(q.get(), args[1:])
200 self.assertEqual(q.get(), kwargs)
201 self.assertEqual(q.get(), p.name)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000202 if self.TYPE != 'threads':
Ezio Melotti19f2aeb2010-11-21 01:30:29 +0000203 self.assertEqual(q.get(), current.authkey)
204 self.assertEqual(q.get(), p.pid)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000205
206 p.join()
207
Ezio Melotti19f2aeb2010-11-21 01:30:29 +0000208 self.assertEqual(p.exitcode, 0)
209 self.assertEqual(p.is_alive(), False)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000210 self.assertTrue(p not in self.active_children())
211
Antoine Pitrou26899f42010-11-02 23:52:49 +0000212 @classmethod
213 def _test_terminate(cls):
Benjamin Petersone711caf2008-06-11 16:44:04 +0000214 time.sleep(1000)
215
216 def test_terminate(self):
217 if self.TYPE == 'threads':
218 return
219
220 p = self.Process(target=self._test_terminate)
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000221 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000222 p.start()
223
224 self.assertEqual(p.is_alive(), True)
225 self.assertTrue(p in self.active_children())
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000226 self.assertEqual(p.exitcode, None)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000227
228 p.terminate()
229
230 join = TimingWrapper(p.join)
231 self.assertEqual(join(), None)
232 self.assertTimingAlmostEqual(join.elapsed, 0.0)
233
234 self.assertEqual(p.is_alive(), False)
235 self.assertTrue(p not in self.active_children())
236
237 p.join()
238
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000239 # XXX sometimes get p.exitcode == 0 on Windows ...
240 #self.assertEqual(p.exitcode, -signal.SIGTERM)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000241
242 def test_cpu_count(self):
243 try:
244 cpus = multiprocessing.cpu_count()
245 except NotImplementedError:
246 cpus = 1
247 self.assertTrue(type(cpus) is int)
248 self.assertTrue(cpus >= 1)
249
250 def test_active_children(self):
251 self.assertEqual(type(self.active_children()), list)
252
253 p = self.Process(target=time.sleep, args=(DELTA,))
254 self.assertTrue(p not in self.active_children())
255
256 p.start()
257 self.assertTrue(p in self.active_children())
258
259 p.join()
260 self.assertTrue(p not in self.active_children())
261
Antoine Pitrou26899f42010-11-02 23:52:49 +0000262 @classmethod
263 def _test_recursion(cls, wconn, id):
Benjamin Petersone711caf2008-06-11 16:44:04 +0000264 from multiprocessing import forking
265 wconn.send(id)
266 if len(id) < 2:
267 for i in range(2):
Antoine Pitrou26899f42010-11-02 23:52:49 +0000268 p = cls.Process(
269 target=cls._test_recursion, args=(wconn, id+[i])
Benjamin Petersone711caf2008-06-11 16:44:04 +0000270 )
271 p.start()
272 p.join()
273
274 def test_recursion(self):
275 rconn, wconn = self.Pipe(duplex=False)
276 self._test_recursion(wconn, [])
277
278 time.sleep(DELTA)
279 result = []
280 while rconn.poll():
281 result.append(rconn.recv())
282
283 expected = [
284 [],
285 [0],
286 [0, 0],
287 [0, 1],
288 [1],
289 [1, 0],
290 [1, 1]
291 ]
292 self.assertEqual(result, expected)
293
294#
295#
296#
297
298class _UpperCaser(multiprocessing.Process):
299
300 def __init__(self):
301 multiprocessing.Process.__init__(self)
302 self.child_conn, self.parent_conn = multiprocessing.Pipe()
303
304 def run(self):
305 self.parent_conn.close()
306 for s in iter(self.child_conn.recv, None):
307 self.child_conn.send(s.upper())
308 self.child_conn.close()
309
310 def submit(self, s):
311 assert type(s) is str
312 self.parent_conn.send(s)
313 return self.parent_conn.recv()
314
315 def stop(self):
316 self.parent_conn.send(None)
317 self.parent_conn.close()
318 self.child_conn.close()
319
320class _TestSubclassingProcess(BaseTestCase):
321
322 ALLOWED_TYPES = ('processes',)
323
324 def test_subclassing(self):
325 uppercaser = _UpperCaser()
326 uppercaser.start()
327 self.assertEqual(uppercaser.submit('hello'), 'HELLO')
328 self.assertEqual(uppercaser.submit('world'), 'WORLD')
329 uppercaser.stop()
330 uppercaser.join()
331
332#
333#
334#
335
336def queue_empty(q):
337 if hasattr(q, 'empty'):
338 return q.empty()
339 else:
340 return q.qsize() == 0
341
342def queue_full(q, maxsize):
343 if hasattr(q, 'full'):
344 return q.full()
345 else:
346 return q.qsize() == maxsize
347
348
349class _TestQueue(BaseTestCase):
350
351
Antoine Pitrou26899f42010-11-02 23:52:49 +0000352 @classmethod
353 def _test_put(cls, queue, child_can_start, parent_can_continue):
Benjamin Petersone711caf2008-06-11 16:44:04 +0000354 child_can_start.wait()
355 for i in range(6):
356 queue.get()
357 parent_can_continue.set()
358
359 def test_put(self):
360 MAXSIZE = 6
361 queue = self.Queue(maxsize=MAXSIZE)
362 child_can_start = self.Event()
363 parent_can_continue = self.Event()
364
365 proc = self.Process(
366 target=self._test_put,
367 args=(queue, child_can_start, parent_can_continue)
368 )
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000369 proc.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000370 proc.start()
371
372 self.assertEqual(queue_empty(queue), True)
373 self.assertEqual(queue_full(queue, MAXSIZE), False)
374
375 queue.put(1)
376 queue.put(2, True)
377 queue.put(3, True, None)
378 queue.put(4, False)
379 queue.put(5, False, None)
380 queue.put_nowait(6)
381
382 # the values may be in buffer but not yet in pipe so sleep a bit
383 time.sleep(DELTA)
384
385 self.assertEqual(queue_empty(queue), False)
386 self.assertEqual(queue_full(queue, MAXSIZE), True)
387
388 put = TimingWrapper(queue.put)
389 put_nowait = TimingWrapper(queue.put_nowait)
390
391 self.assertRaises(pyqueue.Full, put, 7, False)
392 self.assertTimingAlmostEqual(put.elapsed, 0)
393
394 self.assertRaises(pyqueue.Full, put, 7, False, None)
395 self.assertTimingAlmostEqual(put.elapsed, 0)
396
397 self.assertRaises(pyqueue.Full, put_nowait, 7)
398 self.assertTimingAlmostEqual(put_nowait.elapsed, 0)
399
400 self.assertRaises(pyqueue.Full, put, 7, True, TIMEOUT1)
401 self.assertTimingAlmostEqual(put.elapsed, TIMEOUT1)
402
403 self.assertRaises(pyqueue.Full, put, 7, False, TIMEOUT2)
404 self.assertTimingAlmostEqual(put.elapsed, 0)
405
406 self.assertRaises(pyqueue.Full, put, 7, True, timeout=TIMEOUT3)
407 self.assertTimingAlmostEqual(put.elapsed, TIMEOUT3)
408
409 child_can_start.set()
410 parent_can_continue.wait()
411
412 self.assertEqual(queue_empty(queue), True)
413 self.assertEqual(queue_full(queue, MAXSIZE), False)
414
415 proc.join()
416
Antoine Pitrou26899f42010-11-02 23:52:49 +0000417 @classmethod
418 def _test_get(cls, queue, child_can_start, parent_can_continue):
Benjamin Petersone711caf2008-06-11 16:44:04 +0000419 child_can_start.wait()
Benjamin Petersonb29cbbc2008-06-16 20:57:14 +0000420 #queue.put(1)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000421 queue.put(2)
422 queue.put(3)
423 queue.put(4)
424 queue.put(5)
425 parent_can_continue.set()
426
427 def test_get(self):
428 queue = self.Queue()
429 child_can_start = self.Event()
430 parent_can_continue = self.Event()
431
432 proc = self.Process(
433 target=self._test_get,
434 args=(queue, child_can_start, parent_can_continue)
435 )
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000436 proc.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000437 proc.start()
438
439 self.assertEqual(queue_empty(queue), True)
440
441 child_can_start.set()
442 parent_can_continue.wait()
443
444 time.sleep(DELTA)
445 self.assertEqual(queue_empty(queue), False)
446
Benjamin Petersonb29cbbc2008-06-16 20:57:14 +0000447 # Hangs unexpectedly, remove for now
448 #self.assertEqual(queue.get(), 1)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000449 self.assertEqual(queue.get(True, None), 2)
450 self.assertEqual(queue.get(True), 3)
451 self.assertEqual(queue.get(timeout=1), 4)
452 self.assertEqual(queue.get_nowait(), 5)
453
454 self.assertEqual(queue_empty(queue), True)
455
456 get = TimingWrapper(queue.get)
457 get_nowait = TimingWrapper(queue.get_nowait)
458
459 self.assertRaises(pyqueue.Empty, get, False)
460 self.assertTimingAlmostEqual(get.elapsed, 0)
461
462 self.assertRaises(pyqueue.Empty, get, False, None)
463 self.assertTimingAlmostEqual(get.elapsed, 0)
464
465 self.assertRaises(pyqueue.Empty, get_nowait)
466 self.assertTimingAlmostEqual(get_nowait.elapsed, 0)
467
468 self.assertRaises(pyqueue.Empty, get, True, TIMEOUT1)
469 self.assertTimingAlmostEqual(get.elapsed, TIMEOUT1)
470
471 self.assertRaises(pyqueue.Empty, get, False, TIMEOUT2)
472 self.assertTimingAlmostEqual(get.elapsed, 0)
473
474 self.assertRaises(pyqueue.Empty, get, timeout=TIMEOUT3)
475 self.assertTimingAlmostEqual(get.elapsed, TIMEOUT3)
476
477 proc.join()
478
Antoine Pitrou26899f42010-11-02 23:52:49 +0000479 @classmethod
480 def _test_fork(cls, queue):
Benjamin Petersone711caf2008-06-11 16:44:04 +0000481 for i in range(10, 20):
482 queue.put(i)
483 # note that at this point the items may only be buffered, so the
484 # process cannot shutdown until the feeder thread has finished
485 # pushing items onto the pipe.
486
487 def test_fork(self):
488 # Old versions of Queue would fail to create a new feeder
489 # thread for a forked process if the original process had its
490 # own feeder thread. This test checks that this no longer
491 # happens.
492
493 queue = self.Queue()
494
495 # put items on queue so that main process starts a feeder thread
496 for i in range(10):
497 queue.put(i)
498
499 # wait to make sure thread starts before we fork a new process
500 time.sleep(DELTA)
501
502 # fork process
503 p = self.Process(target=self._test_fork, args=(queue,))
504 p.start()
505
506 # check that all expected items are in the queue
507 for i in range(20):
508 self.assertEqual(queue.get(), i)
509 self.assertRaises(pyqueue.Empty, queue.get, False)
510
511 p.join()
512
513 def test_qsize(self):
514 q = self.Queue()
515 try:
516 self.assertEqual(q.qsize(), 0)
517 except NotImplementedError:
518 return
519 q.put(1)
520 self.assertEqual(q.qsize(), 1)
521 q.put(5)
522 self.assertEqual(q.qsize(), 2)
523 q.get()
524 self.assertEqual(q.qsize(), 1)
525 q.get()
526 self.assertEqual(q.qsize(), 0)
527
Antoine Pitrou26899f42010-11-02 23:52:49 +0000528 @classmethod
529 def _test_task_done(cls, q):
Benjamin Petersone711caf2008-06-11 16:44:04 +0000530 for obj in iter(q.get, None):
531 time.sleep(DELTA)
532 q.task_done()
533
534 def test_task_done(self):
535 queue = self.JoinableQueue()
536
537 if sys.version_info < (2, 5) and not hasattr(queue, 'task_done'):
Florent Xicluna9b0e9182010-03-28 11:42:38 +0000538 self.skipTest("requires 'queue.task_done()' method")
Benjamin Petersone711caf2008-06-11 16:44:04 +0000539
540 workers = [self.Process(target=self._test_task_done, args=(queue,))
541 for i in range(4)]
542
543 for p in workers:
544 p.start()
545
546 for i in range(10):
547 queue.put(i)
548
549 queue.join()
550
551 for p in workers:
552 queue.put(None)
553
554 for p in workers:
555 p.join()
556
557#
558#
559#
560
561class _TestLock(BaseTestCase):
562
563 def test_lock(self):
564 lock = self.Lock()
565 self.assertEqual(lock.acquire(), True)
566 self.assertEqual(lock.acquire(False), False)
567 self.assertEqual(lock.release(), None)
568 self.assertRaises((ValueError, threading.ThreadError), lock.release)
569
570 def test_rlock(self):
571 lock = self.RLock()
572 self.assertEqual(lock.acquire(), True)
573 self.assertEqual(lock.acquire(), True)
574 self.assertEqual(lock.acquire(), True)
575 self.assertEqual(lock.release(), None)
576 self.assertEqual(lock.release(), None)
577 self.assertEqual(lock.release(), None)
578 self.assertRaises((AssertionError, RuntimeError), lock.release)
579
Jesse Nollerf8d00852009-03-31 03:25:07 +0000580 def test_lock_context(self):
581 with self.Lock():
582 pass
583
Benjamin Petersone711caf2008-06-11 16:44:04 +0000584
585class _TestSemaphore(BaseTestCase):
586
587 def _test_semaphore(self, sem):
588 self.assertReturnsIfImplemented(2, get_value, sem)
589 self.assertEqual(sem.acquire(), True)
590 self.assertReturnsIfImplemented(1, get_value, sem)
591 self.assertEqual(sem.acquire(), True)
592 self.assertReturnsIfImplemented(0, get_value, sem)
593 self.assertEqual(sem.acquire(False), False)
594 self.assertReturnsIfImplemented(0, get_value, sem)
595 self.assertEqual(sem.release(), None)
596 self.assertReturnsIfImplemented(1, get_value, sem)
597 self.assertEqual(sem.release(), None)
598 self.assertReturnsIfImplemented(2, get_value, sem)
599
600 def test_semaphore(self):
601 sem = self.Semaphore(2)
602 self._test_semaphore(sem)
603 self.assertEqual(sem.release(), None)
604 self.assertReturnsIfImplemented(3, get_value, sem)
605 self.assertEqual(sem.release(), None)
606 self.assertReturnsIfImplemented(4, get_value, sem)
607
608 def test_bounded_semaphore(self):
609 sem = self.BoundedSemaphore(2)
610 self._test_semaphore(sem)
611 # Currently fails on OS/X
612 #if HAVE_GETVALUE:
613 # self.assertRaises(ValueError, sem.release)
614 # self.assertReturnsIfImplemented(2, get_value, sem)
615
616 def test_timeout(self):
617 if self.TYPE != 'processes':
618 return
619
620 sem = self.Semaphore(0)
621 acquire = TimingWrapper(sem.acquire)
622
623 self.assertEqual(acquire(False), False)
624 self.assertTimingAlmostEqual(acquire.elapsed, 0.0)
625
626 self.assertEqual(acquire(False, None), False)
627 self.assertTimingAlmostEqual(acquire.elapsed, 0.0)
628
629 self.assertEqual(acquire(False, TIMEOUT1), False)
630 self.assertTimingAlmostEqual(acquire.elapsed, 0)
631
632 self.assertEqual(acquire(True, TIMEOUT2), False)
633 self.assertTimingAlmostEqual(acquire.elapsed, TIMEOUT2)
634
635 self.assertEqual(acquire(timeout=TIMEOUT3), False)
636 self.assertTimingAlmostEqual(acquire.elapsed, TIMEOUT3)
637
638
639class _TestCondition(BaseTestCase):
640
Antoine Pitrou26899f42010-11-02 23:52:49 +0000641 @classmethod
642 def f(cls, cond, sleeping, woken, timeout=None):
Benjamin Petersone711caf2008-06-11 16:44:04 +0000643 cond.acquire()
644 sleeping.release()
645 cond.wait(timeout)
646 woken.release()
647 cond.release()
648
649 def check_invariant(self, cond):
650 # this is only supposed to succeed when there are no sleepers
651 if self.TYPE == 'processes':
652 try:
653 sleepers = (cond._sleeping_count.get_value() -
654 cond._woken_count.get_value())
655 self.assertEqual(sleepers, 0)
656 self.assertEqual(cond._wait_semaphore.get_value(), 0)
657 except NotImplementedError:
658 pass
659
660 def test_notify(self):
661 cond = self.Condition()
662 sleeping = self.Semaphore(0)
663 woken = self.Semaphore(0)
664
665 p = self.Process(target=self.f, args=(cond, sleeping, woken))
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000666 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000667 p.start()
668
669 p = threading.Thread(target=self.f, args=(cond, sleeping, woken))
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000670 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000671 p.start()
672
673 # wait for both children to start sleeping
674 sleeping.acquire()
675 sleeping.acquire()
676
677 # check no process/thread has woken up
678 time.sleep(DELTA)
679 self.assertReturnsIfImplemented(0, get_value, woken)
680
681 # wake up one process/thread
682 cond.acquire()
683 cond.notify()
684 cond.release()
685
686 # check one process/thread has woken up
687 time.sleep(DELTA)
688 self.assertReturnsIfImplemented(1, get_value, woken)
689
690 # wake up another
691 cond.acquire()
692 cond.notify()
693 cond.release()
694
695 # check other has woken up
696 time.sleep(DELTA)
697 self.assertReturnsIfImplemented(2, get_value, woken)
698
699 # check state is not mucked up
700 self.check_invariant(cond)
701 p.join()
702
703 def test_notify_all(self):
704 cond = self.Condition()
705 sleeping = self.Semaphore(0)
706 woken = self.Semaphore(0)
707
708 # start some threads/processes which will timeout
709 for i in range(3):
710 p = self.Process(target=self.f,
711 args=(cond, sleeping, woken, TIMEOUT1))
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000712 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000713 p.start()
714
715 t = threading.Thread(target=self.f,
716 args=(cond, sleeping, woken, TIMEOUT1))
Benjamin Peterson72753702008-08-18 18:09:21 +0000717 t.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000718 t.start()
719
720 # wait for them all to sleep
721 for i in range(6):
722 sleeping.acquire()
723
724 # check they have all timed out
725 for i in range(6):
726 woken.acquire()
727 self.assertReturnsIfImplemented(0, get_value, woken)
728
729 # check state is not mucked up
730 self.check_invariant(cond)
731
732 # start some more threads/processes
733 for i in range(3):
734 p = self.Process(target=self.f, args=(cond, sleeping, woken))
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000735 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000736 p.start()
737
738 t = threading.Thread(target=self.f, args=(cond, sleeping, woken))
Benjamin Peterson72753702008-08-18 18:09:21 +0000739 t.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000740 t.start()
741
742 # wait for them to all sleep
743 for i in range(6):
744 sleeping.acquire()
745
746 # check no process/thread has woken up
747 time.sleep(DELTA)
748 self.assertReturnsIfImplemented(0, get_value, woken)
749
750 # wake them all up
751 cond.acquire()
752 cond.notify_all()
753 cond.release()
754
755 # check they have all woken
756 time.sleep(DELTA)
757 self.assertReturnsIfImplemented(6, get_value, woken)
758
759 # check state is not mucked up
760 self.check_invariant(cond)
761
762 def test_timeout(self):
763 cond = self.Condition()
764 wait = TimingWrapper(cond.wait)
765 cond.acquire()
766 res = wait(TIMEOUT1)
767 cond.release()
768 self.assertEqual(res, None)
769 self.assertTimingAlmostEqual(wait.elapsed, TIMEOUT1)
770
771
772class _TestEvent(BaseTestCase):
773
Antoine Pitrou26899f42010-11-02 23:52:49 +0000774 @classmethod
775 def _test_event(cls, event):
Benjamin Petersone711caf2008-06-11 16:44:04 +0000776 time.sleep(TIMEOUT2)
777 event.set()
778
779 def test_event(self):
780 event = self.Event()
781 wait = TimingWrapper(event.wait)
782
Ezio Melotti13925002011-03-16 11:05:33 +0200783 # Removed temporarily, due to API shear, this does not
Benjamin Petersone711caf2008-06-11 16:44:04 +0000784 # work with threading._Event objects. is_set == isSet
Benjamin Peterson965ce872009-04-05 21:24:58 +0000785 self.assertEqual(event.is_set(), False)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000786
Benjamin Peterson965ce872009-04-05 21:24:58 +0000787 # Removed, threading.Event.wait() will return the value of the __flag
788 # instead of None. API Shear with the semaphore backed mp.Event
789 self.assertEqual(wait(0.0), False)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000790 self.assertTimingAlmostEqual(wait.elapsed, 0.0)
Benjamin Peterson965ce872009-04-05 21:24:58 +0000791 self.assertEqual(wait(TIMEOUT1), False)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000792 self.assertTimingAlmostEqual(wait.elapsed, TIMEOUT1)
793
794 event.set()
795
796 # See note above on the API differences
Benjamin Peterson965ce872009-04-05 21:24:58 +0000797 self.assertEqual(event.is_set(), True)
798 self.assertEqual(wait(), True)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000799 self.assertTimingAlmostEqual(wait.elapsed, 0.0)
Benjamin Peterson965ce872009-04-05 21:24:58 +0000800 self.assertEqual(wait(TIMEOUT1), True)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000801 self.assertTimingAlmostEqual(wait.elapsed, 0.0)
802 # self.assertEqual(event.is_set(), True)
803
804 event.clear()
805
806 #self.assertEqual(event.is_set(), False)
807
808 self.Process(target=self._test_event, args=(event,)).start()
Benjamin Peterson965ce872009-04-05 21:24:58 +0000809 self.assertEqual(wait(), True)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000810
811#
812#
813#
814
815class _TestValue(BaseTestCase):
816
Florent Xicluna9b0e9182010-03-28 11:42:38 +0000817 ALLOWED_TYPES = ('processes',)
818
Benjamin Petersone711caf2008-06-11 16:44:04 +0000819 codes_values = [
820 ('i', 4343, 24234),
821 ('d', 3.625, -4.25),
822 ('h', -232, 234),
823 ('c', latin('x'), latin('y'))
824 ]
825
Antoine Pitrou72d5a9d2010-11-22 16:33:23 +0000826 def setUp(self):
827 if not HAS_SHAREDCTYPES:
828 self.skipTest("requires multiprocessing.sharedctypes")
829
Antoine Pitrou26899f42010-11-02 23:52:49 +0000830 @classmethod
831 def _test(cls, values):
832 for sv, cv in zip(values, cls.codes_values):
Benjamin Petersone711caf2008-06-11 16:44:04 +0000833 sv.value = cv[2]
834
835
836 def test_value(self, raw=False):
Benjamin Petersone711caf2008-06-11 16:44:04 +0000837 if raw:
838 values = [self.RawValue(code, value)
839 for code, value, _ in self.codes_values]
840 else:
841 values = [self.Value(code, value)
842 for code, value, _ in self.codes_values]
843
844 for sv, cv in zip(values, self.codes_values):
845 self.assertEqual(sv.value, cv[1])
846
847 proc = self.Process(target=self._test, args=(values,))
848 proc.start()
849 proc.join()
850
851 for sv, cv in zip(values, self.codes_values):
852 self.assertEqual(sv.value, cv[2])
853
854 def test_rawvalue(self):
855 self.test_value(raw=True)
856
857 def test_getobj_getlock(self):
Benjamin Petersone711caf2008-06-11 16:44:04 +0000858 val1 = self.Value('i', 5)
859 lock1 = val1.get_lock()
860 obj1 = val1.get_obj()
861
862 val2 = self.Value('i', 5, lock=None)
863 lock2 = val2.get_lock()
864 obj2 = val2.get_obj()
865
866 lock = self.Lock()
867 val3 = self.Value('i', 5, lock=lock)
868 lock3 = val3.get_lock()
869 obj3 = val3.get_obj()
870 self.assertEqual(lock, lock3)
871
Jesse Nollerb0516a62009-01-18 03:11:38 +0000872 arr4 = self.Value('i', 5, lock=False)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000873 self.assertFalse(hasattr(arr4, 'get_lock'))
874 self.assertFalse(hasattr(arr4, 'get_obj'))
875
Jesse Nollerb0516a62009-01-18 03:11:38 +0000876 self.assertRaises(AttributeError, self.Value, 'i', 5, lock='navalue')
877
878 arr5 = self.RawValue('i', 5)
879 self.assertFalse(hasattr(arr5, 'get_lock'))
880 self.assertFalse(hasattr(arr5, 'get_obj'))
881
Benjamin Petersone711caf2008-06-11 16:44:04 +0000882
883class _TestArray(BaseTestCase):
884
Florent Xicluna9b0e9182010-03-28 11:42:38 +0000885 ALLOWED_TYPES = ('processes',)
886
Antoine Pitrou26899f42010-11-02 23:52:49 +0000887 @classmethod
888 def f(cls, seq):
Benjamin Petersone711caf2008-06-11 16:44:04 +0000889 for i in range(1, len(seq)):
890 seq[i] += seq[i-1]
891
Florent Xicluna9b0e9182010-03-28 11:42:38 +0000892 @unittest.skipIf(c_int is None, "requires _ctypes")
Benjamin Petersone711caf2008-06-11 16:44:04 +0000893 def test_array(self, raw=False):
Benjamin Petersone711caf2008-06-11 16:44:04 +0000894 seq = [680, 626, 934, 821, 150, 233, 548, 982, 714, 831]
895 if raw:
896 arr = self.RawArray('i', seq)
897 else:
898 arr = self.Array('i', seq)
899
900 self.assertEqual(len(arr), len(seq))
901 self.assertEqual(arr[3], seq[3])
902 self.assertEqual(list(arr[2:7]), list(seq[2:7]))
903
904 arr[4:8] = seq[4:8] = array.array('i', [1, 2, 3, 4])
905
906 self.assertEqual(list(arr[:]), seq)
907
908 self.f(seq)
909
910 p = self.Process(target=self.f, args=(arr,))
911 p.start()
912 p.join()
913
914 self.assertEqual(list(arr[:]), seq)
915
Florent Xicluna9b0e9182010-03-28 11:42:38 +0000916 @unittest.skipIf(c_int is None, "requires _ctypes")
Mark Dickinson89461ef2011-03-26 10:19:03 +0000917 def test_array_from_size(self):
918 size = 10
919 # Test for zeroing (see issue #11675).
920 # The repetition below strengthens the test by increasing the chances
921 # of previously allocated non-zero memory being used for the new array
922 # on the 2nd and 3rd loops.
923 for _ in range(3):
924 arr = self.Array('i', size)
925 self.assertEqual(len(arr), size)
926 self.assertEqual(list(arr), [0] * size)
927 arr[:] = range(10)
928 self.assertEqual(list(arr), list(range(10)))
929 del arr
930
931 @unittest.skipIf(c_int is None, "requires _ctypes")
Benjamin Petersone711caf2008-06-11 16:44:04 +0000932 def test_rawarray(self):
933 self.test_array(raw=True)
934
Florent Xicluna9b0e9182010-03-28 11:42:38 +0000935 @unittest.skipIf(c_int is None, "requires _ctypes")
Benjamin Petersone711caf2008-06-11 16:44:04 +0000936 def test_getobj_getlock_obj(self):
Benjamin Petersone711caf2008-06-11 16:44:04 +0000937 arr1 = self.Array('i', list(range(10)))
938 lock1 = arr1.get_lock()
939 obj1 = arr1.get_obj()
940
941 arr2 = self.Array('i', list(range(10)), lock=None)
942 lock2 = arr2.get_lock()
943 obj2 = arr2.get_obj()
944
945 lock = self.Lock()
946 arr3 = self.Array('i', list(range(10)), lock=lock)
947 lock3 = arr3.get_lock()
948 obj3 = arr3.get_obj()
949 self.assertEqual(lock, lock3)
950
Jesse Nollerb0516a62009-01-18 03:11:38 +0000951 arr4 = self.Array('i', range(10), lock=False)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000952 self.assertFalse(hasattr(arr4, 'get_lock'))
953 self.assertFalse(hasattr(arr4, 'get_obj'))
Jesse Nollerb0516a62009-01-18 03:11:38 +0000954 self.assertRaises(AttributeError,
955 self.Array, 'i', range(10), lock='notalock')
956
957 arr5 = self.RawArray('i', range(10))
958 self.assertFalse(hasattr(arr5, 'get_lock'))
959 self.assertFalse(hasattr(arr5, 'get_obj'))
Benjamin Petersone711caf2008-06-11 16:44:04 +0000960
961#
962#
963#
964
965class _TestContainers(BaseTestCase):
966
967 ALLOWED_TYPES = ('manager',)
968
969 def test_list(self):
970 a = self.list(list(range(10)))
971 self.assertEqual(a[:], list(range(10)))
972
973 b = self.list()
974 self.assertEqual(b[:], [])
975
976 b.extend(list(range(5)))
977 self.assertEqual(b[:], list(range(5)))
978
979 self.assertEqual(b[2], 2)
980 self.assertEqual(b[2:10], [2,3,4])
981
982 b *= 2
983 self.assertEqual(b[:], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4])
984
985 self.assertEqual(b + [5, 6], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4, 5, 6])
986
987 self.assertEqual(a[:], list(range(10)))
988
989 d = [a, b]
990 e = self.list(d)
991 self.assertEqual(
992 e[:],
993 [[0, 1, 2, 3, 4, 5, 6, 7, 8, 9], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4]]
994 )
995
996 f = self.list([a])
997 a.append('hello')
998 self.assertEqual(f[:], [[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 'hello']])
999
1000 def test_dict(self):
1001 d = self.dict()
1002 indices = list(range(65, 70))
1003 for i in indices:
1004 d[i] = chr(i)
1005 self.assertEqual(d.copy(), dict((i, chr(i)) for i in indices))
1006 self.assertEqual(sorted(d.keys()), indices)
1007 self.assertEqual(sorted(d.values()), [chr(i) for i in indices])
1008 self.assertEqual(sorted(d.items()), [(i, chr(i)) for i in indices])
1009
1010 def test_namespace(self):
1011 n = self.Namespace()
1012 n.name = 'Bob'
1013 n.job = 'Builder'
1014 n._hidden = 'hidden'
1015 self.assertEqual((n.name, n.job), ('Bob', 'Builder'))
1016 del n.job
1017 self.assertEqual(str(n), "Namespace(name='Bob')")
1018 self.assertTrue(hasattr(n, 'name'))
1019 self.assertTrue(not hasattr(n, 'job'))
1020
1021#
1022#
1023#
1024
1025def sqr(x, wait=0.0):
1026 time.sleep(wait)
1027 return x*x
Benjamin Petersone711caf2008-06-11 16:44:04 +00001028class _TestPool(BaseTestCase):
1029
1030 def test_apply(self):
1031 papply = self.pool.apply
1032 self.assertEqual(papply(sqr, (5,)), sqr(5))
1033 self.assertEqual(papply(sqr, (), {'x':3}), sqr(x=3))
1034
1035 def test_map(self):
1036 pmap = self.pool.map
1037 self.assertEqual(pmap(sqr, list(range(10))), list(map(sqr, list(range(10)))))
1038 self.assertEqual(pmap(sqr, list(range(100)), chunksize=20),
1039 list(map(sqr, list(range(100)))))
1040
Georg Brandld80344f2009-08-13 12:26:19 +00001041 def test_map_chunksize(self):
1042 try:
1043 self.pool.map_async(sqr, [], chunksize=1).get(timeout=TIMEOUT1)
1044 except multiprocessing.TimeoutError:
1045 self.fail("pool.map_async with chunksize stalled on null list")
1046
Benjamin Petersone711caf2008-06-11 16:44:04 +00001047 def test_async(self):
1048 res = self.pool.apply_async(sqr, (7, TIMEOUT1,))
1049 get = TimingWrapper(res.get)
1050 self.assertEqual(get(), 49)
1051 self.assertTimingAlmostEqual(get.elapsed, TIMEOUT1)
1052
1053 def test_async_timeout(self):
1054 res = self.pool.apply_async(sqr, (6, TIMEOUT2 + 0.2))
1055 get = TimingWrapper(res.get)
1056 self.assertRaises(multiprocessing.TimeoutError, get, timeout=TIMEOUT2)
1057 self.assertTimingAlmostEqual(get.elapsed, TIMEOUT2)
1058
1059 def test_imap(self):
1060 it = self.pool.imap(sqr, list(range(10)))
1061 self.assertEqual(list(it), list(map(sqr, list(range(10)))))
1062
1063 it = self.pool.imap(sqr, list(range(10)))
1064 for i in range(10):
1065 self.assertEqual(next(it), i*i)
1066 self.assertRaises(StopIteration, it.__next__)
1067
1068 it = self.pool.imap(sqr, list(range(1000)), chunksize=100)
1069 for i in range(1000):
1070 self.assertEqual(next(it), i*i)
1071 self.assertRaises(StopIteration, it.__next__)
1072
1073 def test_imap_unordered(self):
1074 it = self.pool.imap_unordered(sqr, list(range(1000)))
1075 self.assertEqual(sorted(it), list(map(sqr, list(range(1000)))))
1076
1077 it = self.pool.imap_unordered(sqr, list(range(1000)), chunksize=53)
1078 self.assertEqual(sorted(it), list(map(sqr, list(range(1000)))))
1079
1080 def test_make_pool(self):
1081 p = multiprocessing.Pool(3)
1082 self.assertEqual(3, len(p._pool))
1083 p.close()
1084 p.join()
1085
1086 def test_terminate(self):
1087 if self.TYPE == 'manager':
1088 # On Unix a forked process increfs each shared object to
1089 # which its parent process held a reference. If the
1090 # forked process gets terminated then there is likely to
1091 # be a reference leak. So to prevent
1092 # _TestZZZNumberOfObjects from failing we skip this test
1093 # when using a manager.
1094 return
1095
1096 result = self.pool.map_async(
1097 time.sleep, [0.1 for i in range(10000)], chunksize=1
1098 )
1099 self.pool.terminate()
1100 join = TimingWrapper(self.pool.join)
1101 join()
Victor Stinner29943aa2011-03-24 16:24:07 +01001102 self.assertLess(join.elapsed, 0.2)
Benjamin Petersone711caf2008-06-11 16:44:04 +00001103#
1104# Test that manager has expected number of shared objects left
1105#
1106
1107class _TestZZZNumberOfObjects(BaseTestCase):
1108 # Because test cases are sorted alphabetically, this one will get
1109 # run after all the other tests for the manager. It tests that
1110 # there have been no "reference leaks" for the manager's shared
1111 # objects. Note the comment in _TestPool.test_terminate().
1112 ALLOWED_TYPES = ('manager',)
1113
1114 def test_number_of_objects(self):
1115 EXPECTED_NUMBER = 1 # the pool object is still alive
1116 multiprocessing.active_children() # discard dead process objs
1117 gc.collect() # do garbage collection
1118 refs = self.manager._number_of_objects()
Jesse Noller63b3a972009-01-21 02:15:48 +00001119 debug_info = self.manager._debug_info()
Benjamin Petersone711caf2008-06-11 16:44:04 +00001120 if refs != EXPECTED_NUMBER:
Georg Brandl3dbca812008-07-23 16:10:53 +00001121 print(self.manager._debug_info())
Jesse Noller63b3a972009-01-21 02:15:48 +00001122 print(debug_info)
Benjamin Petersone711caf2008-06-11 16:44:04 +00001123
1124 self.assertEqual(refs, EXPECTED_NUMBER)
1125
1126#
1127# Test of creating a customized manager class
1128#
1129
1130from multiprocessing.managers import BaseManager, BaseProxy, RemoteError
1131
1132class FooBar(object):
1133 def f(self):
1134 return 'f()'
1135 def g(self):
1136 raise ValueError
1137 def _h(self):
1138 return '_h()'
1139
1140def baz():
1141 for i in range(10):
1142 yield i*i
1143
1144class IteratorProxy(BaseProxy):
Florent Xiclunab4efb3d2010-08-14 18:24:40 +00001145 _exposed_ = ('__next__',)
Benjamin Petersone711caf2008-06-11 16:44:04 +00001146 def __iter__(self):
1147 return self
1148 def __next__(self):
Benjamin Petersone711caf2008-06-11 16:44:04 +00001149 return self._callmethod('__next__')
1150
1151class MyManager(BaseManager):
1152 pass
1153
1154MyManager.register('Foo', callable=FooBar)
1155MyManager.register('Bar', callable=FooBar, exposed=('f', '_h'))
1156MyManager.register('baz', callable=baz, proxytype=IteratorProxy)
1157
1158
1159class _TestMyManager(BaseTestCase):
1160
1161 ALLOWED_TYPES = ('manager',)
1162
1163 def test_mymanager(self):
1164 manager = MyManager()
1165 manager.start()
1166
1167 foo = manager.Foo()
1168 bar = manager.Bar()
1169 baz = manager.baz()
1170
1171 foo_methods = [name for name in ('f', 'g', '_h') if hasattr(foo, name)]
1172 bar_methods = [name for name in ('f', 'g', '_h') if hasattr(bar, name)]
1173
1174 self.assertEqual(foo_methods, ['f', 'g'])
1175 self.assertEqual(bar_methods, ['f', '_h'])
1176
1177 self.assertEqual(foo.f(), 'f()')
1178 self.assertRaises(ValueError, foo.g)
1179 self.assertEqual(foo._callmethod('f'), 'f()')
1180 self.assertRaises(RemoteError, foo._callmethod, '_h')
1181
1182 self.assertEqual(bar.f(), 'f()')
1183 self.assertEqual(bar._h(), '_h()')
1184 self.assertEqual(bar._callmethod('f'), 'f()')
1185 self.assertEqual(bar._callmethod('_h'), '_h()')
1186
1187 self.assertEqual(list(baz), [i*i for i in range(10)])
1188
1189 manager.shutdown()
1190
1191#
1192# Test of connecting to a remote server and using xmlrpclib for serialization
1193#
1194
1195_queue = pyqueue.Queue()
1196def get_queue():
1197 return _queue
1198
1199class QueueManager(BaseManager):
1200 '''manager class used by server process'''
1201QueueManager.register('get_queue', callable=get_queue)
1202
1203class QueueManager2(BaseManager):
1204 '''manager class which specifies the same interface as QueueManager'''
1205QueueManager2.register('get_queue')
1206
1207
1208SERIALIZER = 'xmlrpclib'
1209
1210class _TestRemoteManager(BaseTestCase):
1211
1212 ALLOWED_TYPES = ('manager',)
1213
Antoine Pitrou26899f42010-11-02 23:52:49 +00001214 @classmethod
1215 def _putter(cls, address, authkey):
Benjamin Petersone711caf2008-06-11 16:44:04 +00001216 manager = QueueManager2(
1217 address=address, authkey=authkey, serializer=SERIALIZER
1218 )
1219 manager.connect()
1220 queue = manager.get_queue()
1221 queue.put(('hello world', None, True, 2.25))
1222
1223 def test_remote(self):
1224 authkey = os.urandom(32)
1225
1226 manager = QueueManager(
1227 address=('localhost', 0), authkey=authkey, serializer=SERIALIZER
1228 )
1229 manager.start()
1230
1231 p = self.Process(target=self._putter, args=(manager.address, authkey))
1232 p.start()
1233
1234 manager2 = QueueManager2(
1235 address=manager.address, authkey=authkey, serializer=SERIALIZER
1236 )
1237 manager2.connect()
1238 queue = manager2.get_queue()
1239
1240 # Note that xmlrpclib will deserialize object as a list not a tuple
1241 self.assertEqual(queue.get(), ['hello world', None, True, 2.25])
1242
1243 # Because we are using xmlrpclib for serialization instead of
1244 # pickle this will cause a serialization error.
1245 self.assertRaises(Exception, queue.put, time.sleep)
1246
1247 # Make queue finalizer run before the server is stopped
1248 del queue
1249 manager.shutdown()
1250
Jesse Nollerc5d28a02009-03-30 16:37:36 +00001251class _TestManagerRestart(BaseTestCase):
1252
Antoine Pitrou26899f42010-11-02 23:52:49 +00001253 @classmethod
1254 def _putter(cls, address, authkey):
Jesse Nollerc5d28a02009-03-30 16:37:36 +00001255 manager = QueueManager(
1256 address=address, authkey=authkey, serializer=SERIALIZER)
1257 manager.connect()
1258 queue = manager.get_queue()
1259 queue.put('hello world')
1260
1261 def test_rapid_restart(self):
1262 authkey = os.urandom(32)
1263 manager = QueueManager(
Antoine Pitroua751c3f2010-04-30 23:23:38 +00001264 address=('localhost', 0), authkey=authkey, serializer=SERIALIZER)
Brian Curtin9e2fadc2010-11-01 05:12:34 +00001265 srvr = manager.get_server()
1266 addr = srvr.address
1267 # Close the connection.Listener socket which gets opened as a part
1268 # of manager.get_server(). It's not needed for the test.
1269 srvr.listener.close()
Jesse Nollerc5d28a02009-03-30 16:37:36 +00001270 manager.start()
1271
1272 p = self.Process(target=self._putter, args=(manager.address, authkey))
1273 p.start()
1274 queue = manager.get_queue()
1275 self.assertEqual(queue.get(), 'hello world')
Jesse Noller35d1f002009-03-30 22:59:27 +00001276 del queue
Jesse Nollerc5d28a02009-03-30 16:37:36 +00001277 manager.shutdown()
1278 manager = QueueManager(
Antoine Pitroua751c3f2010-04-30 23:23:38 +00001279 address=addr, authkey=authkey, serializer=SERIALIZER)
Jesse Nollerc5d28a02009-03-30 16:37:36 +00001280 manager.start()
Jesse Noller35d1f002009-03-30 22:59:27 +00001281 manager.shutdown()
Jesse Nollerc5d28a02009-03-30 16:37:36 +00001282
Benjamin Petersone711caf2008-06-11 16:44:04 +00001283#
1284#
1285#
1286
1287SENTINEL = latin('')
1288
1289class _TestConnection(BaseTestCase):
1290
1291 ALLOWED_TYPES = ('processes', 'threads')
1292
Antoine Pitrou26899f42010-11-02 23:52:49 +00001293 @classmethod
1294 def _echo(cls, conn):
Benjamin Petersone711caf2008-06-11 16:44:04 +00001295 for msg in iter(conn.recv_bytes, SENTINEL):
1296 conn.send_bytes(msg)
1297 conn.close()
1298
1299 def test_connection(self):
1300 conn, child_conn = self.Pipe()
1301
1302 p = self.Process(target=self._echo, args=(child_conn,))
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +00001303 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +00001304 p.start()
1305
1306 seq = [1, 2.25, None]
1307 msg = latin('hello world')
1308 longmsg = msg * 10
1309 arr = array.array('i', list(range(4)))
1310
1311 if self.TYPE == 'processes':
1312 self.assertEqual(type(conn.fileno()), int)
1313
1314 self.assertEqual(conn.send(seq), None)
1315 self.assertEqual(conn.recv(), seq)
1316
1317 self.assertEqual(conn.send_bytes(msg), None)
1318 self.assertEqual(conn.recv_bytes(), msg)
1319
1320 if self.TYPE == 'processes':
1321 buffer = array.array('i', [0]*10)
1322 expected = list(arr) + [0] * (10 - len(arr))
1323 self.assertEqual(conn.send_bytes(arr), None)
1324 self.assertEqual(conn.recv_bytes_into(buffer),
1325 len(arr) * buffer.itemsize)
1326 self.assertEqual(list(buffer), expected)
1327
1328 buffer = array.array('i', [0]*10)
1329 expected = [0] * 3 + list(arr) + [0] * (10 - 3 - len(arr))
1330 self.assertEqual(conn.send_bytes(arr), None)
1331 self.assertEqual(conn.recv_bytes_into(buffer, 3 * buffer.itemsize),
1332 len(arr) * buffer.itemsize)
1333 self.assertEqual(list(buffer), expected)
1334
1335 buffer = bytearray(latin(' ' * 40))
1336 self.assertEqual(conn.send_bytes(longmsg), None)
1337 try:
1338 res = conn.recv_bytes_into(buffer)
1339 except multiprocessing.BufferTooShort as e:
1340 self.assertEqual(e.args, (longmsg,))
1341 else:
1342 self.fail('expected BufferTooShort, got %s' % res)
1343
1344 poll = TimingWrapper(conn.poll)
1345
1346 self.assertEqual(poll(), False)
1347 self.assertTimingAlmostEqual(poll.elapsed, 0)
1348
1349 self.assertEqual(poll(TIMEOUT1), False)
1350 self.assertTimingAlmostEqual(poll.elapsed, TIMEOUT1)
1351
1352 conn.send(None)
1353
1354 self.assertEqual(poll(TIMEOUT1), True)
1355 self.assertTimingAlmostEqual(poll.elapsed, 0)
1356
1357 self.assertEqual(conn.recv(), None)
1358
1359 really_big_msg = latin('X') * (1024 * 1024 * 16) # 16Mb
1360 conn.send_bytes(really_big_msg)
1361 self.assertEqual(conn.recv_bytes(), really_big_msg)
1362
1363 conn.send_bytes(SENTINEL) # tell child to quit
1364 child_conn.close()
1365
1366 if self.TYPE == 'processes':
1367 self.assertEqual(conn.readable, True)
1368 self.assertEqual(conn.writable, True)
1369 self.assertRaises(EOFError, conn.recv)
1370 self.assertRaises(EOFError, conn.recv_bytes)
1371
1372 p.join()
1373
1374 def test_duplex_false(self):
1375 reader, writer = self.Pipe(duplex=False)
1376 self.assertEqual(writer.send(1), None)
1377 self.assertEqual(reader.recv(), 1)
1378 if self.TYPE == 'processes':
1379 self.assertEqual(reader.readable, True)
1380 self.assertEqual(reader.writable, False)
1381 self.assertEqual(writer.readable, False)
1382 self.assertEqual(writer.writable, True)
1383 self.assertRaises(IOError, reader.send, 2)
1384 self.assertRaises(IOError, writer.recv)
1385 self.assertRaises(IOError, writer.poll)
1386
1387 def test_spawn_close(self):
1388 # We test that a pipe connection can be closed by parent
1389 # process immediately after child is spawned. On Windows this
1390 # would have sometimes failed on old versions because
1391 # child_conn would be closed before the child got a chance to
1392 # duplicate it.
1393 conn, child_conn = self.Pipe()
1394
1395 p = self.Process(target=self._echo, args=(child_conn,))
1396 p.start()
1397 child_conn.close() # this might complete before child initializes
1398
1399 msg = latin('hello')
1400 conn.send_bytes(msg)
1401 self.assertEqual(conn.recv_bytes(), msg)
1402
1403 conn.send_bytes(SENTINEL)
1404 conn.close()
1405 p.join()
1406
1407 def test_sendbytes(self):
1408 if self.TYPE != 'processes':
1409 return
1410
1411 msg = latin('abcdefghijklmnopqrstuvwxyz')
1412 a, b = self.Pipe()
1413
1414 a.send_bytes(msg)
1415 self.assertEqual(b.recv_bytes(), msg)
1416
1417 a.send_bytes(msg, 5)
1418 self.assertEqual(b.recv_bytes(), msg[5:])
1419
1420 a.send_bytes(msg, 7, 8)
1421 self.assertEqual(b.recv_bytes(), msg[7:7+8])
1422
1423 a.send_bytes(msg, 26)
1424 self.assertEqual(b.recv_bytes(), latin(''))
1425
1426 a.send_bytes(msg, 26, 0)
1427 self.assertEqual(b.recv_bytes(), latin(''))
1428
1429 self.assertRaises(ValueError, a.send_bytes, msg, 27)
1430
1431 self.assertRaises(ValueError, a.send_bytes, msg, 22, 5)
1432
1433 self.assertRaises(ValueError, a.send_bytes, msg, 26, 1)
1434
1435 self.assertRaises(ValueError, a.send_bytes, msg, -1)
1436
1437 self.assertRaises(ValueError, a.send_bytes, msg, 4, -1)
1438
Benjamin Petersone711caf2008-06-11 16:44:04 +00001439class _TestListenerClient(BaseTestCase):
1440
1441 ALLOWED_TYPES = ('processes', 'threads')
1442
Antoine Pitrou26899f42010-11-02 23:52:49 +00001443 @classmethod
1444 def _test(cls, address):
1445 conn = cls.connection.Client(address)
Benjamin Petersone711caf2008-06-11 16:44:04 +00001446 conn.send('hello')
1447 conn.close()
1448
1449 def test_listener_client(self):
1450 for family in self.connection.families:
1451 l = self.connection.Listener(family=family)
1452 p = self.Process(target=self._test, args=(l.address,))
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +00001453 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +00001454 p.start()
1455 conn = l.accept()
1456 self.assertEqual(conn.recv(), 'hello')
1457 p.join()
1458 l.close()
Benjamin Petersone711caf2008-06-11 16:44:04 +00001459#
1460# Test of sending connection and socket objects between processes
1461#
Benjamin Petersonb29cbbc2008-06-16 20:57:14 +00001462"""
Benjamin Petersone711caf2008-06-11 16:44:04 +00001463class _TestPicklingConnections(BaseTestCase):
1464
1465 ALLOWED_TYPES = ('processes',)
1466
1467 def _listener(self, conn, families):
1468 for fam in families:
1469 l = self.connection.Listener(family=fam)
1470 conn.send(l.address)
1471 new_conn = l.accept()
1472 conn.send(new_conn)
1473
1474 if self.TYPE == 'processes':
1475 l = socket.socket()
1476 l.bind(('localhost', 0))
1477 conn.send(l.getsockname())
1478 l.listen(1)
1479 new_conn, addr = l.accept()
1480 conn.send(new_conn)
1481
1482 conn.recv()
1483
1484 def _remote(self, conn):
1485 for (address, msg) in iter(conn.recv, None):
1486 client = self.connection.Client(address)
1487 client.send(msg.upper())
1488 client.close()
1489
1490 if self.TYPE == 'processes':
1491 address, msg = conn.recv()
1492 client = socket.socket()
1493 client.connect(address)
1494 client.sendall(msg.upper())
1495 client.close()
1496
1497 conn.close()
1498
1499 def test_pickling(self):
1500 try:
1501 multiprocessing.allow_connection_pickling()
1502 except ImportError:
1503 return
1504
1505 families = self.connection.families
1506
1507 lconn, lconn0 = self.Pipe()
1508 lp = self.Process(target=self._listener, args=(lconn0, families))
1509 lp.start()
1510 lconn0.close()
1511
1512 rconn, rconn0 = self.Pipe()
1513 rp = self.Process(target=self._remote, args=(rconn0,))
1514 rp.start()
1515 rconn0.close()
1516
1517 for fam in families:
1518 msg = ('This connection uses family %s' % fam).encode('ascii')
1519 address = lconn.recv()
1520 rconn.send((address, msg))
1521 new_conn = lconn.recv()
1522 self.assertEqual(new_conn.recv(), msg.upper())
1523
1524 rconn.send(None)
1525
1526 if self.TYPE == 'processes':
1527 msg = latin('This connection uses a normal socket')
1528 address = lconn.recv()
1529 rconn.send((address, msg))
1530 if hasattr(socket, 'fromfd'):
1531 new_conn = lconn.recv()
1532 self.assertEqual(new_conn.recv(100), msg.upper())
1533 else:
1534 # XXX On Windows with Py2.6 need to backport fromfd()
1535 discard = lconn.recv_bytes()
1536
1537 lconn.send(None)
1538
1539 rconn.close()
1540 lconn.close()
1541
1542 lp.join()
1543 rp.join()
Benjamin Petersonb29cbbc2008-06-16 20:57:14 +00001544"""
Benjamin Petersone711caf2008-06-11 16:44:04 +00001545#
1546#
1547#
1548
1549class _TestHeap(BaseTestCase):
1550
1551 ALLOWED_TYPES = ('processes',)
1552
1553 def test_heap(self):
1554 iterations = 5000
1555 maxblocks = 50
1556 blocks = []
1557
1558 # create and destroy lots of blocks of different sizes
1559 for i in range(iterations):
1560 size = int(random.lognormvariate(0, 1) * 1000)
1561 b = multiprocessing.heap.BufferWrapper(size)
1562 blocks.append(b)
1563 if len(blocks) > maxblocks:
1564 i = random.randrange(maxblocks)
1565 del blocks[i]
1566
1567 # get the heap object
1568 heap = multiprocessing.heap.BufferWrapper._heap
1569
1570 # verify the state of the heap
1571 all = []
1572 occupied = 0
1573 for L in list(heap._len_to_seq.values()):
1574 for arena, start, stop in L:
1575 all.append((heap._arenas.index(arena), start, stop,
1576 stop-start, 'free'))
1577 for arena, start, stop in heap._allocated_blocks:
1578 all.append((heap._arenas.index(arena), start, stop,
1579 stop-start, 'occupied'))
1580 occupied += (stop-start)
1581
1582 all.sort()
1583
1584 for i in range(len(all)-1):
1585 (arena, start, stop) = all[i][:3]
1586 (narena, nstart, nstop) = all[i+1][:3]
1587 self.assertTrue((arena != narena and nstart == 0) or
1588 (stop == nstart))
1589
1590#
1591#
1592#
1593
Benjamin Petersone711caf2008-06-11 16:44:04 +00001594class _Foo(Structure):
1595 _fields_ = [
1596 ('x', c_int),
1597 ('y', c_double)
1598 ]
1599
1600class _TestSharedCTypes(BaseTestCase):
1601
1602 ALLOWED_TYPES = ('processes',)
1603
Antoine Pitrou72d5a9d2010-11-22 16:33:23 +00001604 def setUp(self):
1605 if not HAS_SHAREDCTYPES:
1606 self.skipTest("requires multiprocessing.sharedctypes")
1607
Antoine Pitrou26899f42010-11-02 23:52:49 +00001608 @classmethod
1609 def _double(cls, x, y, foo, arr, string):
Benjamin Petersone711caf2008-06-11 16:44:04 +00001610 x.value *= 2
1611 y.value *= 2
1612 foo.x *= 2
1613 foo.y *= 2
1614 string.value *= 2
1615 for i in range(len(arr)):
1616 arr[i] *= 2
1617
1618 def test_sharedctypes(self, lock=False):
Benjamin Petersone711caf2008-06-11 16:44:04 +00001619 x = Value('i', 7, lock=lock)
Brian Curtin918616c2010-10-07 02:12:17 +00001620 y = Value(c_double, 1.0/3.0, lock=lock)
Benjamin Petersone711caf2008-06-11 16:44:04 +00001621 foo = Value(_Foo, 3, 2, lock=lock)
Brian Curtin918616c2010-10-07 02:12:17 +00001622 arr = self.Array('d', list(range(10)), lock=lock)
1623 string = self.Array('c', 20, lock=lock)
1624 string.value = latin('hello')
Benjamin Petersone711caf2008-06-11 16:44:04 +00001625
1626 p = self.Process(target=self._double, args=(x, y, foo, arr, string))
1627 p.start()
1628 p.join()
1629
1630 self.assertEqual(x.value, 14)
1631 self.assertAlmostEqual(y.value, 2.0/3.0)
1632 self.assertEqual(foo.x, 6)
1633 self.assertAlmostEqual(foo.y, 4.0)
1634 for i in range(10):
1635 self.assertAlmostEqual(arr[i], i*2)
1636 self.assertEqual(string.value, latin('hellohello'))
1637
1638 def test_synchronize(self):
1639 self.test_sharedctypes(lock=True)
1640
1641 def test_copy(self):
Benjamin Petersone711caf2008-06-11 16:44:04 +00001642 foo = _Foo(2, 5.0)
Brian Curtin918616c2010-10-07 02:12:17 +00001643 bar = copy(foo)
Benjamin Petersone711caf2008-06-11 16:44:04 +00001644 foo.x = 0
1645 foo.y = 0
1646 self.assertEqual(bar.x, 2)
1647 self.assertAlmostEqual(bar.y, 5.0)
1648
1649#
1650#
1651#
1652
1653class _TestFinalize(BaseTestCase):
1654
1655 ALLOWED_TYPES = ('processes',)
1656
Antoine Pitrou26899f42010-11-02 23:52:49 +00001657 @classmethod
1658 def _test_finalize(cls, conn):
Benjamin Petersone711caf2008-06-11 16:44:04 +00001659 class Foo(object):
1660 pass
1661
1662 a = Foo()
1663 util.Finalize(a, conn.send, args=('a',))
1664 del a # triggers callback for a
1665
1666 b = Foo()
1667 close_b = util.Finalize(b, conn.send, args=('b',))
1668 close_b() # triggers callback for b
1669 close_b() # does nothing because callback has already been called
1670 del b # does nothing because callback has already been called
1671
1672 c = Foo()
1673 util.Finalize(c, conn.send, args=('c',))
1674
1675 d10 = Foo()
1676 util.Finalize(d10, conn.send, args=('d10',), exitpriority=1)
1677
1678 d01 = Foo()
1679 util.Finalize(d01, conn.send, args=('d01',), exitpriority=0)
1680 d02 = Foo()
1681 util.Finalize(d02, conn.send, args=('d02',), exitpriority=0)
1682 d03 = Foo()
1683 util.Finalize(d03, conn.send, args=('d03',), exitpriority=0)
1684
1685 util.Finalize(None, conn.send, args=('e',), exitpriority=-10)
1686
1687 util.Finalize(None, conn.send, args=('STOP',), exitpriority=-100)
1688
Ezio Melotti13925002011-03-16 11:05:33 +02001689 # call multiprocessing's cleanup function then exit process without
Benjamin Petersone711caf2008-06-11 16:44:04 +00001690 # garbage collecting locals
1691 util._exit_function()
1692 conn.close()
1693 os._exit(0)
1694
1695 def test_finalize(self):
1696 conn, child_conn = self.Pipe()
1697
1698 p = self.Process(target=self._test_finalize, args=(child_conn,))
1699 p.start()
1700 p.join()
1701
1702 result = [obj for obj in iter(conn.recv, 'STOP')]
1703 self.assertEqual(result, ['a', 'b', 'd10', 'd03', 'd02', 'd01', 'e'])
1704
1705#
1706# Test that from ... import * works for each module
1707#
1708
1709class _TestImportStar(BaseTestCase):
1710
1711 ALLOWED_TYPES = ('processes',)
1712
1713 def test_import(self):
Florent Xicluna9b0e9182010-03-28 11:42:38 +00001714 modules = [
Benjamin Petersone711caf2008-06-11 16:44:04 +00001715 'multiprocessing', 'multiprocessing.connection',
1716 'multiprocessing.heap', 'multiprocessing.managers',
1717 'multiprocessing.pool', 'multiprocessing.process',
Florent Xicluna9b0e9182010-03-28 11:42:38 +00001718 'multiprocessing.reduction',
Benjamin Petersone711caf2008-06-11 16:44:04 +00001719 'multiprocessing.synchronize', 'multiprocessing.util'
Florent Xicluna9b0e9182010-03-28 11:42:38 +00001720 ]
1721
1722 if c_int is not None:
1723 # This module requires _ctypes
1724 modules.append('multiprocessing.sharedctypes')
Benjamin Petersone711caf2008-06-11 16:44:04 +00001725
1726 for name in modules:
1727 __import__(name)
1728 mod = sys.modules[name]
1729
1730 for attr in getattr(mod, '__all__', ()):
1731 self.assertTrue(
1732 hasattr(mod, attr),
1733 '%r does not have attribute %r' % (mod, attr)
1734 )
1735
1736#
1737# Quick test that logging works -- does not test logging output
1738#
1739
1740class _TestLogging(BaseTestCase):
1741
1742 ALLOWED_TYPES = ('processes',)
1743
1744 def test_enable_logging(self):
1745 logger = multiprocessing.get_logger()
1746 logger.setLevel(util.SUBWARNING)
1747 self.assertTrue(logger is not None)
1748 logger.debug('this will not be printed')
1749 logger.info('nor will this')
1750 logger.setLevel(LOG_LEVEL)
1751
Antoine Pitrou26899f42010-11-02 23:52:49 +00001752 @classmethod
1753 def _test_level(cls, conn):
Benjamin Petersone711caf2008-06-11 16:44:04 +00001754 logger = multiprocessing.get_logger()
1755 conn.send(logger.getEffectiveLevel())
1756
1757 def test_level(self):
1758 LEVEL1 = 32
1759 LEVEL2 = 37
1760
1761 logger = multiprocessing.get_logger()
1762 root_logger = logging.getLogger()
1763 root_level = root_logger.level
1764
1765 reader, writer = multiprocessing.Pipe(duplex=False)
1766
1767 logger.setLevel(LEVEL1)
1768 self.Process(target=self._test_level, args=(writer,)).start()
1769 self.assertEqual(LEVEL1, reader.recv())
1770
1771 logger.setLevel(logging.NOTSET)
1772 root_logger.setLevel(LEVEL2)
1773 self.Process(target=self._test_level, args=(writer,)).start()
1774 self.assertEqual(LEVEL2, reader.recv())
1775
1776 root_logger.setLevel(root_level)
1777 logger.setLevel(level=LOG_LEVEL)
1778
1779#
Jesse Noller6214edd2009-01-19 16:23:53 +00001780# Test to verify handle verification, see issue 3321
1781#
1782
1783class TestInvalidHandle(unittest.TestCase):
1784
Florent Xicluna9b0e9182010-03-28 11:42:38 +00001785 @unittest.skipIf(WIN32, "skipped on Windows")
Jesse Noller6214edd2009-01-19 16:23:53 +00001786 def test_invalid_handles(self):
Jesse Noller6214edd2009-01-19 16:23:53 +00001787 conn = _multiprocessing.Connection(44977608)
1788 self.assertRaises(IOError, conn.poll)
1789 self.assertRaises(IOError, _multiprocessing.Connection, -1)
Florent Xicluna9b0e9182010-03-28 11:42:38 +00001790
Jesse Noller6214edd2009-01-19 16:23:53 +00001791#
Benjamin Petersone711caf2008-06-11 16:44:04 +00001792# Functions used to create test cases from the base ones in this module
1793#
1794
1795def get_attributes(Source, names):
1796 d = {}
1797 for name in names:
1798 obj = getattr(Source, name)
1799 if type(obj) == type(get_attributes):
1800 obj = staticmethod(obj)
1801 d[name] = obj
1802 return d
1803
1804def create_test_cases(Mixin, type):
1805 result = {}
1806 glob = globals()
Florent Xicluna9b0e9182010-03-28 11:42:38 +00001807 Type = type.capitalize()
Benjamin Petersone711caf2008-06-11 16:44:04 +00001808
1809 for name in list(glob.keys()):
1810 if name.startswith('_Test'):
1811 base = glob[name]
1812 if type in base.ALLOWED_TYPES:
1813 newname = 'With' + Type + name[1:]
1814 class Temp(base, unittest.TestCase, Mixin):
1815 pass
1816 result[newname] = Temp
1817 Temp.__name__ = newname
1818 Temp.__module__ = Mixin.__module__
1819 return result
1820
1821#
1822# Create test cases
1823#
1824
1825class ProcessesMixin(object):
1826 TYPE = 'processes'
1827 Process = multiprocessing.Process
1828 locals().update(get_attributes(multiprocessing, (
1829 'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
1830 'Condition', 'Event', 'Value', 'Array', 'RawValue',
1831 'RawArray', 'current_process', 'active_children', 'Pipe',
1832 'connection', 'JoinableQueue'
1833 )))
1834
1835testcases_processes = create_test_cases(ProcessesMixin, type='processes')
1836globals().update(testcases_processes)
1837
1838
1839class ManagerMixin(object):
1840 TYPE = 'manager'
1841 Process = multiprocessing.Process
1842 manager = object.__new__(multiprocessing.managers.SyncManager)
1843 locals().update(get_attributes(manager, (
1844 'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
1845 'Condition', 'Event', 'Value', 'Array', 'list', 'dict',
1846 'Namespace', 'JoinableQueue'
1847 )))
1848
1849testcases_manager = create_test_cases(ManagerMixin, type='manager')
1850globals().update(testcases_manager)
1851
1852
1853class ThreadsMixin(object):
1854 TYPE = 'threads'
1855 Process = multiprocessing.dummy.Process
1856 locals().update(get_attributes(multiprocessing.dummy, (
1857 'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
1858 'Condition', 'Event', 'Value', 'Array', 'current_process',
1859 'active_children', 'Pipe', 'connection', 'dict', 'list',
1860 'Namespace', 'JoinableQueue'
1861 )))
1862
1863testcases_threads = create_test_cases(ThreadsMixin, type='threads')
1864globals().update(testcases_threads)
1865
Neal Norwitz5d6415e2008-08-25 01:53:32 +00001866class OtherTest(unittest.TestCase):
1867 # TODO: add more tests for deliver/answer challenge.
1868 def test_deliver_challenge_auth_failure(self):
1869 class _FakeConnection(object):
1870 def recv_bytes(self, size):
Neal Norwitzec105ad2008-08-25 03:05:54 +00001871 return b'something bogus'
Neal Norwitz5d6415e2008-08-25 01:53:32 +00001872 def send_bytes(self, data):
1873 pass
1874 self.assertRaises(multiprocessing.AuthenticationError,
1875 multiprocessing.connection.deliver_challenge,
1876 _FakeConnection(), b'abc')
1877
1878 def test_answer_challenge_auth_failure(self):
1879 class _FakeConnection(object):
1880 def __init__(self):
1881 self.count = 0
1882 def recv_bytes(self, size):
1883 self.count += 1
1884 if self.count == 1:
1885 return multiprocessing.connection.CHALLENGE
1886 elif self.count == 2:
Neal Norwitzec105ad2008-08-25 03:05:54 +00001887 return b'something bogus'
1888 return b''
Neal Norwitz5d6415e2008-08-25 01:53:32 +00001889 def send_bytes(self, data):
1890 pass
1891 self.assertRaises(multiprocessing.AuthenticationError,
1892 multiprocessing.connection.answer_challenge,
1893 _FakeConnection(), b'abc')
1894
Benjamin Petersonf47ed4a2009-04-11 20:45:40 +00001895#
1896# Test Manager.start()/Pool.__init__() initializer feature - see issue 5585
1897#
1898
1899def initializer(ns):
1900 ns.test += 1
1901
1902class TestInitializers(unittest.TestCase):
1903 def setUp(self):
1904 self.mgr = multiprocessing.Manager()
1905 self.ns = self.mgr.Namespace()
1906 self.ns.test = 0
1907
1908 def tearDown(self):
1909 self.mgr.shutdown()
1910
1911 def test_manager_initializer(self):
1912 m = multiprocessing.managers.SyncManager()
1913 self.assertRaises(TypeError, m.start, 1)
1914 m.start(initializer, (self.ns,))
1915 self.assertEqual(self.ns.test, 1)
1916 m.shutdown()
1917
1918 def test_pool_initializer(self):
1919 self.assertRaises(TypeError, multiprocessing.Pool, initializer=1)
1920 p = multiprocessing.Pool(1, initializer, (self.ns,))
1921 p.close()
1922 p.join()
1923 self.assertEqual(self.ns.test, 1)
1924
R. David Murraya44c6b32009-07-29 15:40:30 +00001925#
1926# Issue 5155, 5313, 5331: Test process in processes
1927# Verifies os.close(sys.stdin.fileno) vs. sys.stdin.close() behavior
1928#
1929
1930def _ThisSubProcess(q):
1931 try:
1932 item = q.get(block=False)
1933 except pyqueue.Empty:
1934 pass
1935
1936def _TestProcess(q):
1937 queue = multiprocessing.Queue()
1938 subProc = multiprocessing.Process(target=_ThisSubProcess, args=(queue,))
1939 subProc.start()
1940 subProc.join()
1941
1942def _afunc(x):
1943 return x*x
1944
1945def pool_in_process():
1946 pool = multiprocessing.Pool(processes=4)
1947 x = pool.map(_afunc, [1, 2, 3, 4, 5, 6, 7])
1948
1949class _file_like(object):
1950 def __init__(self, delegate):
1951 self._delegate = delegate
1952 self._pid = None
1953
1954 @property
1955 def cache(self):
1956 pid = os.getpid()
1957 # There are no race conditions since fork keeps only the running thread
1958 if pid != self._pid:
1959 self._pid = pid
1960 self._cache = []
1961 return self._cache
1962
1963 def write(self, data):
1964 self.cache.append(data)
1965
1966 def flush(self):
1967 self._delegate.write(''.join(self.cache))
1968 self._cache = []
1969
1970class TestStdinBadfiledescriptor(unittest.TestCase):
1971
1972 def test_queue_in_process(self):
1973 queue = multiprocessing.Queue()
1974 proc = multiprocessing.Process(target=_TestProcess, args=(queue,))
1975 proc.start()
1976 proc.join()
1977
1978 def test_pool_in_process(self):
1979 p = multiprocessing.Process(target=pool_in_process)
1980 p.start()
1981 p.join()
1982
1983 def test_flushing(self):
1984 sio = io.StringIO()
1985 flike = _file_like(sio)
1986 flike.write('foo')
1987 proc = multiprocessing.Process(target=lambda: flike.flush())
1988 flike.flush()
1989 assert sio.getvalue() == 'foo'
1990
1991testcases_other = [OtherTest, TestInvalidHandle, TestInitializers,
1992 TestStdinBadfiledescriptor]
Neal Norwitz5d6415e2008-08-25 01:53:32 +00001993
Benjamin Petersone711caf2008-06-11 16:44:04 +00001994#
1995#
1996#
1997
1998def test_main(run=None):
Jesse Nollerd00df3c2008-06-18 14:22:48 +00001999 if sys.platform.startswith("linux"):
2000 try:
2001 lock = multiprocessing.RLock()
2002 except OSError:
Benjamin Petersone549ead2009-03-28 21:42:05 +00002003 raise unittest.SkipTest("OSError raises on RLock creation, see issue 3111!")
Benjamin Peterson3c0dd062008-06-17 22:43:48 +00002004
Benjamin Petersone711caf2008-06-11 16:44:04 +00002005 if run is None:
2006 from test.support import run_unittest as run
2007
2008 util.get_temp_dir() # creates temp directory for use by all processes
2009
2010 multiprocessing.get_logger().setLevel(LOG_LEVEL)
2011
Benjamin Peterson41181742008-07-02 20:22:54 +00002012 ProcessesMixin.pool = multiprocessing.Pool(4)
2013 ThreadsMixin.pool = multiprocessing.dummy.Pool(4)
2014 ManagerMixin.manager.__init__()
2015 ManagerMixin.manager.start()
2016 ManagerMixin.pool = ManagerMixin.manager.Pool(4)
Benjamin Petersone711caf2008-06-11 16:44:04 +00002017
2018 testcases = (
Benjamin Peterson41181742008-07-02 20:22:54 +00002019 sorted(testcases_processes.values(), key=lambda tc:tc.__name__) +
2020 sorted(testcases_threads.values(), key=lambda tc:tc.__name__) +
Neal Norwitz5d6415e2008-08-25 01:53:32 +00002021 sorted(testcases_manager.values(), key=lambda tc:tc.__name__) +
2022 testcases_other
Benjamin Petersone711caf2008-06-11 16:44:04 +00002023 )
2024
2025 loadTestsFromTestCase = unittest.defaultTestLoader.loadTestsFromTestCase
2026 suite = unittest.TestSuite(loadTestsFromTestCase(tc) for tc in testcases)
2027 run(suite)
2028
Benjamin Peterson41181742008-07-02 20:22:54 +00002029 ThreadsMixin.pool.terminate()
2030 ProcessesMixin.pool.terminate()
2031 ManagerMixin.pool.terminate()
2032 ManagerMixin.manager.shutdown()
Benjamin Petersone711caf2008-06-11 16:44:04 +00002033
Benjamin Peterson41181742008-07-02 20:22:54 +00002034 del ProcessesMixin.pool, ThreadsMixin.pool, ManagerMixin.pool
Benjamin Petersone711caf2008-06-11 16:44:04 +00002035
2036def main():
2037 test_main(unittest.TextTestRunner(verbosity=2).run)
2038
2039if __name__ == '__main__':
2040 main()