blob: f4031de61fdba08e7bf73e9ffde898145a0f5e93 [file] [log] [blame]
Georg Brandl86b2fb92008-07-16 03:43:04 +00001#!/usr/bin/env python
2
Benjamin Petersone711caf2008-06-11 16:44:04 +00003#
4# Unit tests for the multiprocessing package
5#
6
7import unittest
8import threading
9import queue as pyqueue
10import time
R. David Murraya44c6b32009-07-29 15:40:30 +000011import io
Benjamin Petersone711caf2008-06-11 16:44:04 +000012import sys
13import os
14import gc
15import signal
16import array
Benjamin Petersone711caf2008-06-11 16:44:04 +000017import socket
18import random
19import logging
R. David Murraya21e4ca2009-03-31 23:16:50 +000020import test.support
Benjamin Petersone711caf2008-06-11 16:44:04 +000021
Benjamin Petersone5384b02008-10-04 22:00:42 +000022
R. David Murraya21e4ca2009-03-31 23:16:50 +000023# Skip tests if _multiprocessing wasn't built.
24_multiprocessing = test.support.import_module('_multiprocessing')
25# Skip tests if sem_open implementation is broken.
26test.support.import_module('multiprocessing.synchronize')
Benjamin Petersone5384b02008-10-04 22:00:42 +000027
Benjamin Petersone711caf2008-06-11 16:44:04 +000028import multiprocessing.dummy
29import multiprocessing.connection
30import multiprocessing.managers
31import multiprocessing.heap
Benjamin Petersone711caf2008-06-11 16:44:04 +000032import multiprocessing.pool
Benjamin Petersone711caf2008-06-11 16:44:04 +000033
34from multiprocessing import util
35
Brian Curtin918616c2010-10-07 02:12:17 +000036try:
37 from multiprocessing.sharedctypes import Value, copy
38 HAS_SHAREDCTYPES = True
39except ImportError:
40 HAS_SHAREDCTYPES = False
41
Benjamin Petersone711caf2008-06-11 16:44:04 +000042#
43#
44#
45
def latin(s):
    """Return the text *s* encoded to bytes with the 'latin' codec."""
    encoded = s.encode('latin')
    return encoded
Benjamin Petersone711caf2008-06-11 16:44:04 +000048
Benjamin Petersone711caf2008-06-11 16:44:04 +000049#
50# Constants
51#
52
# Logging level used by the tests; swap in the commented line for less noise.
LOG_LEVEL = util.SUBWARNING
#LOG_LEVEL = logging.WARNING

# Short pause (in seconds) used to let other processes/threads make progress.
DELTA = 0.1

# Setting CHECK_TIMINGS to True makes the tests take a lot longer and can
# sometimes cause non-serious failures, because some calls block a bit
# longer than expected.
CHECK_TIMINGS = False

TIMEOUT1, TIMEOUT2, TIMEOUT3 = (
    (0.82, 0.35, 1.4) if CHECK_TIMINGS else (0.1, 0.1, 0.1)
)

# True unless the extension module reports a broken semaphore get_value().
HAVE_GETVALUE = not getattr(
    _multiprocessing, 'HAVE_BROKEN_SEM_GETVALUE', False
)

WIN32 = (sys.platform == "win32")
70
Benjamin Petersone711caf2008-06-11 16:44:04 +000071#
Florent Xicluna9b0e9182010-03-28 11:42:38 +000072# Some tests require ctypes
73#
74
75try:
Florent Xiclunab4efb3d2010-08-14 18:24:40 +000076 from ctypes import Structure, c_int, c_double
Florent Xicluna9b0e9182010-03-28 11:42:38 +000077except ImportError:
78 Structure = object
79 c_int = c_double = None
80
81#
Benjamin Petersone711caf2008-06-11 16:44:04 +000082# Creates a wrapper for a function which records the time it takes to finish
83#
84
class TimingWrapper(object):
    """Callable wrapper that records how long each call to *func* takes.

    After a call, ``elapsed`` holds the duration in seconds of the most
    recent invocation (it is ``None`` until the first call).  The duration
    is recorded even when *func* raises.
    """

    def __init__(self, func):
        self.func = func
        self.elapsed = None

    def __call__(self, *args, **kwds):
        """Call the wrapped function, returning its result unchanged."""
        # Prefer a monotonic clock so that system clock adjustments cannot
        # produce negative or inflated durations; fall back to time.time()
        # on interpreters that lack time.monotonic().
        clock = getattr(time, 'monotonic', time.time)
        started = clock()
        try:
            return self.func(*args, **kwds)
        finally:
            self.elapsed = clock() - started
97
98#
99# Base class for test cases
100#
101
class BaseTestCase(object):
    """Mixin holding assertion helpers shared by the test cases below."""

    ALLOWED_TYPES = ('processes', 'manager', 'threads')

    def assertTimingAlmostEqual(self, a, b):
        # Durations are only compared (to one decimal place) when timing
        # checks have been switched on via CHECK_TIMINGS.
        if not CHECK_TIMINGS:
            return
        self.assertAlmostEqual(a, b, 1)

    def assertReturnsIfImplemented(self, value, func, *args):
        # Assert that func(*args) == value, but quietly do nothing when
        # the call reports NotImplementedError.
        try:
            outcome = func(*args)
        except NotImplementedError:
            return None
        return self.assertEqual(value, outcome)

    # For the sanity of Windows users, refuse to pickle a test case
    # rather than crashing or freezing in multiple ways.
    def __reduce__(self, *args):
        raise NotImplementedError("shouldn't try to pickle a test case")

    __reduce_ex__ = __reduce__
124
Benjamin Petersone711caf2008-06-11 16:44:04 +0000125#
126# Return the value of a semaphore
127#
128
def get_value(self):
    """Return the current value of the semaphore-like object *self*.

    The object's own get_value() method is tried first, then the private
    attributes ``_Semaphore__value`` and ``_value``.  NotImplementedError
    is raised when none of them is available.
    """
    try:
        return self.get_value()
    except AttributeError:
        pass
    for attr_name in ('_Semaphore__value', '_value'):
        try:
            return getattr(self, attr_name)
        except AttributeError:
            continue
    raise NotImplementedError
140
141#
142# Testcases
143#
144
class _TestProcess(BaseTestCase):
    """Tests of Process objects: attributes, life cycle, terminate, nesting.

    The concrete test class is expected to supply TYPE, Process,
    current_process, active_children, Queue and Pipe.
    """

    ALLOWED_TYPES = ('processes', 'threads')

    def test_current(self):
        # The attributes checked here are only examined for real processes.
        if self.TYPE == 'threads':
            return

        current = self.current_process()
        authkey = current.authkey

        self.assertTrue(current.is_alive())
        self.assertFalse(current.daemon)
        self.assertTrue(isinstance(authkey, bytes))
        self.assertGreater(len(authkey), 0)
        self.assertEqual(current.ident, os.getpid())
        self.assertEqual(current.exitcode, None)

    @classmethod
    def _test(cls, q, *args, **kwds):
        # Runs in the child: report what it was started with via the queue.
        current = cls.current_process()
        q.put(args)
        q.put(kwds)
        q.put(current.name)
        if cls.TYPE != 'threads':
            q.put(bytes(current.authkey))
            q.put(current.pid)

    def test_process(self):
        q = self.Queue(1)
        args = (q, 1, 2)
        kwargs = {'hello': 23, 'bye': 2.54}
        name = 'SomeProcess'
        p = self.Process(
            target=self._test, args=args, kwargs=kwargs, name=name
            )
        p.daemon = True
        current = self.current_process()

        # State before start().
        if self.TYPE != 'threads':
            self.assertEqual(p.authkey, current.authkey)
        self.assertEqual(p.is_alive(), False)
        self.assertEqual(p.daemon, True)
        self.assertNotIn(p, self.active_children())
        self.assertIs(type(self.active_children()), list)
        self.assertEqual(p.exitcode, None)

        p.start()

        # State while running.
        self.assertEqual(p.exitcode, None)
        self.assertEqual(p.is_alive(), True)
        self.assertIn(p, self.active_children())

        # Values are read in the order in which _test() queued them.
        self.assertEqual(q.get(), args[1:])
        self.assertEqual(q.get(), kwargs)
        self.assertEqual(q.get(), p.name)
        if self.TYPE != 'threads':
            self.assertEqual(q.get(), current.authkey)
            self.assertEqual(q.get(), p.pid)

        p.join()

        # State after the child has finished.
        self.assertEqual(p.exitcode, 0)
        self.assertEqual(p.is_alive(), False)
        self.assertNotIn(p, self.active_children())

    @classmethod
    def _test_terminate(cls):
        # Sleep long enough that the parent must terminate() the child.
        time.sleep(1000)

    def test_terminate(self):
        if self.TYPE == 'threads':
            return

        p = self.Process(target=self._test_terminate)
        p.daemon = True
        p.start()

        self.assertEqual(p.is_alive(), True)
        self.assertIn(p, self.active_children())
        self.assertEqual(p.exitcode, None)

        p.terminate()

        join = TimingWrapper(p.join)
        self.assertEqual(join(), None)
        self.assertTimingAlmostEqual(join.elapsed, 0.0)

        self.assertEqual(p.is_alive(), False)
        self.assertNotIn(p, self.active_children())

        # Joining an already-joined process must be harmless.
        p.join()

        # XXX sometimes get p.exitcode == 0 on Windows ...
        #self.assertEqual(p.exitcode, -signal.SIGTERM)

    def test_cpu_count(self):
        try:
            cpus = multiprocessing.cpu_count()
        except NotImplementedError:
            cpus = 1
        self.assertIs(type(cpus), int)
        self.assertGreaterEqual(cpus, 1)

    def test_active_children(self):
        self.assertEqual(type(self.active_children()), list)

        p = self.Process(target=time.sleep, args=(DELTA,))
        self.assertNotIn(p, self.active_children())

        p.start()
        self.assertIn(p, self.active_children())

        p.join()
        self.assertNotIn(p, self.active_children())

    @classmethod
    def _test_recursion(cls, wconn, id):
        # Send our own id, then (down to a depth of two) start and wait
        # for two children in turn, each of which does the same.
        # The unused import of multiprocessing.forking that used to sit
        # here has been removed: nothing referenced it.
        wconn.send(id)
        if len(id) < 2:
            for i in range(2):
                p = cls.Process(
                    target=cls._test_recursion, args=(wconn, id+[i])
                    )
                p.start()
                p.join()

    def test_recursion(self):
        rconn, wconn = self.Pipe(duplex=False)
        # The top level of the recursion runs in this process.
        self._test_recursion(wconn, [])

        time.sleep(DELTA)
        result = []
        while rconn.poll():
            result.append(rconn.recv())

        expected = [
            [],
              [0],
                [0, 0],
                [0, 1],
              [1],
                [1, 0],
                [1, 1]
            ]
        self.assertEqual(result, expected)
293
294#
295#
296#
297
class _UpperCaser(multiprocessing.Process):
    """Process subclass that upper-cases strings received over a pipe."""

    def __init__(self):
        super().__init__()
        ends = multiprocessing.Pipe()
        self.child_conn = ends[0]
        self.parent_conn = ends[1]

    def run(self):
        # The child only uses its own end of the pipe.
        self.parent_conn.close()
        # Serve requests until the None sentinel arrives.
        requests = iter(self.child_conn.recv, None)
        for text in requests:
            self.child_conn.send(text.upper())
        self.child_conn.close()

    def submit(self, s):
        # Send one string to the child and wait for its reply.
        assert type(s) is str
        self.parent_conn.send(s)
        reply = self.parent_conn.recv()
        return reply

    def stop(self):
        # Tell the child to finish, then close both ends on this side.
        self.parent_conn.send(None)
        for conn in (self.parent_conn, self.child_conn):
            conn.close()
319
class _TestSubclassingProcess(BaseTestCase):
    """Check that a user-defined Process subclass can be started and used."""

    ALLOWED_TYPES = ('processes',)

    def test_subclassing(self):
        worker = _UpperCaser()
        worker.start()
        for text, shouted in (('hello', 'HELLO'), ('world', 'WORLD')):
            self.assertEqual(worker.submit(text), shouted)
        worker.stop()
        worker.join()
331
332#
333#
334#
335
def queue_empty(q):
    """Report whether *q* is empty.

    Uses q.empty() when the queue has that method, otherwise compares
    q.qsize() with zero.
    """
    if not hasattr(q, 'empty'):
        return q.qsize() == 0
    return q.empty()
341
def queue_full(q, maxsize):
    """Report whether *q* is full.

    Uses q.full() when the queue has that method, otherwise compares
    q.qsize() with *maxsize*.
    """
    if not hasattr(q, 'full'):
        return q.qsize() == maxsize
    return q.full()
347
348
class _TestQueue(BaseTestCase):
    """Tests of Queue and JoinableQueue put/get/qsize/task_done behaviour.

    The concrete test class is expected to supply Queue, JoinableQueue,
    Event and Process.
    """

    @classmethod
    def _test_put(cls, queue, child_can_start, parent_can_continue):
        # Child side of test_put: once released, drain the six items the
        # parent queued and then let the parent carry on.
        child_can_start.wait()
        for i in range(6):
            queue.get()
        parent_can_continue.set()

    def test_put(self):
        MAXSIZE = 6
        queue = self.Queue(maxsize=MAXSIZE)
        child_can_start = self.Event()
        parent_can_continue = self.Event()

        proc = self.Process(
            target=self._test_put,
            args=(queue, child_can_start, parent_can_continue)
            )
        proc.daemon = True
        proc.start()

        self.assertEqual(queue_empty(queue), True)
        self.assertEqual(queue_full(queue, MAXSIZE), False)

        # Fill the queue using every supported calling convention.
        queue.put(1)
        queue.put(2, True)
        queue.put(3, True, None)
        queue.put(4, False)
        queue.put(5, False, None)
        queue.put_nowait(6)

        # the values may be in buffer but not yet in pipe so sleep a bit
        time.sleep(DELTA)

        self.assertEqual(queue_empty(queue), False)
        self.assertEqual(queue_full(queue, MAXSIZE), True)

        put = TimingWrapper(queue.put)
        put_nowait = TimingWrapper(queue.put_nowait)

        # Non-blocking puts on a full queue fail immediately; blocking
        # puts fail once their timeout has passed.
        self.assertRaises(pyqueue.Full, put, 7, False)
        self.assertTimingAlmostEqual(put.elapsed, 0)

        self.assertRaises(pyqueue.Full, put, 7, False, None)
        self.assertTimingAlmostEqual(put.elapsed, 0)

        self.assertRaises(pyqueue.Full, put_nowait, 7)
        self.assertTimingAlmostEqual(put_nowait.elapsed, 0)

        self.assertRaises(pyqueue.Full, put, 7, True, TIMEOUT1)
        self.assertTimingAlmostEqual(put.elapsed, TIMEOUT1)

        self.assertRaises(pyqueue.Full, put, 7, False, TIMEOUT2)
        self.assertTimingAlmostEqual(put.elapsed, 0)

        self.assertRaises(pyqueue.Full, put, 7, True, timeout=TIMEOUT3)
        self.assertTimingAlmostEqual(put.elapsed, TIMEOUT3)

        child_can_start.set()
        parent_can_continue.wait()

        self.assertEqual(queue_empty(queue), True)
        self.assertEqual(queue_full(queue, MAXSIZE), False)

        proc.join()

    @classmethod
    def _test_get(cls, queue, child_can_start, parent_can_continue):
        # Child side of test_get: once released, queue the values the
        # parent expects to read back.
        child_can_start.wait()
        #queue.put(1)
        queue.put(2)
        queue.put(3)
        queue.put(4)
        queue.put(5)
        parent_can_continue.set()

    def test_get(self):
        queue = self.Queue()
        child_can_start = self.Event()
        parent_can_continue = self.Event()

        proc = self.Process(
            target=self._test_get,
            args=(queue, child_can_start, parent_can_continue)
            )
        proc.daemon = True
        proc.start()

        self.assertEqual(queue_empty(queue), True)

        child_can_start.set()
        parent_can_continue.wait()

        time.sleep(DELTA)
        self.assertEqual(queue_empty(queue), False)

        # Hangs unexpectedly, remove for now
        #self.assertEqual(queue.get(), 1)
        self.assertEqual(queue.get(True, None), 2)
        self.assertEqual(queue.get(True), 3)
        self.assertEqual(queue.get(timeout=1), 4)
        self.assertEqual(queue.get_nowait(), 5)

        self.assertEqual(queue_empty(queue), True)

        get = TimingWrapper(queue.get)
        get_nowait = TimingWrapper(queue.get_nowait)

        # Non-blocking gets on an empty queue fail immediately; blocking
        # gets fail once their timeout has passed.
        self.assertRaises(pyqueue.Empty, get, False)
        self.assertTimingAlmostEqual(get.elapsed, 0)

        self.assertRaises(pyqueue.Empty, get, False, None)
        self.assertTimingAlmostEqual(get.elapsed, 0)

        self.assertRaises(pyqueue.Empty, get_nowait)
        self.assertTimingAlmostEqual(get_nowait.elapsed, 0)

        self.assertRaises(pyqueue.Empty, get, True, TIMEOUT1)
        self.assertTimingAlmostEqual(get.elapsed, TIMEOUT1)

        self.assertRaises(pyqueue.Empty, get, False, TIMEOUT2)
        self.assertTimingAlmostEqual(get.elapsed, 0)

        self.assertRaises(pyqueue.Empty, get, timeout=TIMEOUT3)
        self.assertTimingAlmostEqual(get.elapsed, TIMEOUT3)

        proc.join()

    @classmethod
    def _test_fork(cls, queue):
        for i in range(10, 20):
            queue.put(i)
        # note that at this point the items may only be buffered, so the
        # process cannot shutdown until the feeder thread has finished
        # pushing items onto the pipe.

    def test_fork(self):
        # Old versions of Queue would fail to create a new feeder
        # thread for a forked process if the original process had its
        # own feeder thread.  This test checks that this no longer
        # happens.

        queue = self.Queue()

        # put items on queue so that main process starts a feeder thread
        for i in range(10):
            queue.put(i)

        # wait to make sure thread starts before we fork a new process
        time.sleep(DELTA)

        # fork process; daemonic so that a failed assertion below cannot
        # leave a child that holds up interpreter exit
        p = self.Process(target=self._test_fork, args=(queue,))
        p.daemon = True
        p.start()

        # check that all expected items are in the queue
        for i in range(20):
            self.assertEqual(queue.get(), i)
        self.assertRaises(pyqueue.Empty, queue.get, False)

        p.join()

    def test_qsize(self):
        q = self.Queue()
        try:
            self.assertEqual(q.qsize(), 0)
        except NotImplementedError:
            # qsize() is not available for this queue; nothing to check.
            return
        q.put(1)
        self.assertEqual(q.qsize(), 1)
        q.put(5)
        self.assertEqual(q.qsize(), 2)
        q.get()
        self.assertEqual(q.qsize(), 1)
        q.get()
        self.assertEqual(q.qsize(), 0)

    @classmethod
    def _test_task_done(cls, q):
        # Worker: acknowledge every item until the None sentinel arrives.
        for obj in iter(q.get, None):
            time.sleep(DELTA)
            q.task_done()

    def test_task_done(self):
        queue = self.JoinableQueue()

        # An unreachable guard for interpreters older than 2.5 used to sit
        # here; this file already requires Python 3, so it was dropped.

        workers = [self.Process(target=self._test_task_done, args=(queue,))
                   for i in range(4)]

        for p in workers:
            # Daemonic so that workers still blocked in get() after a
            # failure cannot hold up interpreter exit.
            p.daemon = True
            p.start()

        for i in range(10):
            queue.put(i)

        queue.join()

        # One sentinel per worker tells each of them to stop.
        for _ in workers:
            queue.put(None)

        for p in workers:
            p.join()
556
557#
558#
559#
560
class _TestLock(BaseTestCase):
    """Tests of Lock and RLock acquire/release behaviour."""

    def test_lock(self):
        lock = self.Lock()
        self.assertEqual(lock.acquire(), True)
        # A second, non-blocking acquire of a held lock must fail.
        self.assertEqual(lock.acquire(False), False)
        self.assertEqual(lock.release(), None)
        # Releasing a lock that is not held is an error.
        with self.assertRaises((ValueError, threading.ThreadError)):
            lock.release()

    def test_rlock(self):
        lock = self.RLock()
        depth = 3
        for _ in range(depth):
            self.assertEqual(lock.acquire(), True)
        for _ in range(depth):
            self.assertEqual(lock.release(), None)
        # One release more than the number of acquires is an error.
        with self.assertRaises((AssertionError, RuntimeError)):
            lock.release()

    def test_lock_context(self):
        lock = self.Lock()
        with lock:
            pass
583
Benjamin Petersone711caf2008-06-11 16:44:04 +0000584
class _TestSemaphore(BaseTestCase):
    """Tests of Semaphore and BoundedSemaphore counting and timeouts."""

    def _test_semaphore(self, sem):
        # Walk a semaphore that starts at 2 down to 0 and back up to 2,
        # checking the reported value (where available) at each step.
        def expect(count):
            self.assertReturnsIfImplemented(count, get_value, sem)

        expect(2)
        self.assertEqual(sem.acquire(), True)
        expect(1)
        self.assertEqual(sem.acquire(), True)
        expect(0)
        self.assertEqual(sem.acquire(False), False)
        expect(0)
        self.assertEqual(sem.release(), None)
        expect(1)
        self.assertEqual(sem.release(), None)
        expect(2)

    def test_semaphore(self):
        sem = self.Semaphore(2)
        self._test_semaphore(sem)
        # An unbounded semaphore may be released past its initial value.
        for expected in (3, 4):
            self.assertEqual(sem.release(), None)
            self.assertReturnsIfImplemented(expected, get_value, sem)

    def test_bounded_semaphore(self):
        sem = self.BoundedSemaphore(2)
        self._test_semaphore(sem)
        # Currently fails on OS/X
        #if HAVE_GETVALUE:
        #    self.assertRaises(ValueError, sem.release)
        #    self.assertReturnsIfImplemented(2, get_value, sem)

    def test_timeout(self):
        if self.TYPE != 'processes':
            return

        sem = self.Semaphore(0)
        acquire = TimingWrapper(sem.acquire)

        # (positional args, keyword args, expected elapsed time)
        cases = [
            ((False,), {}, 0.0),
            ((False, None), {}, 0.0),
            ((False, TIMEOUT1), {}, 0),
            ((True, TIMEOUT2), {}, TIMEOUT2),
            ((), {'timeout': TIMEOUT3}, TIMEOUT3),
        ]
        for args, kwds, expected_elapsed in cases:
            self.assertEqual(acquire(*args, **kwds), False)
            self.assertTimingAlmostEqual(acquire.elapsed, expected_elapsed)
637
638
class _TestCondition(BaseTestCase):
    """Tests of Condition wait/notify/notify_all with processes and threads.

    Every child started by a test is joined before the test returns, so
    no daemon process or thread is left behind once the test has passed.
    """

    @classmethod
    def f(cls, cond, sleeping, woken, timeout=None):
        # Sleeper: announce that we are about to wait, wait on the
        # condition (optionally with a timeout), then announce waking.
        cond.acquire()
        sleeping.release()
        cond.wait(timeout)
        woken.release()
        cond.release()

    def check_invariant(self, cond):
        # this is only supposed to succeed when there are no sleepers
        if self.TYPE == 'processes':
            try:
                sleepers = (cond._sleeping_count.get_value() -
                            cond._woken_count.get_value())
                self.assertEqual(sleepers, 0)
                self.assertEqual(cond._wait_semaphore.get_value(), 0)
            except NotImplementedError:
                pass

    def test_notify(self):
        cond = self.Condition()
        sleeping = self.Semaphore(0)
        woken = self.Semaphore(0)

        # Keep a handle on both children; previously the first one was
        # overwritten by the second and so was never joined.
        children = []

        p = self.Process(target=self.f, args=(cond, sleeping, woken))
        p.daemon = True
        p.start()
        children.append(p)

        t = threading.Thread(target=self.f, args=(cond, sleeping, woken))
        t.daemon = True
        t.start()
        children.append(t)

        # wait for both children to start sleeping
        sleeping.acquire()
        sleeping.acquire()

        # check no process/thread has woken up
        time.sleep(DELTA)
        self.assertReturnsIfImplemented(0, get_value, woken)

        # wake up one process/thread
        cond.acquire()
        cond.notify()
        cond.release()

        # check one process/thread has woken up
        time.sleep(DELTA)
        self.assertReturnsIfImplemented(1, get_value, woken)

        # wake up another
        cond.acquire()
        cond.notify()
        cond.release()

        # check other has woken up
        time.sleep(DELTA)
        self.assertReturnsIfImplemented(2, get_value, woken)

        # check state is not mucked up
        self.check_invariant(cond)

        # both sleepers have been notified, so joining cannot block forever
        for child in children:
            child.join()

    def test_notify_all(self):
        cond = self.Condition()
        sleeping = self.Semaphore(0)
        woken = self.Semaphore(0)

        # every process/thread started below, joined at the end
        children = []

        # start some threads/processes which will timeout
        for i in range(3):
            p = self.Process(target=self.f,
                             args=(cond, sleeping, woken, TIMEOUT1))
            p.daemon = True
            p.start()
            children.append(p)

            t = threading.Thread(target=self.f,
                                 args=(cond, sleeping, woken, TIMEOUT1))
            t.daemon = True
            t.start()
            children.append(t)

        # wait for them all to sleep
        for i in range(6):
            sleeping.acquire()

        # check they have all timed out
        for i in range(6):
            woken.acquire()
        self.assertReturnsIfImplemented(0, get_value, woken)

        # check state is not mucked up
        self.check_invariant(cond)

        # start some more threads/processes
        for i in range(3):
            p = self.Process(target=self.f, args=(cond, sleeping, woken))
            p.daemon = True
            p.start()
            children.append(p)

            t = threading.Thread(target=self.f, args=(cond, sleeping, woken))
            t.daemon = True
            t.start()
            children.append(t)

        # wait for them to all sleep
        for i in range(6):
            sleeping.acquire()

        # check no process/thread has woken up
        time.sleep(DELTA)
        self.assertReturnsIfImplemented(0, get_value, woken)

        # wake them all up
        cond.acquire()
        cond.notify_all()
        cond.release()

        # check they have all woken
        time.sleep(DELTA)
        self.assertReturnsIfImplemented(6, get_value, woken)

        # check state is not mucked up
        self.check_invariant(cond)

        # the first batch timed out and the second was notified, so all
        # children are on their way out and can be joined
        for child in children:
            child.join()

    def test_timeout(self):
        cond = self.Condition()
        wait = TimingWrapper(cond.wait)
        cond.acquire()
        res = wait(TIMEOUT1)
        cond.release()
        self.assertEqual(res, None)
        self.assertTimingAlmostEqual(wait.elapsed, TIMEOUT1)
770
771
class _TestEvent(BaseTestCase):
    """Tests of Event set/clear/is_set/wait, including a wake-up by a child."""

    @classmethod
    def _test_event(cls, event):
        # Child: set the event after a short delay.
        time.sleep(TIMEOUT2)
        event.set()

    def test_event(self):
        event = self.Event()
        wait = TimingWrapper(event.wait)

        # A new event starts out unset.
        self.assertEqual(event.is_set(), False)

        # wait() is expected to return the state of the flag (False on
        # timeout, True once set) rather than None.
        self.assertEqual(wait(0.0), False)
        self.assertTimingAlmostEqual(wait.elapsed, 0.0)
        self.assertEqual(wait(TIMEOUT1), False)
        self.assertTimingAlmostEqual(wait.elapsed, TIMEOUT1)

        event.set()

        # Once set, wait() returns True immediately, with or without a
        # timeout.
        self.assertEqual(event.is_set(), True)
        self.assertEqual(wait(), True)
        self.assertTimingAlmostEqual(wait.elapsed, 0.0)
        self.assertEqual(wait(TIMEOUT1), True)
        self.assertTimingAlmostEqual(wait.elapsed, 0.0)
        # self.assertEqual(event.is_set(), True)

        event.clear()

        #self.assertEqual(event.is_set(), False)

        # Keep a handle on the child so that it can be joined; it is
        # daemonic so that a failure here cannot hold up interpreter exit.
        child = self.Process(target=self._test_event, args=(event,))
        child.daemon = True
        child.start()
        self.assertEqual(wait(), True)
        child.join()
Benjamin Petersone711caf2008-06-11 16:44:04 +0000810
811#
812#
813#
814
class _TestValue(BaseTestCase):
    """Tests of the shared Value and RawValue objects."""

    ALLOWED_TYPES = ('processes',)

    # (typecode, initial value, value written by the child)
    codes_values = [
        ('i', 4343, 24234),
        ('d', 3.625, -4.25),
        ('h', -232, 234),
        ('c', latin('x'), latin('y'))
        ]

    def setUp(self):
        if HAS_SHAREDCTYPES:
            return
        self.skipTest("requires multiprocessing.sharedctypes")

    @classmethod
    def _test(cls, values):
        # Child: overwrite each shared value with its replacement.
        for shared, (_, _, replacement) in zip(values, cls.codes_values):
            shared.value = replacement

    def test_value(self, raw=False):
        factory = self.RawValue if raw else self.Value
        values = [factory(code, initial)
                  for code, initial, _ in self.codes_values]

        for shared, spec in zip(values, self.codes_values):
            self.assertEqual(shared.value, spec[1])

        child = self.Process(target=self._test, args=(values,))
        child.start()
        child.join()

        # The child's writes must be visible in this process.
        for shared, spec in zip(values, self.codes_values):
            self.assertEqual(shared.value, spec[2])

    def test_rawvalue(self):
        self.test_value(raw=True)

    def test_getobj_getlock(self):
        # With the default lock, or lock=None, both accessors must work.
        for extra in ({}, {'lock': None}):
            wrapped = self.Value('i', 5, **extra)
            wrapped.get_lock()
            wrapped.get_obj()

        # An explicitly supplied lock is the one handed back.
        lock = self.Lock()
        locked = self.Value('i', 5, lock=lock)
        returned_lock = locked.get_lock()
        locked.get_obj()
        self.assertEqual(lock, returned_lock)

        # lock=False gives an object without the accessors.
        unlocked = self.Value('i', 5, lock=False)
        for accessor in ('get_lock', 'get_obj'):
            self.assertFalse(hasattr(unlocked, accessor))

        with self.assertRaises(AttributeError):
            self.Value('i', 5, lock='navalue')

        # Raw values have no accessors either.
        raw = self.RawValue('i', 5)
        for accessor in ('get_lock', 'get_obj'):
            self.assertFalse(hasattr(raw, accessor))
881
Benjamin Petersone711caf2008-06-11 16:44:04 +0000882
class _TestArray(BaseTestCase):
    """Tests of the shared Array and RawArray objects."""

    ALLOWED_TYPES = ('processes',)

    @classmethod
    def f(cls, seq):
        # Turn seq, in place, into its running sum.
        for index in range(1, len(seq)):
            seq[index] += seq[index - 1]

    @unittest.skipIf(c_int is None, "requires _ctypes")
    def test_array(self, raw=False):
        seq = [680, 626, 934, 821, 150, 233, 548, 982, 714, 831]
        factory = self.RawArray if raw else self.Array
        arr = factory('i', seq)

        self.assertEqual(len(arr), len(seq))
        self.assertEqual(arr[3], seq[3])
        self.assertEqual(list(arr[2:7]), list(seq[2:7]))

        # Slice assignment must behave the same for both sequences.
        patch = array.array('i', [1, 2, 3, 4])
        arr[4:8] = patch
        seq[4:8] = patch

        self.assertEqual(list(arr[:]), seq)

        # Apply f() to the list here and to the shared array in a child;
        # the two must end up equal.
        self.f(seq)

        child = self.Process(target=self.f, args=(arr,))
        child.start()
        child.join()

        self.assertEqual(list(arr[:]), seq)

    @unittest.skipIf(c_int is None, "requires _ctypes")
    def test_rawarray(self):
        self.test_array(raw=True)

    @unittest.skipIf(c_int is None, "requires _ctypes")
    def test_getobj_getlock_obj(self):
        # With the default lock, or lock=None, both accessors must work.
        for extra in ({}, {'lock': None}):
            wrapped = self.Array('i', list(range(10)), **extra)
            wrapped.get_lock()
            wrapped.get_obj()

        # An explicitly supplied lock is the one handed back.
        lock = self.Lock()
        locked = self.Array('i', list(range(10)), lock=lock)
        returned_lock = locked.get_lock()
        locked.get_obj()
        self.assertEqual(lock, returned_lock)

        # lock=False gives an object without the accessors.
        unlocked = self.Array('i', range(10), lock=False)
        for accessor in ('get_lock', 'get_obj'):
            self.assertFalse(hasattr(unlocked, accessor))
        with self.assertRaises(AttributeError):
            self.Array('i', range(10), lock='notalock')

        # Raw arrays have no accessors either.
        raw = self.RawArray('i', range(10))
        for accessor in ('get_lock', 'get_obj'):
            self.assertFalse(hasattr(raw, accessor))
Benjamin Petersone711caf2008-06-11 16:44:04 +0000945
946#
947#
948#
949
class _TestContainers(BaseTestCase):
    """Tests of the manager-provided list, dict and Namespace objects."""

    ALLOWED_TYPES = ('manager',)

    def test_list(self):
        digits = list(range(10))
        a = self.list(list(range(10)))
        self.assertEqual(a[:], digits)

        b = self.list()
        self.assertEqual(b[:], [])

        b.extend(list(range(5)))
        self.assertEqual(b[:], list(range(5)))

        self.assertEqual(b[2], 2)
        self.assertEqual(b[2:10], [2, 3, 4])

        b *= 2
        doubled = [0, 1, 2, 3, 4, 0, 1, 2, 3, 4]
        self.assertEqual(b[:], doubled)

        self.assertEqual(b + [5, 6], doubled + [5, 6])

        # None of the above may have affected the first list.
        self.assertEqual(a[:], digits)

        # A shared list built from other shared lists.
        nested = self.list([a, b])
        self.assertEqual(nested[:], [digits, doubled])

        f = self.list([a])
        a.append('hello')
        self.assertEqual(f[:], [[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 'hello']])

    def test_dict(self):
        d = self.dict()
        indices = list(range(65, 70))
        for code in indices:
            d[code] = chr(code)
        letters = [chr(code) for code in indices]
        self.assertEqual(d.copy(), dict(zip(indices, letters)))
        self.assertEqual(sorted(d.keys()), indices)
        self.assertEqual(sorted(d.values()), letters)
        self.assertEqual(sorted(d.items()), list(zip(indices, letters)))

    def test_namespace(self):
        n = self.Namespace()
        n.name = 'Bob'
        n.job = 'Builder'
        n._hidden = 'hidden'
        self.assertEqual((n.name, n.job), ('Bob', 'Builder'))
        del n.job
        # The expected string lists only the remaining public attribute.
        self.assertEqual(str(n), "Namespace(name='Bob')")
        self.assertTrue(hasattr(n, 'name'))
        self.assertFalse(hasattr(n, 'job'))
1005
1006#
1007#
1008#
1009
def sqr(x, wait=0.0):
    """Sleep for *wait* seconds, then return *x* multiplied by itself."""
    time.sleep(wait)
    squared = x * x
    return squared
class _TestPool(BaseTestCase):
    """Tests for the worker pool available as ``self.pool``."""

    def test_apply(self):
        """apply() must give the same answer as a direct call."""
        papply = self.pool.apply
        self.assertEqual(papply(sqr, (5,)), sqr(5))
        self.assertEqual(papply(sqr, (), {'x': 3}), sqr(x=3))

    def test_map(self):
        """map() must match the builtin map(), with and without chunksize."""
        pmap = self.pool.map
        self.assertEqual(pmap(sqr, list(range(10))),
                         list(map(sqr, list(range(10)))))
        self.assertEqual(pmap(sqr, list(range(100)), chunksize=20),
                         list(map(sqr, list(range(100)))))

    def test_map_chunksize(self):
        """map_async() over an empty list with a chunksize must not stall."""
        try:
            self.pool.map_async(sqr, [], chunksize=1).get(timeout=TIMEOUT1)
        except multiprocessing.TimeoutError:
            self.fail("pool.map_async with chunksize stalled on null list")

    def test_async(self):
        """get() on an async result waits until the worker has finished."""
        res = self.pool.apply_async(sqr, (7, TIMEOUT1,))
        get = TimingWrapper(res.get)
        self.assertEqual(get(), 49)
        self.assertTimingAlmostEqual(get.elapsed, TIMEOUT1)

    def test_async_timeout(self):
        """get(timeout=...) raises TimeoutError when the worker is slower."""
        res = self.pool.apply_async(sqr, (6, TIMEOUT2 + 0.2))
        get = TimingWrapper(res.get)
        self.assertRaises(multiprocessing.TimeoutError, get, timeout=TIMEOUT2)
        self.assertTimingAlmostEqual(get.elapsed, TIMEOUT2)

    def test_imap(self):
        """imap() yields results in input order and then stops."""
        it = self.pool.imap(sqr, list(range(10)))
        self.assertEqual(list(it), list(map(sqr, list(range(10)))))

        it = self.pool.imap(sqr, list(range(10)))
        for i in range(10):
            self.assertEqual(next(it), i*i)
        self.assertRaises(StopIteration, it.__next__)

        it = self.pool.imap(sqr, list(range(1000)), chunksize=100)
        for i in range(1000):
            self.assertEqual(next(it), i*i)
        self.assertRaises(StopIteration, it.__next__)

    def test_imap_unordered(self):
        """imap_unordered() yields every result; order is not checked."""
        it = self.pool.imap_unordered(sqr, list(range(1000)))
        self.assertEqual(sorted(it), list(map(sqr, list(range(1000)))))

        it = self.pool.imap_unordered(sqr, list(range(1000)), chunksize=53)
        self.assertEqual(sorted(it), list(map(sqr, list(range(1000)))))

    def test_make_pool(self):
        """Pool(3) must start exactly three workers."""
        p = multiprocessing.Pool(3)
        try:
            self.assertEqual(3, len(p._pool))
        finally:
            # Release the workers even when the assertion fails.
            p.close()
            p.join()

    def test_terminate(self):
        """terminate() must let join() return quickly with work pending."""
        if self.TYPE == 'manager':
            # On Unix a forked process increfs each shared object to
            # which its parent process held a reference.  If the
            # forked process gets terminated then there is likely to
            # be a reference leak.  So to prevent
            # _TestZZZNumberOfObjects from failing we skip this test
            # when using a manager.
            self.skipTest("terminating workers of a manager pool is "
                          "likely to leak shared-object references")

        # Queue far more work than can finish before terminate() is called.
        result = self.pool.map_async(
            time.sleep, [0.1 for i in range(10000)], chunksize=1
            )
        self.pool.terminate()
        join = TimingWrapper(self.pool.join)
        join()
        self.assertLess(join.elapsed, 0.2)
Benjamin Petersone711caf2008-06-11 16:44:04 +00001088#
1089# Test that manager has expected number of shared objects left
1090#
1091
class _TestZZZNumberOfObjects(BaseTestCase):
    # Because test cases are sorted alphabetically, this one will get
    # run after all the other tests for the manager.  It tests that
    # there have been no "reference leaks" for the manager's shared
    # objects.  Note the comment in _TestPool.test_terminate().
    ALLOWED_TYPES = ('manager',)

    def test_number_of_objects(self):
        """The manager must hold exactly EXPECTED_NUMBER shared objects."""
        EXPECTED_NUMBER = 1                # the pool object is still alive
        multiprocessing.active_children()  # discard dead process objs
        gc.collect()                       # do garbage collection
        refs = self.manager._number_of_objects()
        # Capture the debug info right after the count so that both
        # describe the same state; print it once, only on a mismatch.
        debug_info = self.manager._debug_info()
        if refs != EXPECTED_NUMBER:
            print(debug_info)

        self.assertEqual(refs, EXPECTED_NUMBER)
1110
1111#
1112# Test of creating a customized manager class
1113#
1114
1115from multiprocessing.managers import BaseManager, BaseProxy, RemoteError
1116
class FooBar(object):
    """Plain class registered with MyManager under the names Foo and Bar."""

    def f(self):
        """Return the string 'f()'."""
        return 'f()'

    def g(self):
        """Always raise ValueError."""
        raise ValueError()

    def _h(self):
        """Return the string '_h()'."""
        return '_h()'
1124
def baz():
    """Generate the squares of 0 through 9 in ascending order."""
    number = 0
    while number < 10:
        yield number * number
        number += 1
1128
class IteratorProxy(BaseProxy):
    """Proxy that behaves as an iterator over its referent."""

    # Only __next__ is forwarded to the referent.
    _exposed_ = ('__next__',)

    def __iter__(self):
        """Return the proxy itself, as iterators do."""
        return self

    def __next__(self):
        """Fetch the next item by calling __next__ on the referent."""
        next_item = self._callmethod('__next__')
        return next_item
1135
# Customized manager used by _TestMyManager; everything it offers is
# added through the register() calls below.
class MyManager(BaseManager):
    pass

# 'Foo' relies on the default set of exposed methods.
MyManager.register('Foo', callable=FooBar)
# 'Bar' names its exposed methods explicitly, including the underscore one.
MyManager.register('Bar', callable=FooBar, exposed=('f', '_h'))
# 'baz' hands back the generator through IteratorProxy.
MyManager.register('baz', callable=baz, proxytype=IteratorProxy)
1142
1143
class _TestMyManager(BaseTestCase):
    """Checks which methods the proxies created by MyManager expose."""

    ALLOWED_TYPES = ('manager',)

    def test_mymanager(self):
        """Start a MyManager and exercise its Foo, Bar and baz proxies."""
        manager = MyManager()
        manager.start()

        foo = manager.Foo()
        bar = manager.Bar()
        baz = manager.baz()

        candidates = ('f', 'g', '_h')
        foo_methods = [name for name in candidates if hasattr(foo, name)]
        bar_methods = [name for name in candidates if hasattr(bar, name)]

        self.assertEqual(foo_methods, ['f', 'g'])
        self.assertEqual(bar_methods, ['f', '_h'])

        # Foo: f works, g raises ValueError, _h is refused through
        # _callmethod.
        self.assertEqual(foo.f(), 'f()')
        with self.assertRaises(ValueError):
            foo.g()
        self.assertEqual(foo._callmethod('f'), 'f()')
        with self.assertRaises(RemoteError):
            foo._callmethod('_h')

        # Bar: both exposed methods work directly and through _callmethod.
        self.assertEqual(bar.f(), 'f()')
        self.assertEqual(bar._h(), '_h()')
        self.assertEqual(bar._callmethod('f'), 'f()')
        self.assertEqual(bar._callmethod('_h'), '_h()')

        expected_squares = [i*i for i in range(10)]
        self.assertEqual(list(baz), expected_squares)

        manager.shutdown()
1175
1176#
1177# Test of connecting to a remote server and using xmlrpclib for serialization
1178#
1179
# Single module-level queue handed out by get_queue().
_queue = pyqueue.Queue()
def get_queue():
    # Always returns the same module-level queue object.
    return _queue

class QueueManager(BaseManager):
    '''manager class used by server process'''
# Registered with a callable, so this manager can create the referent.
QueueManager.register('get_queue', callable=get_queue)

class QueueManager2(BaseManager):
    '''manager class which specifies the same interface as QueueManager'''
# Registered without a callable: only the name is declared here.
QueueManager2.register('get_queue')


# Serializer name passed to the managers built in the tests below.
SERIALIZER = 'xmlrpclib'
1194
class _TestRemoteManager(BaseTestCase):
    """Talk to a running QueueManager server through QueueManager2 clients."""

    ALLOWED_TYPES = ('manager',)

    @classmethod
    def _putter(cls, address, authkey):
        """Child entry point: connect to the server and put one tuple."""
        manager = QueueManager2(
            address=address, authkey=authkey, serializer=SERIALIZER
            )
        manager.connect()
        queue = manager.get_queue()
        queue.put(('hello world', None, True, 2.25))

    def test_remote(self):
        """A message put by a child process is readable from a second client."""
        authkey = os.urandom(32)

        manager = QueueManager(
            address=('localhost', 0), authkey=authkey, serializer=SERIALIZER
            )
        manager.start()
        try:
            p = self.Process(target=self._putter,
                             args=(manager.address, authkey))
            p.start()

            manager2 = QueueManager2(
                address=manager.address, authkey=authkey,
                serializer=SERIALIZER
                )
            manager2.connect()
            queue = manager2.get_queue()

            # Note that xmlrpclib will deserialize object as a list not a
            # tuple
            self.assertEqual(queue.get(), ['hello world', None, True, 2.25])

            # Because we are using xmlrpclib for serialization instead of
            # pickle this will cause a serialization error.
            self.assertRaises(Exception, queue.put, time.sleep)

            # The child's message has been received, so the child has
            # nothing left to do: reap it instead of leaving it behind.
            p.join()

            # Make queue finalizer run before the server is stopped
            del queue
        finally:
            # Stop the server process even when an assertion above failed.
            manager.shutdown()
1235
class _TestManagerRestart(BaseTestCase):
    """A manager must be restartable on the address it has just released."""

    @classmethod
    def _putter(cls, address, authkey):
        """Child entry point: connect to the server and put one string."""
        manager = QueueManager(
            address=address, authkey=authkey, serializer=SERIALIZER)
        manager.connect()
        queue = manager.get_queue()
        queue.put('hello world')

    def test_rapid_restart(self):
        """Shut a manager down and immediately start another on its address."""
        authkey = os.urandom(32)
        manager = QueueManager(
            address=('localhost', 0), authkey=authkey, serializer=SERIALIZER)
        srvr = manager.get_server()
        addr = srvr.address
        # Close the connection.Listener socket which gets opened as a part
        # of manager.get_server(). It's not needed for the test.
        srvr.listener.close()
        manager.start()
        try:
            p = self.Process(target=self._putter,
                             args=(manager.address, authkey))
            p.start()
            # Wait for the child so that it is neither left unreaped nor
            # still connected when the server is shut down below.
            p.join()
            queue = manager.get_queue()
            self.assertEqual(queue.get(), 'hello world')
            del queue
        finally:
            # Stop the first server even when the assertion failed.
            manager.shutdown()

        # Second manager: reuse the address recorded above.
        manager = QueueManager(
            address=addr, authkey=authkey, serializer=SERIALIZER)
        manager.start()
        manager.shutdown()
Jesse Nollerc5d28a02009-03-30 16:37:36 +00001267
Benjamin Petersone711caf2008-06-11 16:44:04 +00001268#
1269#
1270#
1271
# Empty byte string; _TestConnection._echo() stops when it receives it.
SENTINEL = latin('')
1273
class _TestConnection(BaseTestCase):
    """Tests for the connection objects returned by ``self.Pipe()``."""

    ALLOWED_TYPES = ('processes', 'threads')

    @classmethod
    def _echo(cls, conn):
        """Send back every byte message received until SENTINEL arrives."""
        while True:
            received = conn.recv_bytes()
            if received == SENTINEL:
                break
            conn.send_bytes(received)
        conn.close()

    def test_connection(self):
        """Round-trip objects, bytes and buffers through an echoing child."""
        conn, child_conn = self.Pipe()

        p = self.Process(target=self._echo, args=(child_conn,))
        p.daemon = True
        p.start()

        seq = [1, 2.25, None]
        msg = latin('hello world')
        longmsg = msg * 10
        arr = array.array('i', list(range(4)))
        is_processes = self.TYPE == 'processes'

        if is_processes:
            self.assertEqual(type(conn.fileno()), int)

        self.assertIsNone(conn.send(seq))
        self.assertEqual(conn.recv(), seq)

        self.assertIsNone(conn.send_bytes(msg))
        self.assertEqual(conn.recv_bytes(), msg)

        if is_processes:
            payload_size = len(arr) * array.array('i').itemsize

            # Receive into the start of a zeroed buffer.
            buffer = array.array('i', [0]*10)
            expected = list(arr) + [0] * (10 - len(arr))
            self.assertIsNone(conn.send_bytes(arr))
            self.assertEqual(conn.recv_bytes_into(buffer), payload_size)
            self.assertEqual(list(buffer), expected)

            # Receive at an offset of three items.
            buffer = array.array('i', [0]*10)
            expected = [0] * 3 + list(arr) + [0] * (10 - 3 - len(arr))
            self.assertIsNone(conn.send_bytes(arr))
            self.assertEqual(
                conn.recv_bytes_into(buffer, 3 * buffer.itemsize),
                payload_size)
            self.assertEqual(list(buffer), expected)

            # A buffer shorter than the message: BufferTooShort must
            # carry the whole message in its args.
            buffer = bytearray(latin(' ' * 40))
            self.assertIsNone(conn.send_bytes(longmsg))
            try:
                res = conn.recv_bytes_into(buffer)
            except multiprocessing.BufferTooShort as e:
                self.assertEqual(e.args, (longmsg,))
            else:
                self.fail('expected BufferTooShort, got %s' % res)

        poll = TimingWrapper(conn.poll)

        # Nothing pending: poll() answers False at once ...
        self.assertEqual(poll(), False)
        self.assertTimingAlmostEqual(poll.elapsed, 0)

        # ... and after the full timeout when one is given.
        self.assertEqual(poll(TIMEOUT1), False)
        self.assertTimingAlmostEqual(poll.elapsed, TIMEOUT1)

        conn.send(None)

        # With a message on its way poll() answers True without waiting
        # for the timeout.
        self.assertEqual(poll(TIMEOUT1), True)
        self.assertTimingAlmostEqual(poll.elapsed, 0)

        self.assertEqual(conn.recv(), None)

        really_big_msg = latin('X') * (1024 * 1024 * 16)   # 16Mb
        conn.send_bytes(really_big_msg)
        self.assertEqual(conn.recv_bytes(), really_big_msg)

        conn.send_bytes(SENTINEL)                          # tell child to quit
        child_conn.close()

        if is_processes:
            self.assertEqual(conn.readable, True)
            self.assertEqual(conn.writable, True)
            self.assertRaises(EOFError, conn.recv)
            self.assertRaises(EOFError, conn.recv_bytes)

        p.join()

    def test_duplex_false(self):
        """A Pipe(duplex=False) gives a read-only and a write-only end."""
        reader, writer = self.Pipe(duplex=False)
        self.assertIsNone(writer.send(1))
        self.assertEqual(reader.recv(), 1)
        if self.TYPE != 'processes':
            return
        self.assertEqual(reader.readable, True)
        self.assertEqual(reader.writable, False)
        self.assertEqual(writer.readable, False)
        self.assertEqual(writer.writable, True)
        # Using an end in the wrong direction must fail with IOError.
        self.assertRaises(IOError, reader.send, 2)
        self.assertRaises(IOError, writer.recv)
        self.assertRaises(IOError, writer.poll)

    def test_spawn_close(self):
        # We test that a pipe connection can be closed by parent
        # process immediately after child is spawned.  On Windows this
        # would have sometimes failed on old versions because
        # child_conn would be closed before the child got a chance to
        # duplicate it.
        conn, child_conn = self.Pipe()

        p = self.Process(target=self._echo, args=(child_conn,))
        p.start()
        child_conn.close()    # this might complete before child initializes

        greeting = latin('hello')
        conn.send_bytes(greeting)
        self.assertEqual(conn.recv_bytes(), greeting)

        conn.send_bytes(SENTINEL)
        conn.close()
        p.join()

    def test_sendbytes(self):
        """send_bytes() with the optional offset and size arguments."""
        if self.TYPE != 'processes':
            return

        msg = latin('abcdefghijklmnopqrstuvwxyz')
        a, b = self.Pipe()

        # (extra positional arguments, bytes expected at the other end)
        accepted = [
            ((), msg),
            ((5,), msg[5:]),
            ((7, 8), msg[7:7+8]),
            ((26,), latin('')),
            ((26, 0), latin('')),
            ]
        for extra, expected in accepted:
            a.send_bytes(msg, *extra)
            self.assertEqual(b.recv_bytes(), expected)

        # Offset/size combinations that must be refused.
        rejected = [(27,), (22, 5), (26, 1), (-1,), (4, -1)]
        for extra in rejected:
            self.assertRaises(ValueError, a.send_bytes, msg, *extra)
1423
class _TestListenerClient(BaseTestCase):
    """Connect a Client to a Listener for every available address family."""

    ALLOWED_TYPES = ('processes', 'threads')

    @classmethod
    def _test(cls, address):
        """Child entry point: connect to *address* and send 'hello'."""
        client = cls.connection.Client(address)
        client.send('hello')
        client.close()

    def test_listener_client(self):
        """Each family must deliver the child's greeting to the listener."""
        connection = self.connection
        for family in connection.families:
            listener = connection.Listener(family=family)
            child = self.Process(target=self._test, args=(listener.address,))
            child.daemon = True
            child.start()
            accepted = listener.accept()
            self.assertEqual(accepted.recv(), 'hello')
            child.join()
            listener.close()
Benjamin Petersone711caf2008-06-11 16:44:04 +00001444#
1445# Test of sending connection and socket objects between processes
1446#
Benjamin Petersonb29cbbc2008-06-16 20:57:14 +00001447"""
Benjamin Petersone711caf2008-06-11 16:44:04 +00001448class _TestPicklingConnections(BaseTestCase):
1449
1450 ALLOWED_TYPES = ('processes',)
1451
1452 def _listener(self, conn, families):
1453 for fam in families:
1454 l = self.connection.Listener(family=fam)
1455 conn.send(l.address)
1456 new_conn = l.accept()
1457 conn.send(new_conn)
1458
1459 if self.TYPE == 'processes':
1460 l = socket.socket()
1461 l.bind(('localhost', 0))
1462 conn.send(l.getsockname())
1463 l.listen(1)
1464 new_conn, addr = l.accept()
1465 conn.send(new_conn)
1466
1467 conn.recv()
1468
1469 def _remote(self, conn):
1470 for (address, msg) in iter(conn.recv, None):
1471 client = self.connection.Client(address)
1472 client.send(msg.upper())
1473 client.close()
1474
1475 if self.TYPE == 'processes':
1476 address, msg = conn.recv()
1477 client = socket.socket()
1478 client.connect(address)
1479 client.sendall(msg.upper())
1480 client.close()
1481
1482 conn.close()
1483
1484 def test_pickling(self):
1485 try:
1486 multiprocessing.allow_connection_pickling()
1487 except ImportError:
1488 return
1489
1490 families = self.connection.families
1491
1492 lconn, lconn0 = self.Pipe()
1493 lp = self.Process(target=self._listener, args=(lconn0, families))
1494 lp.start()
1495 lconn0.close()
1496
1497 rconn, rconn0 = self.Pipe()
1498 rp = self.Process(target=self._remote, args=(rconn0,))
1499 rp.start()
1500 rconn0.close()
1501
1502 for fam in families:
1503 msg = ('This connection uses family %s' % fam).encode('ascii')
1504 address = lconn.recv()
1505 rconn.send((address, msg))
1506 new_conn = lconn.recv()
1507 self.assertEqual(new_conn.recv(), msg.upper())
1508
1509 rconn.send(None)
1510
1511 if self.TYPE == 'processes':
1512 msg = latin('This connection uses a normal socket')
1513 address = lconn.recv()
1514 rconn.send((address, msg))
1515 if hasattr(socket, 'fromfd'):
1516 new_conn = lconn.recv()
1517 self.assertEqual(new_conn.recv(100), msg.upper())
1518 else:
1519 # XXX On Windows with Py2.6 need to backport fromfd()
1520 discard = lconn.recv_bytes()
1521
1522 lconn.send(None)
1523
1524 rconn.close()
1525 lconn.close()
1526
1527 lp.join()
1528 rp.join()
Benjamin Petersonb29cbbc2008-06-16 20:57:14 +00001529"""
Benjamin Petersone711caf2008-06-11 16:44:04 +00001530#
1531#
1532#
1533
class _TestHeap(BaseTestCase):
    """Consistency check for the heap behind BufferWrapper."""

    ALLOWED_TYPES = ('processes',)

    def test_heap(self):
        """After heavy churn, free and occupied blocks must tile each arena."""
        iterations = 5000
        maxblocks = 50
        blocks = []

        # create and destroy lots of blocks of different sizes
        for _ in range(iterations):
            size = int(random.lognormvariate(0, 1) * 1000)
            blocks.append(multiprocessing.heap.BufferWrapper(size))
            if len(blocks) > maxblocks:
                victim = random.randrange(maxblocks)
                del blocks[victim]

        # get the heap object
        heap = multiprocessing.heap.BufferWrapper._heap

        # verify the state of the heap
        records = []
        occupied = 0
        for free_blocks in list(heap._len_to_seq.values()):
            for arena, start, stop in free_blocks:
                records.append((heap._arenas.index(arena), start, stop,
                                stop-start, 'free'))
        for arena, start, stop in heap._allocated_blocks:
            records.append((heap._arenas.index(arena), start, stop,
                            stop-start, 'occupied'))
            occupied += (stop-start)

        records.sort()

        # Each block must either end where the next one begins, or be
        # followed by a block at offset 0 of a different arena.
        for current, following in zip(records, records[1:]):
            arena, start, stop = current[:3]
            narena, nstart, nstop = following[:3]
            self.assertTrue((arena != narena and nstart == 0) or
                            (stop == nstart))
1574
1575#
1576#
1577#
1578
# Two-field structure used by the _TestSharedCTypes tests.
class _Foo(Structure):
    _fields_ = [
        ('x', c_int),      # integer field
        ('y', c_double)    # floating-point field
        ]
1584
class _TestSharedCTypes(BaseTestCase):
    """Shared ctypes objects must show changes made by a child process."""

    ALLOWED_TYPES = ('processes',)

    def setUp(self):
        """Skip every test when sharedctypes could not be imported."""
        if HAS_SHAREDCTYPES:
            return
        self.skipTest("requires multiprocessing.sharedctypes")

    @classmethod
    def _double(cls, x, y, foo, arr, string):
        """Child entry point: double each argument in place."""
        for shared in (x, y):
            shared.value *= 2
        foo.x *= 2
        foo.y *= 2
        string.value *= 2
        for index in range(len(arr)):
            arr[index] *= 2

    def test_sharedctypes(self, lock=False):
        """Double shared values in a child and check them in the parent."""
        x = Value('i', 7, lock=lock)
        y = Value(c_double, 1.0/3.0, lock=lock)
        foo = Value(_Foo, 3, 2, lock=lock)
        arr = self.Array('d', list(range(10)), lock=lock)
        string = self.Array('c', 20, lock=lock)
        string.value = latin('hello')

        child = self.Process(target=self._double,
                             args=(x, y, foo, arr, string))
        child.start()
        child.join()

        self.assertEqual(x.value, 14)
        self.assertAlmostEqual(y.value, 2.0/3.0)
        self.assertEqual(foo.x, 6)
        self.assertAlmostEqual(foo.y, 4.0)
        for index in range(10):
            self.assertAlmostEqual(arr[index], index*2)
        self.assertEqual(string.value, latin('hellohello'))

    def test_synchronize(self):
        """Run test_sharedctypes() again with lock=True."""
        self.test_sharedctypes(lock=True)

    def test_copy(self):
        """copy() must return an object unaffected by later changes."""
        original = _Foo(2, 5.0)
        duplicate = copy(original)
        original.x = 0
        original.y = 0
        self.assertEqual(duplicate.x, 2)
        self.assertAlmostEqual(duplicate.y, 5.0)
1633
1634#
1635#
1636#
1637
class _TestFinalize(BaseTestCase):
    # Checks when and in which order util.Finalize callbacks fire.  Each
    # callback sends a tag over a pipe; test_finalize() compares the
    # sequence of tags received.

    ALLOWED_TYPES = ('processes',)

    @classmethod
    def _test_finalize(cls, conn):
        # Runs in the child process.  Statement order and the lifetime of
        # the local names matter here, so do not reorder or merge lines.
        class Foo(object):
            pass

        a = Foo()
        util.Finalize(a, conn.send, args=('a',))
        del a           # triggers callback for a

        b = Foo()
        close_b = util.Finalize(b, conn.send, args=('b',))
        close_b()       # triggers callback for b
        close_b()       # does nothing because callback has already been called
        del b           # does nothing because callback has already been called

        # No exitpriority is given for c; 'c' is absent from the result
        # expected by test_finalize().
        c = Foo()
        util.Finalize(c, conn.send, args=('c',))

        d10 = Foo()
        util.Finalize(d10, conn.send, args=('d10',), exitpriority=1)

        # Three finalizers sharing exitpriority 0; test_finalize() expects
        # them in reverse order of registration.
        d01 = Foo()
        util.Finalize(d01, conn.send, args=('d01',), exitpriority=0)
        d02 = Foo()
        util.Finalize(d02, conn.send, args=('d02',), exitpriority=0)
        d03 = Foo()
        util.Finalize(d03, conn.send, args=('d03',), exitpriority=0)

        # Finalizers not attached to any object (obj is None).
        util.Finalize(None, conn.send, args=('e',), exitpriority=-10)

        # Lowest priority: 'STOP' is the tag the parent reads up to.
        util.Finalize(None, conn.send, args=('STOP',), exitpriority=-100)

        # call multiprocessing's cleanup function then exit process without
        # garbage collecting locals
        util._exit_function()
        conn.close()
        os._exit(0)

    def test_finalize(self):
        # Run _test_finalize() in a child and check the tags it sent.
        conn, child_conn = self.Pipe()

        p = self.Process(target=self._test_finalize, args=(child_conn,))
        p.start()
        p.join()

        # Read tags until 'STOP' arrives ('STOP' itself is not collected).
        result = [obj for obj in iter(conn.recv, 'STOP')]
        self.assertEqual(result, ['a', 'b', 'd10', 'd03', 'd02', 'd01', 'e'])
1689
1690#
1691# Test that from ... import * works for each module
1692#
1693
class _TestImportStar(BaseTestCase):
    """Every name listed in a module's __all__ must exist in that module."""

    ALLOWED_TYPES = ('processes',)

    def test_import(self):
        """Import each multiprocessing module and check its __all__."""
        modules = [
            'multiprocessing', 'multiprocessing.connection',
            'multiprocessing.heap', 'multiprocessing.managers',
            'multiprocessing.pool', 'multiprocessing.process',
            'multiprocessing.reduction',
            'multiprocessing.synchronize', 'multiprocessing.util'
            ]

        if c_int is not None:
            # This module requires _ctypes
            modules.append('multiprocessing.sharedctypes')

        for module_name in modules:
            __import__(module_name)
            module = sys.modules[module_name]
            exported = getattr(module, '__all__', ())

            for attribute in exported:
                message = '%r does not have attribute %r' % (module, attribute)
                self.assertTrue(hasattr(module, attribute), message)
1720
1721#
1722# Quick test that logging works -- does not test logging output
1723#
1724
class _TestLogging(BaseTestCase):
    """Quick checks of the multiprocessing logger; output is not tested."""

    ALLOWED_TYPES = ('processes',)

    def test_enable_logging(self):
        """get_logger() returns a usable logger."""
        logger = multiprocessing.get_logger()
        logger.setLevel(util.SUBWARNING)
        self.assertIsNotNone(logger)
        logger.debug('this will not be printed')
        logger.info('nor will this')
        logger.setLevel(LOG_LEVEL)

    @classmethod
    def _test_level(cls, conn):
        """Child entry point: report the logger's effective level."""
        logger = multiprocessing.get_logger()
        conn.send(logger.getEffectiveLevel())

    def test_level(self):
        """A child process sees the effective level set in the parent."""
        LEVEL1 = 32
        LEVEL2 = 37

        logger = multiprocessing.get_logger()
        root_logger = logging.getLogger()
        root_level = root_logger.level

        reader, writer = multiprocessing.Pipe(duplex=False)
        try:
            # Level set directly on the multiprocessing logger.
            logger.setLevel(LEVEL1)
            p = self.Process(target=self._test_level, args=(writer,))
            p.start()
            self.assertEqual(LEVEL1, reader.recv())
            # The child has reported its level; reap it.
            p.join()

            # Level left unset on the multiprocessing logger and set on
            # the root logger instead.
            logger.setLevel(logging.NOTSET)
            root_logger.setLevel(LEVEL2)
            p = self.Process(target=self._test_level, args=(writer,))
            p.start()
            self.assertEqual(LEVEL2, reader.recv())
            p.join()
        finally:
            # Restore both loggers and release the pipe even when an
            # assertion failed, so later tests are not affected.
            root_logger.setLevel(root_level)
            logger.setLevel(level=LOG_LEVEL)
            reader.close()
            writer.close()
1763
1764#
Jesse Noller6214edd2009-01-19 16:23:53 +00001765# Test to verify handle verification, see issue 3321
1766#
1767
class TestInvalidHandle(unittest.TestCase):
    # Connection objects built on bad handles must fail with IOError.

    @unittest.skipIf(WIN32, "skipped on Windows")
    def test_invalid_handles(self):
        # An arbitrary handle number: construction is expected to succeed
        # and the first poll() to raise.
        conn = _multiprocessing.Connection(44977608)
        self.assertRaises(IOError, conn.poll)
        # A negative handle must be refused by the constructor itself.
        self.assertRaises(IOError, _multiprocessing.Connection, -1)
Florent Xicluna9b0e9182010-03-28 11:42:38 +00001775
Jesse Noller6214edd2009-01-19 16:23:53 +00001776#
Benjamin Petersone711caf2008-06-11 16:44:04 +00001777# Functions used to create test cases from the base ones in this module
1778#
1779
def get_attributes(Source, names):
    """Return a dict mapping each of *names* to that attribute of *Source*.

    Attributes whose type is the plain function type are wrapped in
    staticmethod(); every other attribute is stored unchanged.
    """
    function_type = type(get_attributes)
    attributes = {}
    for attr_name in names:
        value = getattr(Source, attr_name)
        wrap = type(value) == function_type
        attributes[attr_name] = staticmethod(value) if wrap else value
    return attributes
1788
def create_test_cases(Mixin, type):
    """Build concrete TestCase classes for one flavour of the tests.

    Every module global whose name starts with '_Test' and whose
    ALLOWED_TYPES contains *type* is combined with unittest.TestCase and
    *Mixin*.  The new class is named 'With' + capitalized type + the base
    name without its leading underscore, and takes Mixin's __module__.
    Returns a dict mapping the new names to the new classes.
    """
    result = {}
    glob = globals()
    Type = type.capitalize()

    # Iterate over a snapshot of the global names.
    for name in list(glob.keys()):
        if not name.startswith('_Test'):
            continue
        base = glob[name]
        if type not in base.ALLOWED_TYPES:
            continue

        class Temp(base, unittest.TestCase, Mixin):
            pass

        newname = 'With' + Type + name[1:]
        result[newname] = Temp
        Temp.__name__ = newname
        Temp.__module__ = Mixin.__module__
    return result
1805
1806#
1807# Create test cases
1808#
1809
# Mixin for the 'processes' flavour: the names come straight from the
# multiprocessing package.
class ProcessesMixin(object):
    TYPE = 'processes'
    Process = multiprocessing.Process
    # Inject the listed multiprocessing attributes as class attributes;
    # get_attributes() wraps plain functions in staticmethod().
    locals().update(get_attributes(multiprocessing, (
        'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
        'Condition', 'Event', 'Value', 'Array', 'RawValue',
        'RawArray', 'current_process', 'active_children', 'Pipe',
        'connection', 'JoinableQueue'
        )))

# Build the WithProcesses* test classes and publish them as module globals.
testcases_processes = create_test_cases(ProcessesMixin, type='processes')
globals().update(testcases_processes)
1822
1823
# Mixin for the 'manager' flavour: the names are attributes of a
# SyncManager instance.
class ManagerMixin(object):
    TYPE = 'manager'
    Process = multiprocessing.Process
    # Instance created with object.__new__, i.e. without running
    # SyncManager.__init__ here.
    manager = object.__new__(multiprocessing.managers.SyncManager)
    # Inject the listed attributes of that instance as class attributes.
    locals().update(get_attributes(manager, (
        'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
        'Condition', 'Event', 'Value', 'Array', 'list', 'dict',
        'Namespace', 'JoinableQueue'
        )))

# Build the WithManager* test classes and publish them as module globals.
testcases_manager = create_test_cases(ManagerMixin, type='manager')
globals().update(testcases_manager)
1836
1837
# Mixin for the 'threads' flavour: the names come from multiprocessing.dummy.
class ThreadsMixin(object):
    TYPE = 'threads'
    Process = multiprocessing.dummy.Process
    # Inject the listed multiprocessing.dummy attributes as class
    # attributes; get_attributes() wraps plain functions in staticmethod().
    locals().update(get_attributes(multiprocessing.dummy, (
        'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
        'Condition', 'Event', 'Value', 'Array', 'current_process',
        'active_children', 'Pipe', 'connection', 'dict', 'list',
        'Namespace', 'JoinableQueue'
        )))

# Build the WithThreads* test classes and publish them as module globals.
testcases_threads = create_test_cases(ThreadsMixin, type='threads')
globals().update(testcases_threads)
1850
class OtherTest(unittest.TestCase):
    """Authentication-failure paths of the connection challenge functions."""

    # TODO: add more tests for deliver/answer challenge.
    def test_deliver_challenge_auth_failure(self):
        """deliver_challenge() must reject a bogus response."""
        class _FakeConnection(object):
            def recv_bytes(self, size):
                return b'something bogus'

            def send_bytes(self, data):
                pass

        with self.assertRaises(multiprocessing.AuthenticationError):
            multiprocessing.connection.deliver_challenge(
                _FakeConnection(), b'abc')

    def test_answer_challenge_auth_failure(self):
        """answer_challenge() must reject a bogus reply to its answer."""
        class _FakeConnection(object):
            def __init__(self):
                self.count = 0

            def recv_bytes(self, size):
                # First read: the challenge marker; second read: garbage;
                # any later read: empty bytes.
                self.count += 1
                if self.count == 1:
                    return multiprocessing.connection.CHALLENGE
                if self.count == 2:
                    return b'something bogus'
                return b''

            def send_bytes(self, data):
                pass

        with self.assertRaises(multiprocessing.AuthenticationError):
            multiprocessing.connection.answer_challenge(
                _FakeConnection(), b'abc')
1879
Benjamin Petersonf47ed4a2009-04-11 20:45:40 +00001880#
1881# Test Manager.start()/Pool.__init__() initializer feature - see issue 5585
1882#
1883
def initializer(ns):
    """Increment the ``test`` attribute of *ns* by one."""
    ns.test = ns.test + 1
1886
class TestInitializers(unittest.TestCase):
    """The initializer passed to Manager.start() and Pool() must be run."""

    def setUp(self):
        """Start a manager and publish a namespace whose counter is zero."""
        self.mgr = multiprocessing.Manager()
        namespace = self.mgr.Namespace()
        self.ns = namespace
        namespace.test = 0

    def tearDown(self):
        """Shut down the manager started by setUp()."""
        self.mgr.shutdown()

    def test_manager_initializer(self):
        """start() rejects a non-callable and runs a real initializer once."""
        m = multiprocessing.managers.SyncManager()
        with self.assertRaises(TypeError):
            m.start(1)
        m.start(initializer, (self.ns,))
        self.assertEqual(self.ns.test, 1)
        m.shutdown()

    def test_pool_initializer(self):
        """Pool() rejects a non-callable and runs a real initializer once."""
        with self.assertRaises(TypeError):
            multiprocessing.Pool(initializer=1)
        pool = multiprocessing.Pool(1, initializer, (self.ns,))
        pool.close()
        pool.join()
        self.assertEqual(self.ns.test, 1)
1909
R. David Murraya44c6b32009-07-29 15:40:30 +00001910#
1911# Issue 5155, 5313, 5331: Test process in processes
1912# Verifies os.close(sys.stdin.fileno) vs. sys.stdin.close() behavior
1913#
1914
def _ThisSubProcess(q):
    """Make one non-blocking get() on *q* and ignore an empty queue."""
    try:
        q.get(block=False)
    except pyqueue.Empty:
        # An empty queue is acceptable here.
        return
1920
# Defined after the create_test_cases() calls, which scan the module
# globals for names starting with '_Test'.
def _TestProcess(q):
    """Start a child running _ThisSubProcess on a new queue and wait for it.

    The *q* argument is not used.
    """
    inner_queue = multiprocessing.Queue()
    child = multiprocessing.Process(target=_ThisSubProcess,
                                    args=(inner_queue,))
    child.start()
    child.join()
1926
def _afunc(x):
    """Return *x* multiplied by itself."""
    return x * x
1929
def pool_in_process():
    """Create a four-worker Pool, map _afunc over a small list, clean up.

    Used as a Process target, so the pool is created inside a child
    process.  The mapped result is discarded; only running the pool
    matters.
    """
    pool = multiprocessing.Pool(processes=4)
    try:
        pool.map(_afunc, [1, 2, 3, 4, 5, 6, 7])
    finally:
        # Release the worker processes instead of leaving them to be
        # reaped implicitly when this process exits.
        pool.close()
        pool.join()
1933
class _file_like(object):
    """Write buffer in front of *delegate* that is reset per process id.

    Written chunks are collected in a list and only forwarded to the
    delegate's write() on flush().  The list is replaced by an empty one
    whenever it is accessed from a pid other than the one that last
    touched it.
    """

    def __init__(self, delegate):
        self._delegate = delegate
        # pid that owns the current cache; None until first access.
        self._pid = None

    @property
    def cache(self):
        current_pid = os.getpid()
        # There are no race conditions since fork keeps only the running thread
        if self._pid != current_pid:
            self._pid = current_pid
            self._cache = []
        return self._cache

    def write(self, data):
        self.cache.append(data)

    def flush(self):
        pending = ''.join(self.cache)
        self._delegate.write(pending)
        self._cache = []
1954
class TestStdinBadfiledescriptor(unittest.TestCase):
    """Regression tests for issues 5155, 5313 and 5331.

    Two tests create multiprocessing objects (a Queue, a Pool) from
    inside a child process; the third exercises the _file_like buffer.
    """

    def test_queue_in_process(self):
        # The child itself spawns a grandchild that polls a queue.
        queue = multiprocessing.Queue()
        proc = multiprocessing.Process(target=_TestProcess, args=(queue,))
        proc.start()
        proc.join()

    def test_pool_in_process(self):
        # The child creates and uses a Pool of its own.
        p = multiprocessing.Process(target=pool_in_process)
        p.start()
        p.join()

    def test_flushing(self):
        # Data written to the wrapper must reach the delegate on flush().
        sio = io.StringIO()
        flike = _file_like(sio)
        flike.write('foo')
        flike.flush()
        # assertEqual rather than a bare assert: it still runs under -O
        # and reports both values on failure.
        self.assertEqual(sio.getvalue(), 'foo')
1975
# Test cases that test_main() appends to the suite after the sorted
# processes/threads/manager cases.
testcases_other = [
    OtherTest,
    TestInvalidHandle,
    TestInitializers,
    TestStdinBadfiledescriptor,
]
Neal Norwitz5d6415e2008-08-25 01:53:32 +00001978
Benjamin Petersone711caf2008-06-11 16:44:04 +00001979#
1980#
1981#
1982
def test_main(run=None):
    """Assemble the full multiprocessing test suite and execute it.

    run -- callable that receives the unittest.TestSuite; when None,
           test.support.run_unittest is used.

    Raises unittest.SkipTest on Linux when creating an RLock fails with
    OSError (see issue 3111).  The shared pools and the manager are torn
    down even if building or running the suite raises.
    """
    if sys.platform.startswith("linux"):
        try:
            # Only probing whether semaphores work; the lock is not kept.
            multiprocessing.RLock()
        except OSError:
            raise unittest.SkipTest("OSError raises on RLock creation, see issue 3111!")

    if run is None:
        from test.support import run_unittest as run

    util.get_temp_dir()     # creates temp directory for use by all processes

    multiprocessing.get_logger().setLevel(LOG_LEVEL)

    # Shared fixtures used by the mixin-based test cases.
    ProcessesMixin.pool = multiprocessing.Pool(4)
    ThreadsMixin.pool = multiprocessing.dummy.Pool(4)
    # NOTE(review): __init__ is re-run on the existing manager object,
    # presumably to reset it before start() -- confirm.
    ManagerMixin.manager.__init__()
    ManagerMixin.manager.start()
    ManagerMixin.pool = ManagerMixin.manager.Pool(4)

    try:
        testcases = (
            sorted(testcases_processes.values(), key=lambda tc:tc.__name__) +
            sorted(testcases_threads.values(), key=lambda tc:tc.__name__) +
            sorted(testcases_manager.values(), key=lambda tc:tc.__name__) +
            testcases_other
            )

        loadTestsFromTestCase = unittest.defaultTestLoader.loadTestsFromTestCase
        suite = unittest.TestSuite(loadTestsFromTestCase(tc) for tc in testcases)
        run(suite)
    finally:
        # Always release worker processes/threads and the manager server,
        # otherwise an exception from run() would leave them running.
        ThreadsMixin.pool.terminate()
        ProcessesMixin.pool.terminate()
        ManagerMixin.pool.terminate()
        ManagerMixin.manager.shutdown()

        del ProcessesMixin.pool, ThreadsMixin.pool, ManagerMixin.pool
Benjamin Petersone711caf2008-06-11 16:44:04 +00002020
def main():
    """Run test_main() with a verbosity-2 unittest text runner."""
    runner = unittest.TextTestRunner(verbosity=2)
    test_main(runner.run)
2023
# Run the suite when this file is executed directly as a script.
if __name__ == '__main__':
    main()