blob: 59b3357eefb7a4eb22a867a6ef1bba9e3296b36e [file] [log] [blame]
Jesse Noller76cf55f2008-07-02 16:56:51 +00001#!/usr/bin/env python
2
Benjamin Petersondfd79492008-06-13 19:13:39 +00003#
4# Unit tests for the multiprocessing package
5#
6
7import unittest
Benjamin Petersondfd79492008-06-13 19:13:39 +00008import Queue
9import time
10import sys
11import os
12import gc
13import signal
14import array
Benjamin Petersondfd79492008-06-13 19:13:39 +000015import socket
16import random
17import logging
Mark Dickinsonc4920e82009-11-20 19:30:22 +000018from test import test_support
Jesse Noller1b90efb2009-06-30 17:11:52 +000019from StringIO import StringIO
R. David Murray3db8a342009-03-30 23:05:48 +000020_multiprocessing = test_support.import_module('_multiprocessing')
Victor Stinner613b4cf2010-04-27 21:56:26 +000021# import threading after _multiprocessing to raise a more revelant error
22# message: "No module named _multiprocessing". _multiprocessing is not compiled
23# without thread support.
24import threading
R. David Murray3db8a342009-03-30 23:05:48 +000025
Jesse Noller37040cd2008-09-30 00:15:45 +000026# Work around broken sem_open implementations
R. David Murray3db8a342009-03-30 23:05:48 +000027test_support.import_module('multiprocessing.synchronize')
Jesse Noller37040cd2008-09-30 00:15:45 +000028
Benjamin Petersondfd79492008-06-13 19:13:39 +000029import multiprocessing.dummy
30import multiprocessing.connection
31import multiprocessing.managers
32import multiprocessing.heap
Benjamin Petersondfd79492008-06-13 19:13:39 +000033import multiprocessing.pool
Benjamin Petersondfd79492008-06-13 19:13:39 +000034
35from multiprocessing import util
36
37#
38#
39#
40
Benjamin Petersone79edf52008-07-13 18:34:58 +000041latin = str
Benjamin Petersondfd79492008-06-13 19:13:39 +000042
Benjamin Petersondfd79492008-06-13 19:13:39 +000043#
44# Constants
45#
46
47LOG_LEVEL = util.SUBWARNING
Jesse Noller654ade32010-01-27 03:05:57 +000048#LOG_LEVEL = logging.DEBUG
Benjamin Petersondfd79492008-06-13 19:13:39 +000049
50DELTA = 0.1
51CHECK_TIMINGS = False # making true makes tests take a lot longer
52 # and can sometimes cause some non-serious
53 # failures because some calls block a bit
54 # longer than expected
55if CHECK_TIMINGS:
56 TIMEOUT1, TIMEOUT2, TIMEOUT3 = 0.82, 0.35, 1.4
57else:
58 TIMEOUT1, TIMEOUT2, TIMEOUT3 = 0.1, 0.1, 0.1
59
60HAVE_GETVALUE = not getattr(_multiprocessing,
61 'HAVE_BROKEN_SEM_GETVALUE', False)
62
Jesse Noller9a5b2ad2009-01-19 15:12:22 +000063WIN32 = (sys.platform == "win32")
64
Benjamin Petersondfd79492008-06-13 19:13:39 +000065#
Florent Xicluna36b9fbb2010-03-24 19:33:25 +000066# Some tests require ctypes
67#
68
69try:
Nick Coghlan13623662010-04-10 14:24:36 +000070 from ctypes import Structure, c_int, c_double
Florent Xicluna36b9fbb2010-03-24 19:33:25 +000071except ImportError:
72 Structure = object
73 c_int = c_double = None
74
Nick Coghlan13623662010-04-10 14:24:36 +000075try:
76 from ctypes import Value
77except ImportError:
78 Value = None
79
80try:
81 from ctypes import copy as ctypes_copy
82except ImportError:
83 ctypes_copy = None
84
Florent Xicluna36b9fbb2010-03-24 19:33:25 +000085#
Benjamin Petersondfd79492008-06-13 19:13:39 +000086# Creates a wrapper for a function which records the time it takes to finish
87#
88
89class TimingWrapper(object):
90
91 def __init__(self, func):
92 self.func = func
93 self.elapsed = None
94
95 def __call__(self, *args, **kwds):
96 t = time.time()
97 try:
98 return self.func(*args, **kwds)
99 finally:
100 self.elapsed = time.time() - t
101
102#
103# Base class for test cases
104#
105
106class BaseTestCase(object):
107
108 ALLOWED_TYPES = ('processes', 'manager', 'threads')
109
110 def assertTimingAlmostEqual(self, a, b):
111 if CHECK_TIMINGS:
112 self.assertAlmostEqual(a, b, 1)
113
114 def assertReturnsIfImplemented(self, value, func, *args):
115 try:
116 res = func(*args)
117 except NotImplementedError:
118 pass
119 else:
120 return self.assertEqual(value, res)
121
122#
123# Return the value of a semaphore
124#
125
126def get_value(self):
127 try:
128 return self.get_value()
129 except AttributeError:
130 try:
131 return self._Semaphore__value
132 except AttributeError:
133 try:
134 return self._value
135 except AttributeError:
136 raise NotImplementedError
137
138#
139# Testcases
140#
141
142class _TestProcess(BaseTestCase):
143
144 ALLOWED_TYPES = ('processes', 'threads')
145
146 def test_current(self):
147 if self.TYPE == 'threads':
148 return
149
150 current = self.current_process()
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000151 authkey = current.authkey
Benjamin Petersondfd79492008-06-13 19:13:39 +0000152
153 self.assertTrue(current.is_alive())
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000154 self.assertTrue(not current.daemon)
Ezio Melottib0f5adc2010-01-24 16:58:36 +0000155 self.assertIsInstance(authkey, bytes)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000156 self.assertTrue(len(authkey) > 0)
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000157 self.assertEqual(current.ident, os.getpid())
158 self.assertEqual(current.exitcode, None)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000159
160 def _test(self, q, *args, **kwds):
161 current = self.current_process()
162 q.put(args)
163 q.put(kwds)
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000164 q.put(current.name)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000165 if self.TYPE != 'threads':
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000166 q.put(bytes(current.authkey))
Benjamin Petersondfd79492008-06-13 19:13:39 +0000167 q.put(current.pid)
168
169 def test_process(self):
170 q = self.Queue(1)
171 e = self.Event()
172 args = (q, 1, 2)
173 kwargs = {'hello':23, 'bye':2.54}
174 name = 'SomeProcess'
175 p = self.Process(
176 target=self._test, args=args, kwargs=kwargs, name=name
177 )
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000178 p.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +0000179 current = self.current_process()
180
181 if self.TYPE != 'threads':
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000182 self.assertEquals(p.authkey, current.authkey)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000183 self.assertEquals(p.is_alive(), False)
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000184 self.assertEquals(p.daemon, True)
Ezio Melottiaa980582010-01-23 23:04:36 +0000185 self.assertNotIn(p, self.active_children())
Benjamin Petersondfd79492008-06-13 19:13:39 +0000186 self.assertTrue(type(self.active_children()) is list)
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000187 self.assertEqual(p.exitcode, None)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000188
189 p.start()
190
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000191 self.assertEquals(p.exitcode, None)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000192 self.assertEquals(p.is_alive(), True)
Ezio Melottiaa980582010-01-23 23:04:36 +0000193 self.assertIn(p, self.active_children())
Benjamin Petersondfd79492008-06-13 19:13:39 +0000194
195 self.assertEquals(q.get(), args[1:])
196 self.assertEquals(q.get(), kwargs)
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000197 self.assertEquals(q.get(), p.name)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000198 if self.TYPE != 'threads':
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000199 self.assertEquals(q.get(), current.authkey)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000200 self.assertEquals(q.get(), p.pid)
201
202 p.join()
203
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000204 self.assertEquals(p.exitcode, 0)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000205 self.assertEquals(p.is_alive(), False)
Ezio Melottiaa980582010-01-23 23:04:36 +0000206 self.assertNotIn(p, self.active_children())
Benjamin Petersondfd79492008-06-13 19:13:39 +0000207
208 def _test_terminate(self):
209 time.sleep(1000)
210
211 def test_terminate(self):
212 if self.TYPE == 'threads':
213 return
214
215 p = self.Process(target=self._test_terminate)
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000216 p.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +0000217 p.start()
218
219 self.assertEqual(p.is_alive(), True)
Ezio Melottiaa980582010-01-23 23:04:36 +0000220 self.assertIn(p, self.active_children())
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000221 self.assertEqual(p.exitcode, None)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000222
223 p.terminate()
224
225 join = TimingWrapper(p.join)
226 self.assertEqual(join(), None)
227 self.assertTimingAlmostEqual(join.elapsed, 0.0)
228
229 self.assertEqual(p.is_alive(), False)
Ezio Melottiaa980582010-01-23 23:04:36 +0000230 self.assertNotIn(p, self.active_children())
Benjamin Petersondfd79492008-06-13 19:13:39 +0000231
232 p.join()
233
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000234 # XXX sometimes get p.exitcode == 0 on Windows ...
235 #self.assertEqual(p.exitcode, -signal.SIGTERM)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000236
237 def test_cpu_count(self):
238 try:
239 cpus = multiprocessing.cpu_count()
240 except NotImplementedError:
241 cpus = 1
242 self.assertTrue(type(cpus) is int)
243 self.assertTrue(cpus >= 1)
244
245 def test_active_children(self):
246 self.assertEqual(type(self.active_children()), list)
247
248 p = self.Process(target=time.sleep, args=(DELTA,))
Ezio Melottiaa980582010-01-23 23:04:36 +0000249 self.assertNotIn(p, self.active_children())
Benjamin Petersondfd79492008-06-13 19:13:39 +0000250
251 p.start()
Ezio Melottiaa980582010-01-23 23:04:36 +0000252 self.assertIn(p, self.active_children())
Benjamin Petersondfd79492008-06-13 19:13:39 +0000253
254 p.join()
Ezio Melottiaa980582010-01-23 23:04:36 +0000255 self.assertNotIn(p, self.active_children())
Benjamin Petersondfd79492008-06-13 19:13:39 +0000256
257 def _test_recursion(self, wconn, id):
258 from multiprocessing import forking
259 wconn.send(id)
260 if len(id) < 2:
261 for i in range(2):
262 p = self.Process(
263 target=self._test_recursion, args=(wconn, id+[i])
264 )
265 p.start()
266 p.join()
267
268 def test_recursion(self):
269 rconn, wconn = self.Pipe(duplex=False)
270 self._test_recursion(wconn, [])
271
272 time.sleep(DELTA)
273 result = []
274 while rconn.poll():
275 result.append(rconn.recv())
276
277 expected = [
278 [],
279 [0],
280 [0, 0],
281 [0, 1],
282 [1],
283 [1, 0],
284 [1, 1]
285 ]
286 self.assertEqual(result, expected)
287
288#
289#
290#
291
292class _UpperCaser(multiprocessing.Process):
293
294 def __init__(self):
295 multiprocessing.Process.__init__(self)
296 self.child_conn, self.parent_conn = multiprocessing.Pipe()
297
298 def run(self):
299 self.parent_conn.close()
300 for s in iter(self.child_conn.recv, None):
301 self.child_conn.send(s.upper())
302 self.child_conn.close()
303
304 def submit(self, s):
305 assert type(s) is str
306 self.parent_conn.send(s)
307 return self.parent_conn.recv()
308
309 def stop(self):
310 self.parent_conn.send(None)
311 self.parent_conn.close()
312 self.child_conn.close()
313
314class _TestSubclassingProcess(BaseTestCase):
315
316 ALLOWED_TYPES = ('processes',)
317
318 def test_subclassing(self):
319 uppercaser = _UpperCaser()
320 uppercaser.start()
321 self.assertEqual(uppercaser.submit('hello'), 'HELLO')
322 self.assertEqual(uppercaser.submit('world'), 'WORLD')
323 uppercaser.stop()
324 uppercaser.join()
325
326#
327#
328#
329
330def queue_empty(q):
331 if hasattr(q, 'empty'):
332 return q.empty()
333 else:
334 return q.qsize() == 0
335
336def queue_full(q, maxsize):
337 if hasattr(q, 'full'):
338 return q.full()
339 else:
340 return q.qsize() == maxsize
341
342
343class _TestQueue(BaseTestCase):
344
345
346 def _test_put(self, queue, child_can_start, parent_can_continue):
347 child_can_start.wait()
348 for i in range(6):
349 queue.get()
350 parent_can_continue.set()
351
352 def test_put(self):
353 MAXSIZE = 6
354 queue = self.Queue(maxsize=MAXSIZE)
355 child_can_start = self.Event()
356 parent_can_continue = self.Event()
357
358 proc = self.Process(
359 target=self._test_put,
360 args=(queue, child_can_start, parent_can_continue)
361 )
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000362 proc.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +0000363 proc.start()
364
365 self.assertEqual(queue_empty(queue), True)
366 self.assertEqual(queue_full(queue, MAXSIZE), False)
367
368 queue.put(1)
369 queue.put(2, True)
370 queue.put(3, True, None)
371 queue.put(4, False)
372 queue.put(5, False, None)
373 queue.put_nowait(6)
374
375 # the values may be in buffer but not yet in pipe so sleep a bit
376 time.sleep(DELTA)
377
378 self.assertEqual(queue_empty(queue), False)
379 self.assertEqual(queue_full(queue, MAXSIZE), True)
380
381 put = TimingWrapper(queue.put)
382 put_nowait = TimingWrapper(queue.put_nowait)
383
384 self.assertRaises(Queue.Full, put, 7, False)
385 self.assertTimingAlmostEqual(put.elapsed, 0)
386
387 self.assertRaises(Queue.Full, put, 7, False, None)
388 self.assertTimingAlmostEqual(put.elapsed, 0)
389
390 self.assertRaises(Queue.Full, put_nowait, 7)
391 self.assertTimingAlmostEqual(put_nowait.elapsed, 0)
392
393 self.assertRaises(Queue.Full, put, 7, True, TIMEOUT1)
394 self.assertTimingAlmostEqual(put.elapsed, TIMEOUT1)
395
396 self.assertRaises(Queue.Full, put, 7, False, TIMEOUT2)
397 self.assertTimingAlmostEqual(put.elapsed, 0)
398
399 self.assertRaises(Queue.Full, put, 7, True, timeout=TIMEOUT3)
400 self.assertTimingAlmostEqual(put.elapsed, TIMEOUT3)
401
402 child_can_start.set()
403 parent_can_continue.wait()
404
405 self.assertEqual(queue_empty(queue), True)
406 self.assertEqual(queue_full(queue, MAXSIZE), False)
407
408 proc.join()
409
410 def _test_get(self, queue, child_can_start, parent_can_continue):
411 child_can_start.wait()
Benjamin Petersonda3a1b12008-06-16 20:52:48 +0000412 #queue.put(1)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000413 queue.put(2)
414 queue.put(3)
415 queue.put(4)
416 queue.put(5)
417 parent_can_continue.set()
418
419 def test_get(self):
420 queue = self.Queue()
421 child_can_start = self.Event()
422 parent_can_continue = self.Event()
423
424 proc = self.Process(
425 target=self._test_get,
426 args=(queue, child_can_start, parent_can_continue)
427 )
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000428 proc.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +0000429 proc.start()
430
431 self.assertEqual(queue_empty(queue), True)
432
433 child_can_start.set()
434 parent_can_continue.wait()
435
436 time.sleep(DELTA)
437 self.assertEqual(queue_empty(queue), False)
438
Benjamin Petersonda3a1b12008-06-16 20:52:48 +0000439 # Hangs unexpectedly, remove for now
440 #self.assertEqual(queue.get(), 1)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000441 self.assertEqual(queue.get(True, None), 2)
442 self.assertEqual(queue.get(True), 3)
443 self.assertEqual(queue.get(timeout=1), 4)
444 self.assertEqual(queue.get_nowait(), 5)
445
446 self.assertEqual(queue_empty(queue), True)
447
448 get = TimingWrapper(queue.get)
449 get_nowait = TimingWrapper(queue.get_nowait)
450
451 self.assertRaises(Queue.Empty, get, False)
452 self.assertTimingAlmostEqual(get.elapsed, 0)
453
454 self.assertRaises(Queue.Empty, get, False, None)
455 self.assertTimingAlmostEqual(get.elapsed, 0)
456
457 self.assertRaises(Queue.Empty, get_nowait)
458 self.assertTimingAlmostEqual(get_nowait.elapsed, 0)
459
460 self.assertRaises(Queue.Empty, get, True, TIMEOUT1)
461 self.assertTimingAlmostEqual(get.elapsed, TIMEOUT1)
462
463 self.assertRaises(Queue.Empty, get, False, TIMEOUT2)
464 self.assertTimingAlmostEqual(get.elapsed, 0)
465
466 self.assertRaises(Queue.Empty, get, timeout=TIMEOUT3)
467 self.assertTimingAlmostEqual(get.elapsed, TIMEOUT3)
468
469 proc.join()
470
471 def _test_fork(self, queue):
472 for i in range(10, 20):
473 queue.put(i)
474 # note that at this point the items may only be buffered, so the
475 # process cannot shutdown until the feeder thread has finished
476 # pushing items onto the pipe.
477
478 def test_fork(self):
479 # Old versions of Queue would fail to create a new feeder
480 # thread for a forked process if the original process had its
481 # own feeder thread. This test checks that this no longer
482 # happens.
483
484 queue = self.Queue()
485
486 # put items on queue so that main process starts a feeder thread
487 for i in range(10):
488 queue.put(i)
489
490 # wait to make sure thread starts before we fork a new process
491 time.sleep(DELTA)
492
493 # fork process
494 p = self.Process(target=self._test_fork, args=(queue,))
495 p.start()
496
497 # check that all expected items are in the queue
498 for i in range(20):
499 self.assertEqual(queue.get(), i)
500 self.assertRaises(Queue.Empty, queue.get, False)
501
502 p.join()
503
504 def test_qsize(self):
505 q = self.Queue()
506 try:
507 self.assertEqual(q.qsize(), 0)
508 except NotImplementedError:
509 return
510 q.put(1)
511 self.assertEqual(q.qsize(), 1)
512 q.put(5)
513 self.assertEqual(q.qsize(), 2)
514 q.get()
515 self.assertEqual(q.qsize(), 1)
516 q.get()
517 self.assertEqual(q.qsize(), 0)
518
519 def _test_task_done(self, q):
520 for obj in iter(q.get, None):
521 time.sleep(DELTA)
522 q.task_done()
523
524 def test_task_done(self):
525 queue = self.JoinableQueue()
526
527 if sys.version_info < (2, 5) and not hasattr(queue, 'task_done'):
Florent Xicluna36b9fbb2010-03-24 19:33:25 +0000528 self.skipTest("requires 'queue.task_done()' method")
Benjamin Petersondfd79492008-06-13 19:13:39 +0000529
530 workers = [self.Process(target=self._test_task_done, args=(queue,))
531 for i in xrange(4)]
532
533 for p in workers:
534 p.start()
535
536 for i in xrange(10):
537 queue.put(i)
538
539 queue.join()
540
541 for p in workers:
542 queue.put(None)
543
544 for p in workers:
545 p.join()
546
547#
548#
549#
550
551class _TestLock(BaseTestCase):
552
553 def test_lock(self):
554 lock = self.Lock()
555 self.assertEqual(lock.acquire(), True)
556 self.assertEqual(lock.acquire(False), False)
557 self.assertEqual(lock.release(), None)
558 self.assertRaises((ValueError, threading.ThreadError), lock.release)
559
560 def test_rlock(self):
561 lock = self.RLock()
562 self.assertEqual(lock.acquire(), True)
563 self.assertEqual(lock.acquire(), True)
564 self.assertEqual(lock.acquire(), True)
565 self.assertEqual(lock.release(), None)
566 self.assertEqual(lock.release(), None)
567 self.assertEqual(lock.release(), None)
568 self.assertRaises((AssertionError, RuntimeError), lock.release)
569
Jesse Noller82eb5902009-03-30 23:29:31 +0000570 def test_lock_context(self):
571 with self.Lock():
572 pass
573
Benjamin Petersondfd79492008-06-13 19:13:39 +0000574
575class _TestSemaphore(BaseTestCase):
576
577 def _test_semaphore(self, sem):
578 self.assertReturnsIfImplemented(2, get_value, sem)
579 self.assertEqual(sem.acquire(), True)
580 self.assertReturnsIfImplemented(1, get_value, sem)
581 self.assertEqual(sem.acquire(), True)
582 self.assertReturnsIfImplemented(0, get_value, sem)
583 self.assertEqual(sem.acquire(False), False)
584 self.assertReturnsIfImplemented(0, get_value, sem)
585 self.assertEqual(sem.release(), None)
586 self.assertReturnsIfImplemented(1, get_value, sem)
587 self.assertEqual(sem.release(), None)
588 self.assertReturnsIfImplemented(2, get_value, sem)
589
590 def test_semaphore(self):
591 sem = self.Semaphore(2)
592 self._test_semaphore(sem)
593 self.assertEqual(sem.release(), None)
594 self.assertReturnsIfImplemented(3, get_value, sem)
595 self.assertEqual(sem.release(), None)
596 self.assertReturnsIfImplemented(4, get_value, sem)
597
598 def test_bounded_semaphore(self):
599 sem = self.BoundedSemaphore(2)
600 self._test_semaphore(sem)
601 # Currently fails on OS/X
602 #if HAVE_GETVALUE:
603 # self.assertRaises(ValueError, sem.release)
604 # self.assertReturnsIfImplemented(2, get_value, sem)
605
606 def test_timeout(self):
607 if self.TYPE != 'processes':
608 return
609
610 sem = self.Semaphore(0)
611 acquire = TimingWrapper(sem.acquire)
612
613 self.assertEqual(acquire(False), False)
614 self.assertTimingAlmostEqual(acquire.elapsed, 0.0)
615
616 self.assertEqual(acquire(False, None), False)
617 self.assertTimingAlmostEqual(acquire.elapsed, 0.0)
618
619 self.assertEqual(acquire(False, TIMEOUT1), False)
620 self.assertTimingAlmostEqual(acquire.elapsed, 0)
621
622 self.assertEqual(acquire(True, TIMEOUT2), False)
623 self.assertTimingAlmostEqual(acquire.elapsed, TIMEOUT2)
624
625 self.assertEqual(acquire(timeout=TIMEOUT3), False)
626 self.assertTimingAlmostEqual(acquire.elapsed, TIMEOUT3)
627
628
629class _TestCondition(BaseTestCase):
630
631 def f(self, cond, sleeping, woken, timeout=None):
632 cond.acquire()
633 sleeping.release()
634 cond.wait(timeout)
635 woken.release()
636 cond.release()
637
638 def check_invariant(self, cond):
639 # this is only supposed to succeed when there are no sleepers
640 if self.TYPE == 'processes':
641 try:
642 sleepers = (cond._sleeping_count.get_value() -
643 cond._woken_count.get_value())
644 self.assertEqual(sleepers, 0)
645 self.assertEqual(cond._wait_semaphore.get_value(), 0)
646 except NotImplementedError:
647 pass
648
649 def test_notify(self):
650 cond = self.Condition()
651 sleeping = self.Semaphore(0)
652 woken = self.Semaphore(0)
653
654 p = self.Process(target=self.f, args=(cond, sleeping, woken))
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000655 p.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +0000656 p.start()
657
658 p = threading.Thread(target=self.f, args=(cond, sleeping, woken))
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000659 p.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +0000660 p.start()
661
662 # wait for both children to start sleeping
663 sleeping.acquire()
664 sleeping.acquire()
665
666 # check no process/thread has woken up
667 time.sleep(DELTA)
668 self.assertReturnsIfImplemented(0, get_value, woken)
669
670 # wake up one process/thread
671 cond.acquire()
672 cond.notify()
673 cond.release()
674
675 # check one process/thread has woken up
676 time.sleep(DELTA)
677 self.assertReturnsIfImplemented(1, get_value, woken)
678
679 # wake up another
680 cond.acquire()
681 cond.notify()
682 cond.release()
683
684 # check other has woken up
685 time.sleep(DELTA)
686 self.assertReturnsIfImplemented(2, get_value, woken)
687
688 # check state is not mucked up
689 self.check_invariant(cond)
690 p.join()
691
692 def test_notify_all(self):
693 cond = self.Condition()
694 sleeping = self.Semaphore(0)
695 woken = self.Semaphore(0)
696
697 # start some threads/processes which will timeout
698 for i in range(3):
699 p = self.Process(target=self.f,
700 args=(cond, sleeping, woken, TIMEOUT1))
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000701 p.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +0000702 p.start()
703
704 t = threading.Thread(target=self.f,
705 args=(cond, sleeping, woken, TIMEOUT1))
Benjamin Petersona9b22222008-08-18 18:01:43 +0000706 t.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +0000707 t.start()
708
709 # wait for them all to sleep
710 for i in xrange(6):
711 sleeping.acquire()
712
713 # check they have all timed out
714 for i in xrange(6):
715 woken.acquire()
716 self.assertReturnsIfImplemented(0, get_value, woken)
717
718 # check state is not mucked up
719 self.check_invariant(cond)
720
721 # start some more threads/processes
722 for i in range(3):
723 p = self.Process(target=self.f, args=(cond, sleeping, woken))
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000724 p.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +0000725 p.start()
726
727 t = threading.Thread(target=self.f, args=(cond, sleeping, woken))
Benjamin Petersona9b22222008-08-18 18:01:43 +0000728 t.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +0000729 t.start()
730
731 # wait for them to all sleep
732 for i in xrange(6):
733 sleeping.acquire()
734
735 # check no process/thread has woken up
736 time.sleep(DELTA)
737 self.assertReturnsIfImplemented(0, get_value, woken)
738
739 # wake them all up
740 cond.acquire()
741 cond.notify_all()
742 cond.release()
743
744 # check they have all woken
745 time.sleep(DELTA)
746 self.assertReturnsIfImplemented(6, get_value, woken)
747
748 # check state is not mucked up
749 self.check_invariant(cond)
750
751 def test_timeout(self):
752 cond = self.Condition()
753 wait = TimingWrapper(cond.wait)
754 cond.acquire()
755 res = wait(TIMEOUT1)
756 cond.release()
757 self.assertEqual(res, None)
758 self.assertTimingAlmostEqual(wait.elapsed, TIMEOUT1)
759
760
761class _TestEvent(BaseTestCase):
762
763 def _test_event(self, event):
764 time.sleep(TIMEOUT2)
765 event.set()
766
767 def test_event(self):
768 event = self.Event()
769 wait = TimingWrapper(event.wait)
770
771 # Removed temporaily, due to API shear, this does not
772 # work with threading._Event objects. is_set == isSet
Jesse Noller02cb0eb2009-04-01 03:45:50 +0000773 self.assertEqual(event.is_set(), False)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000774
Jesse Noller02cb0eb2009-04-01 03:45:50 +0000775 # Removed, threading.Event.wait() will return the value of the __flag
776 # instead of None. API Shear with the semaphore backed mp.Event
777 self.assertEqual(wait(0.0), False)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000778 self.assertTimingAlmostEqual(wait.elapsed, 0.0)
Jesse Noller02cb0eb2009-04-01 03:45:50 +0000779 self.assertEqual(wait(TIMEOUT1), False)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000780 self.assertTimingAlmostEqual(wait.elapsed, TIMEOUT1)
781
782 event.set()
783
784 # See note above on the API differences
Jesse Noller02cb0eb2009-04-01 03:45:50 +0000785 self.assertEqual(event.is_set(), True)
786 self.assertEqual(wait(), True)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000787 self.assertTimingAlmostEqual(wait.elapsed, 0.0)
Jesse Noller02cb0eb2009-04-01 03:45:50 +0000788 self.assertEqual(wait(TIMEOUT1), True)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000789 self.assertTimingAlmostEqual(wait.elapsed, 0.0)
790 # self.assertEqual(event.is_set(), True)
791
792 event.clear()
793
794 #self.assertEqual(event.is_set(), False)
795
796 self.Process(target=self._test_event, args=(event,)).start()
Jesse Noller02cb0eb2009-04-01 03:45:50 +0000797 self.assertEqual(wait(), True)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000798
799#
800#
801#
802
803class _TestValue(BaseTestCase):
804
Florent Xicluna36b9fbb2010-03-24 19:33:25 +0000805 ALLOWED_TYPES = ('processes',)
806
Benjamin Petersondfd79492008-06-13 19:13:39 +0000807 codes_values = [
808 ('i', 4343, 24234),
809 ('d', 3.625, -4.25),
810 ('h', -232, 234),
811 ('c', latin('x'), latin('y'))
812 ]
813
814 def _test(self, values):
815 for sv, cv in zip(values, self.codes_values):
816 sv.value = cv[2]
817
818
Florent Xicluna36b9fbb2010-03-24 19:33:25 +0000819 @unittest.skipIf(c_int is None, "requires _ctypes")
Benjamin Petersondfd79492008-06-13 19:13:39 +0000820 def test_value(self, raw=False):
Benjamin Petersondfd79492008-06-13 19:13:39 +0000821 if raw:
822 values = [self.RawValue(code, value)
823 for code, value, _ in self.codes_values]
824 else:
825 values = [self.Value(code, value)
826 for code, value, _ in self.codes_values]
827
828 for sv, cv in zip(values, self.codes_values):
829 self.assertEqual(sv.value, cv[1])
830
831 proc = self.Process(target=self._test, args=(values,))
832 proc.start()
833 proc.join()
834
835 for sv, cv in zip(values, self.codes_values):
836 self.assertEqual(sv.value, cv[2])
837
Florent Xicluna36b9fbb2010-03-24 19:33:25 +0000838 @unittest.skipIf(c_int is None, "requires _ctypes")
Benjamin Petersondfd79492008-06-13 19:13:39 +0000839 def test_rawvalue(self):
840 self.test_value(raw=True)
841
Florent Xicluna36b9fbb2010-03-24 19:33:25 +0000842 @unittest.skipIf(c_int is None, "requires _ctypes")
Benjamin Petersondfd79492008-06-13 19:13:39 +0000843 def test_getobj_getlock(self):
Benjamin Petersondfd79492008-06-13 19:13:39 +0000844 val1 = self.Value('i', 5)
845 lock1 = val1.get_lock()
846 obj1 = val1.get_obj()
847
848 val2 = self.Value('i', 5, lock=None)
849 lock2 = val2.get_lock()
850 obj2 = val2.get_obj()
851
852 lock = self.Lock()
853 val3 = self.Value('i', 5, lock=lock)
854 lock3 = val3.get_lock()
855 obj3 = val3.get_obj()
856 self.assertEqual(lock, lock3)
857
Jesse Noller6ab22152009-01-18 02:45:38 +0000858 arr4 = self.Value('i', 5, lock=False)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000859 self.assertFalse(hasattr(arr4, 'get_lock'))
860 self.assertFalse(hasattr(arr4, 'get_obj'))
861
Jesse Noller6ab22152009-01-18 02:45:38 +0000862 self.assertRaises(AttributeError, self.Value, 'i', 5, lock='navalue')
863
864 arr5 = self.RawValue('i', 5)
865 self.assertFalse(hasattr(arr5, 'get_lock'))
866 self.assertFalse(hasattr(arr5, 'get_obj'))
867
Benjamin Petersondfd79492008-06-13 19:13:39 +0000868
869class _TestArray(BaseTestCase):
870
Florent Xicluna36b9fbb2010-03-24 19:33:25 +0000871 ALLOWED_TYPES = ('processes',)
872
Benjamin Petersondfd79492008-06-13 19:13:39 +0000873 def f(self, seq):
874 for i in range(1, len(seq)):
875 seq[i] += seq[i-1]
876
Florent Xicluna36b9fbb2010-03-24 19:33:25 +0000877 @unittest.skipIf(c_int is None, "requires _ctypes")
Benjamin Petersondfd79492008-06-13 19:13:39 +0000878 def test_array(self, raw=False):
Benjamin Petersondfd79492008-06-13 19:13:39 +0000879 seq = [680, 626, 934, 821, 150, 233, 548, 982, 714, 831]
880 if raw:
881 arr = self.RawArray('i', seq)
882 else:
883 arr = self.Array('i', seq)
884
885 self.assertEqual(len(arr), len(seq))
886 self.assertEqual(arr[3], seq[3])
887 self.assertEqual(list(arr[2:7]), list(seq[2:7]))
888
889 arr[4:8] = seq[4:8] = array.array('i', [1, 2, 3, 4])
890
891 self.assertEqual(list(arr[:]), seq)
892
893 self.f(seq)
894
895 p = self.Process(target=self.f, args=(arr,))
896 p.start()
897 p.join()
898
899 self.assertEqual(list(arr[:]), seq)
900
Florent Xicluna36b9fbb2010-03-24 19:33:25 +0000901 @unittest.skipIf(c_int is None, "requires _ctypes")
Benjamin Petersondfd79492008-06-13 19:13:39 +0000902 def test_rawarray(self):
903 self.test_array(raw=True)
904
Florent Xicluna36b9fbb2010-03-24 19:33:25 +0000905 @unittest.skipIf(c_int is None, "requires _ctypes")
Benjamin Petersondfd79492008-06-13 19:13:39 +0000906 def test_getobj_getlock_obj(self):
Benjamin Petersondfd79492008-06-13 19:13:39 +0000907 arr1 = self.Array('i', range(10))
908 lock1 = arr1.get_lock()
909 obj1 = arr1.get_obj()
910
911 arr2 = self.Array('i', range(10), lock=None)
912 lock2 = arr2.get_lock()
913 obj2 = arr2.get_obj()
914
915 lock = self.Lock()
916 arr3 = self.Array('i', range(10), lock=lock)
917 lock3 = arr3.get_lock()
918 obj3 = arr3.get_obj()
919 self.assertEqual(lock, lock3)
920
Jesse Noller6ab22152009-01-18 02:45:38 +0000921 arr4 = self.Array('i', range(10), lock=False)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000922 self.assertFalse(hasattr(arr4, 'get_lock'))
923 self.assertFalse(hasattr(arr4, 'get_obj'))
Jesse Noller6ab22152009-01-18 02:45:38 +0000924 self.assertRaises(AttributeError,
925 self.Array, 'i', range(10), lock='notalock')
926
927 arr5 = self.RawArray('i', range(10))
928 self.assertFalse(hasattr(arr5, 'get_lock'))
929 self.assertFalse(hasattr(arr5, 'get_obj'))
Benjamin Petersondfd79492008-06-13 19:13:39 +0000930
931#
932#
933#
934
935class _TestContainers(BaseTestCase):
936
937 ALLOWED_TYPES = ('manager',)
938
939 def test_list(self):
940 a = self.list(range(10))
941 self.assertEqual(a[:], range(10))
942
943 b = self.list()
944 self.assertEqual(b[:], [])
945
946 b.extend(range(5))
947 self.assertEqual(b[:], range(5))
948
949 self.assertEqual(b[2], 2)
950 self.assertEqual(b[2:10], [2,3,4])
951
952 b *= 2
953 self.assertEqual(b[:], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4])
954
955 self.assertEqual(b + [5, 6], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4, 5, 6])
956
957 self.assertEqual(a[:], range(10))
958
959 d = [a, b]
960 e = self.list(d)
961 self.assertEqual(
962 e[:],
963 [[0, 1, 2, 3, 4, 5, 6, 7, 8, 9], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4]]
964 )
965
966 f = self.list([a])
967 a.append('hello')
968 self.assertEqual(f[:], [[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 'hello']])
969
970 def test_dict(self):
971 d = self.dict()
972 indices = range(65, 70)
973 for i in indices:
974 d[i] = chr(i)
975 self.assertEqual(d.copy(), dict((i, chr(i)) for i in indices))
976 self.assertEqual(sorted(d.keys()), indices)
977 self.assertEqual(sorted(d.values()), [chr(i) for i in indices])
978 self.assertEqual(sorted(d.items()), [(i, chr(i)) for i in indices])
979
980 def test_namespace(self):
981 n = self.Namespace()
982 n.name = 'Bob'
983 n.job = 'Builder'
984 n._hidden = 'hidden'
985 self.assertEqual((n.name, n.job), ('Bob', 'Builder'))
986 del n.job
987 self.assertEqual(str(n), "Namespace(name='Bob')")
988 self.assertTrue(hasattr(n, 'name'))
989 self.assertTrue(not hasattr(n, 'job'))
990
991#
992#
993#
994
995def sqr(x, wait=0.0):
996 time.sleep(wait)
997 return x*x
Benjamin Petersondfd79492008-06-13 19:13:39 +0000998class _TestPool(BaseTestCase):
999
1000 def test_apply(self):
1001 papply = self.pool.apply
1002 self.assertEqual(papply(sqr, (5,)), sqr(5))
1003 self.assertEqual(papply(sqr, (), {'x':3}), sqr(x=3))
1004
1005 def test_map(self):
1006 pmap = self.pool.map
1007 self.assertEqual(pmap(sqr, range(10)), map(sqr, range(10)))
1008 self.assertEqual(pmap(sqr, range(100), chunksize=20),
1009 map(sqr, range(100)))
1010
Jesse Noller7530e472009-07-16 14:23:04 +00001011 def test_map_chunksize(self):
1012 try:
1013 self.pool.map_async(sqr, [], chunksize=1).get(timeout=TIMEOUT1)
1014 except multiprocessing.TimeoutError:
1015 self.fail("pool.map_async with chunksize stalled on null list")
1016
Benjamin Petersondfd79492008-06-13 19:13:39 +00001017 def test_async(self):
1018 res = self.pool.apply_async(sqr, (7, TIMEOUT1,))
1019 get = TimingWrapper(res.get)
1020 self.assertEqual(get(), 49)
1021 self.assertTimingAlmostEqual(get.elapsed, TIMEOUT1)
1022
1023 def test_async_timeout(self):
1024 res = self.pool.apply_async(sqr, (6, TIMEOUT2 + 0.2))
1025 get = TimingWrapper(res.get)
1026 self.assertRaises(multiprocessing.TimeoutError, get, timeout=TIMEOUT2)
1027 self.assertTimingAlmostEqual(get.elapsed, TIMEOUT2)
1028
1029 def test_imap(self):
1030 it = self.pool.imap(sqr, range(10))
1031 self.assertEqual(list(it), map(sqr, range(10)))
1032
1033 it = self.pool.imap(sqr, range(10))
1034 for i in range(10):
1035 self.assertEqual(it.next(), i*i)
1036 self.assertRaises(StopIteration, it.next)
1037
1038 it = self.pool.imap(sqr, range(1000), chunksize=100)
1039 for i in range(1000):
1040 self.assertEqual(it.next(), i*i)
1041 self.assertRaises(StopIteration, it.next)
1042
1043 def test_imap_unordered(self):
1044 it = self.pool.imap_unordered(sqr, range(1000))
1045 self.assertEqual(sorted(it), map(sqr, range(1000)))
1046
1047 it = self.pool.imap_unordered(sqr, range(1000), chunksize=53)
1048 self.assertEqual(sorted(it), map(sqr, range(1000)))
1049
1050 def test_make_pool(self):
1051 p = multiprocessing.Pool(3)
1052 self.assertEqual(3, len(p._pool))
1053 p.close()
1054 p.join()
1055
1056 def test_terminate(self):
1057 if self.TYPE == 'manager':
1058 # On Unix a forked process increfs each shared object to
1059 # which its parent process held a reference. If the
1060 # forked process gets terminated then there is likely to
1061 # be a reference leak. So to prevent
1062 # _TestZZZNumberOfObjects from failing we skip this test
1063 # when using a manager.
1064 return
1065
1066 result = self.pool.map_async(
1067 time.sleep, [0.1 for i in range(10000)], chunksize=1
1068 )
1069 self.pool.terminate()
1070 join = TimingWrapper(self.pool.join)
1071 join()
1072 self.assertTrue(join.elapsed < 0.2)
Jesse Noller654ade32010-01-27 03:05:57 +00001073
1074class _TestPoolWorkerLifetime(BaseTestCase):
1075
1076 ALLOWED_TYPES = ('processes', )
1077 def test_pool_worker_lifetime(self):
1078 p = multiprocessing.Pool(3, maxtasksperchild=10)
1079 self.assertEqual(3, len(p._pool))
1080 origworkerpids = [w.pid for w in p._pool]
1081 # Run many tasks so each worker gets replaced (hopefully)
1082 results = []
1083 for i in range(100):
1084 results.append(p.apply_async(sqr, (i, )))
1085 # Fetch the results and verify we got the right answers,
1086 # also ensuring all the tasks have completed.
1087 for (j, res) in enumerate(results):
1088 self.assertEqual(res.get(), sqr(j))
1089 # Refill the pool
1090 p._repopulate_pool()
Florent Xicluna3bc5cb72010-03-04 15:58:54 +00001091 # Wait until all workers are alive
1092 countdown = 5
1093 while countdown and not all(w.is_alive() for w in p._pool):
1094 countdown -= 1
1095 time.sleep(DELTA)
Jesse Noller654ade32010-01-27 03:05:57 +00001096 finalworkerpids = [w.pid for w in p._pool]
Florent Xicluna3bc5cb72010-03-04 15:58:54 +00001097 # All pids should be assigned. See issue #7805.
1098 self.assertNotIn(None, origworkerpids)
1099 self.assertNotIn(None, finalworkerpids)
1100 # Finally, check that the worker pids have changed
Jesse Noller654ade32010-01-27 03:05:57 +00001101 self.assertNotEqual(sorted(origworkerpids), sorted(finalworkerpids))
1102 p.close()
1103 p.join()
1104
Benjamin Petersondfd79492008-06-13 19:13:39 +00001105#
1106# Test that manager has expected number of shared objects left
1107#
1108
1109class _TestZZZNumberOfObjects(BaseTestCase):
1110 # Because test cases are sorted alphabetically, this one will get
1111 # run after all the other tests for the manager. It tests that
1112 # there have been no "reference leaks" for the manager's shared
1113 # objects. Note the comment in _TestPool.test_terminate().
1114 ALLOWED_TYPES = ('manager',)
1115
1116 def test_number_of_objects(self):
1117 EXPECTED_NUMBER = 1 # the pool object is still alive
1118 multiprocessing.active_children() # discard dead process objs
1119 gc.collect() # do garbage collection
1120 refs = self.manager._number_of_objects()
Jesse Noller7314b382009-01-21 02:08:17 +00001121 debug_info = self.manager._debug_info()
Benjamin Petersondfd79492008-06-13 19:13:39 +00001122 if refs != EXPECTED_NUMBER:
Jesse Noller7fb96402008-07-17 21:01:05 +00001123 print self.manager._debug_info()
Jesse Noller7314b382009-01-21 02:08:17 +00001124 print debug_info
Benjamin Petersondfd79492008-06-13 19:13:39 +00001125
1126 self.assertEqual(refs, EXPECTED_NUMBER)
1127
1128#
1129# Test of creating a customized manager class
1130#
1131
1132from multiprocessing.managers import BaseManager, BaseProxy, RemoteError
1133
1134class FooBar(object):
1135 def f(self):
1136 return 'f()'
1137 def g(self):
1138 raise ValueError
1139 def _h(self):
1140 return '_h()'
1141
1142def baz():
1143 for i in xrange(10):
1144 yield i*i
1145
1146class IteratorProxy(BaseProxy):
1147 _exposed_ = ('next', '__next__')
1148 def __iter__(self):
1149 return self
1150 def next(self):
1151 return self._callmethod('next')
1152 def __next__(self):
1153 return self._callmethod('__next__')
1154
1155class MyManager(BaseManager):
1156 pass
1157
1158MyManager.register('Foo', callable=FooBar)
1159MyManager.register('Bar', callable=FooBar, exposed=('f', '_h'))
1160MyManager.register('baz', callable=baz, proxytype=IteratorProxy)
1161
1162
1163class _TestMyManager(BaseTestCase):
1164
1165 ALLOWED_TYPES = ('manager',)
1166
1167 def test_mymanager(self):
1168 manager = MyManager()
1169 manager.start()
1170
1171 foo = manager.Foo()
1172 bar = manager.Bar()
1173 baz = manager.baz()
1174
1175 foo_methods = [name for name in ('f', 'g', '_h') if hasattr(foo, name)]
1176 bar_methods = [name for name in ('f', 'g', '_h') if hasattr(bar, name)]
1177
1178 self.assertEqual(foo_methods, ['f', 'g'])
1179 self.assertEqual(bar_methods, ['f', '_h'])
1180
1181 self.assertEqual(foo.f(), 'f()')
1182 self.assertRaises(ValueError, foo.g)
1183 self.assertEqual(foo._callmethod('f'), 'f()')
1184 self.assertRaises(RemoteError, foo._callmethod, '_h')
1185
1186 self.assertEqual(bar.f(), 'f()')
1187 self.assertEqual(bar._h(), '_h()')
1188 self.assertEqual(bar._callmethod('f'), 'f()')
1189 self.assertEqual(bar._callmethod('_h'), '_h()')
1190
1191 self.assertEqual(list(baz), [i*i for i in range(10)])
1192
1193 manager.shutdown()
1194
1195#
1196# Test of connecting to a remote server and using xmlrpclib for serialization
1197#
1198
1199_queue = Queue.Queue()
1200def get_queue():
1201 return _queue
1202
1203class QueueManager(BaseManager):
1204 '''manager class used by server process'''
1205QueueManager.register('get_queue', callable=get_queue)
1206
1207class QueueManager2(BaseManager):
1208 '''manager class which specifies the same interface as QueueManager'''
1209QueueManager2.register('get_queue')
1210
1211
1212SERIALIZER = 'xmlrpclib'
1213
1214class _TestRemoteManager(BaseTestCase):
1215
1216 ALLOWED_TYPES = ('manager',)
1217
1218 def _putter(self, address, authkey):
1219 manager = QueueManager2(
1220 address=address, authkey=authkey, serializer=SERIALIZER
1221 )
1222 manager.connect()
1223 queue = manager.get_queue()
1224 queue.put(('hello world', None, True, 2.25))
1225
1226 def test_remote(self):
1227 authkey = os.urandom(32)
1228
1229 manager = QueueManager(
1230 address=('localhost', 0), authkey=authkey, serializer=SERIALIZER
1231 )
1232 manager.start()
1233
1234 p = self.Process(target=self._putter, args=(manager.address, authkey))
1235 p.start()
1236
1237 manager2 = QueueManager2(
1238 address=manager.address, authkey=authkey, serializer=SERIALIZER
1239 )
1240 manager2.connect()
1241 queue = manager2.get_queue()
1242
1243 # Note that xmlrpclib will deserialize object as a list not a tuple
1244 self.assertEqual(queue.get(), ['hello world', None, True, 2.25])
1245
1246 # Because we are using xmlrpclib for serialization instead of
1247 # pickle this will cause a serialization error.
1248 self.assertRaises(Exception, queue.put, time.sleep)
1249
1250 # Make queue finalizer run before the server is stopped
1251 del queue
1252 manager.shutdown()
1253
Jesse Noller459a6482009-03-30 15:50:42 +00001254class _TestManagerRestart(BaseTestCase):
1255
1256 def _putter(self, address, authkey):
1257 manager = QueueManager(
1258 address=address, authkey=authkey, serializer=SERIALIZER)
1259 manager.connect()
1260 queue = manager.get_queue()
1261 queue.put('hello world')
1262
1263 def test_rapid_restart(self):
1264 authkey = os.urandom(32)
1265 manager = QueueManager(
Antoine Pitrou54f9f832010-04-30 23:08:48 +00001266 address=('localhost', 0), authkey=authkey, serializer=SERIALIZER)
1267 addr = manager.get_server().address
Jesse Noller459a6482009-03-30 15:50:42 +00001268 manager.start()
1269
1270 p = self.Process(target=self._putter, args=(manager.address, authkey))
1271 p.start()
1272 queue = manager.get_queue()
1273 self.assertEqual(queue.get(), 'hello world')
Jesse Noller019ce772009-03-30 21:53:29 +00001274 del queue
Jesse Noller459a6482009-03-30 15:50:42 +00001275 manager.shutdown()
1276 manager = QueueManager(
Antoine Pitrou54f9f832010-04-30 23:08:48 +00001277 address=addr, authkey=authkey, serializer=SERIALIZER)
Jesse Noller459a6482009-03-30 15:50:42 +00001278 manager.start()
Jesse Noller019ce772009-03-30 21:53:29 +00001279 manager.shutdown()
Jesse Noller459a6482009-03-30 15:50:42 +00001280
Benjamin Petersondfd79492008-06-13 19:13:39 +00001281#
1282#
1283#
1284
1285SENTINEL = latin('')
1286
1287class _TestConnection(BaseTestCase):
1288
1289 ALLOWED_TYPES = ('processes', 'threads')
1290
1291 def _echo(self, conn):
1292 for msg in iter(conn.recv_bytes, SENTINEL):
1293 conn.send_bytes(msg)
1294 conn.close()
1295
1296 def test_connection(self):
1297 conn, child_conn = self.Pipe()
1298
1299 p = self.Process(target=self._echo, args=(child_conn,))
Jesse Noller5bc9f4c2008-08-19 19:06:19 +00001300 p.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +00001301 p.start()
1302
1303 seq = [1, 2.25, None]
1304 msg = latin('hello world')
1305 longmsg = msg * 10
1306 arr = array.array('i', range(4))
1307
1308 if self.TYPE == 'processes':
1309 self.assertEqual(type(conn.fileno()), int)
1310
1311 self.assertEqual(conn.send(seq), None)
1312 self.assertEqual(conn.recv(), seq)
1313
1314 self.assertEqual(conn.send_bytes(msg), None)
1315 self.assertEqual(conn.recv_bytes(), msg)
1316
1317 if self.TYPE == 'processes':
1318 buffer = array.array('i', [0]*10)
1319 expected = list(arr) + [0] * (10 - len(arr))
1320 self.assertEqual(conn.send_bytes(arr), None)
1321 self.assertEqual(conn.recv_bytes_into(buffer),
1322 len(arr) * buffer.itemsize)
1323 self.assertEqual(list(buffer), expected)
1324
1325 buffer = array.array('i', [0]*10)
1326 expected = [0] * 3 + list(arr) + [0] * (10 - 3 - len(arr))
1327 self.assertEqual(conn.send_bytes(arr), None)
1328 self.assertEqual(conn.recv_bytes_into(buffer, 3 * buffer.itemsize),
1329 len(arr) * buffer.itemsize)
1330 self.assertEqual(list(buffer), expected)
1331
1332 buffer = bytearray(latin(' ' * 40))
1333 self.assertEqual(conn.send_bytes(longmsg), None)
1334 try:
1335 res = conn.recv_bytes_into(buffer)
1336 except multiprocessing.BufferTooShort, e:
1337 self.assertEqual(e.args, (longmsg,))
1338 else:
1339 self.fail('expected BufferTooShort, got %s' % res)
1340
1341 poll = TimingWrapper(conn.poll)
1342
1343 self.assertEqual(poll(), False)
1344 self.assertTimingAlmostEqual(poll.elapsed, 0)
1345
1346 self.assertEqual(poll(TIMEOUT1), False)
1347 self.assertTimingAlmostEqual(poll.elapsed, TIMEOUT1)
1348
1349 conn.send(None)
1350
1351 self.assertEqual(poll(TIMEOUT1), True)
1352 self.assertTimingAlmostEqual(poll.elapsed, 0)
1353
1354 self.assertEqual(conn.recv(), None)
1355
1356 really_big_msg = latin('X') * (1024 * 1024 * 16) # 16Mb
1357 conn.send_bytes(really_big_msg)
1358 self.assertEqual(conn.recv_bytes(), really_big_msg)
1359
1360 conn.send_bytes(SENTINEL) # tell child to quit
1361 child_conn.close()
1362
1363 if self.TYPE == 'processes':
1364 self.assertEqual(conn.readable, True)
1365 self.assertEqual(conn.writable, True)
1366 self.assertRaises(EOFError, conn.recv)
1367 self.assertRaises(EOFError, conn.recv_bytes)
1368
1369 p.join()
1370
1371 def test_duplex_false(self):
1372 reader, writer = self.Pipe(duplex=False)
1373 self.assertEqual(writer.send(1), None)
1374 self.assertEqual(reader.recv(), 1)
1375 if self.TYPE == 'processes':
1376 self.assertEqual(reader.readable, True)
1377 self.assertEqual(reader.writable, False)
1378 self.assertEqual(writer.readable, False)
1379 self.assertEqual(writer.writable, True)
1380 self.assertRaises(IOError, reader.send, 2)
1381 self.assertRaises(IOError, writer.recv)
1382 self.assertRaises(IOError, writer.poll)
1383
1384 def test_spawn_close(self):
1385 # We test that a pipe connection can be closed by parent
1386 # process immediately after child is spawned. On Windows this
1387 # would have sometimes failed on old versions because
1388 # child_conn would be closed before the child got a chance to
1389 # duplicate it.
1390 conn, child_conn = self.Pipe()
1391
1392 p = self.Process(target=self._echo, args=(child_conn,))
1393 p.start()
1394 child_conn.close() # this might complete before child initializes
1395
1396 msg = latin('hello')
1397 conn.send_bytes(msg)
1398 self.assertEqual(conn.recv_bytes(), msg)
1399
1400 conn.send_bytes(SENTINEL)
1401 conn.close()
1402 p.join()
1403
1404 def test_sendbytes(self):
1405 if self.TYPE != 'processes':
1406 return
1407
1408 msg = latin('abcdefghijklmnopqrstuvwxyz')
1409 a, b = self.Pipe()
1410
1411 a.send_bytes(msg)
1412 self.assertEqual(b.recv_bytes(), msg)
1413
1414 a.send_bytes(msg, 5)
1415 self.assertEqual(b.recv_bytes(), msg[5:])
1416
1417 a.send_bytes(msg, 7, 8)
1418 self.assertEqual(b.recv_bytes(), msg[7:7+8])
1419
1420 a.send_bytes(msg, 26)
1421 self.assertEqual(b.recv_bytes(), latin(''))
1422
1423 a.send_bytes(msg, 26, 0)
1424 self.assertEqual(b.recv_bytes(), latin(''))
1425
1426 self.assertRaises(ValueError, a.send_bytes, msg, 27)
1427
1428 self.assertRaises(ValueError, a.send_bytes, msg, 22, 5)
1429
1430 self.assertRaises(ValueError, a.send_bytes, msg, 26, 1)
1431
1432 self.assertRaises(ValueError, a.send_bytes, msg, -1)
1433
1434 self.assertRaises(ValueError, a.send_bytes, msg, 4, -1)
1435
Benjamin Petersondfd79492008-06-13 19:13:39 +00001436class _TestListenerClient(BaseTestCase):
1437
1438 ALLOWED_TYPES = ('processes', 'threads')
1439
1440 def _test(self, address):
1441 conn = self.connection.Client(address)
1442 conn.send('hello')
1443 conn.close()
1444
1445 def test_listener_client(self):
1446 for family in self.connection.families:
1447 l = self.connection.Listener(family=family)
1448 p = self.Process(target=self._test, args=(l.address,))
Jesse Noller5bc9f4c2008-08-19 19:06:19 +00001449 p.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +00001450 p.start()
1451 conn = l.accept()
1452 self.assertEqual(conn.recv(), 'hello')
1453 p.join()
1454 l.close()
Benjamin Petersondfd79492008-06-13 19:13:39 +00001455#
1456# Test of sending connection and socket objects between processes
1457#
Benjamin Petersonda3a1b12008-06-16 20:52:48 +00001458"""
Benjamin Petersondfd79492008-06-13 19:13:39 +00001459class _TestPicklingConnections(BaseTestCase):
1460
1461 ALLOWED_TYPES = ('processes',)
1462
1463 def _listener(self, conn, families):
1464 for fam in families:
1465 l = self.connection.Listener(family=fam)
1466 conn.send(l.address)
1467 new_conn = l.accept()
1468 conn.send(new_conn)
1469
1470 if self.TYPE == 'processes':
1471 l = socket.socket()
1472 l.bind(('localhost', 0))
1473 conn.send(l.getsockname())
1474 l.listen(1)
1475 new_conn, addr = l.accept()
1476 conn.send(new_conn)
1477
1478 conn.recv()
1479
1480 def _remote(self, conn):
1481 for (address, msg) in iter(conn.recv, None):
1482 client = self.connection.Client(address)
1483 client.send(msg.upper())
1484 client.close()
1485
1486 if self.TYPE == 'processes':
1487 address, msg = conn.recv()
1488 client = socket.socket()
1489 client.connect(address)
1490 client.sendall(msg.upper())
1491 client.close()
1492
1493 conn.close()
1494
1495 def test_pickling(self):
1496 try:
1497 multiprocessing.allow_connection_pickling()
1498 except ImportError:
1499 return
1500
1501 families = self.connection.families
1502
1503 lconn, lconn0 = self.Pipe()
1504 lp = self.Process(target=self._listener, args=(lconn0, families))
1505 lp.start()
1506 lconn0.close()
1507
1508 rconn, rconn0 = self.Pipe()
1509 rp = self.Process(target=self._remote, args=(rconn0,))
1510 rp.start()
1511 rconn0.close()
1512
1513 for fam in families:
1514 msg = ('This connection uses family %s' % fam).encode('ascii')
1515 address = lconn.recv()
1516 rconn.send((address, msg))
1517 new_conn = lconn.recv()
1518 self.assertEqual(new_conn.recv(), msg.upper())
1519
1520 rconn.send(None)
1521
1522 if self.TYPE == 'processes':
1523 msg = latin('This connection uses a normal socket')
1524 address = lconn.recv()
1525 rconn.send((address, msg))
1526 if hasattr(socket, 'fromfd'):
1527 new_conn = lconn.recv()
1528 self.assertEqual(new_conn.recv(100), msg.upper())
1529 else:
1530 # XXX On Windows with Py2.6 need to backport fromfd()
1531 discard = lconn.recv_bytes()
1532
1533 lconn.send(None)
1534
1535 rconn.close()
1536 lconn.close()
1537
1538 lp.join()
1539 rp.join()
Benjamin Petersonda3a1b12008-06-16 20:52:48 +00001540"""
Benjamin Petersondfd79492008-06-13 19:13:39 +00001541#
1542#
1543#
1544
1545class _TestHeap(BaseTestCase):
1546
1547 ALLOWED_TYPES = ('processes',)
1548
1549 def test_heap(self):
1550 iterations = 5000
1551 maxblocks = 50
1552 blocks = []
1553
1554 # create and destroy lots of blocks of different sizes
1555 for i in xrange(iterations):
1556 size = int(random.lognormvariate(0, 1) * 1000)
1557 b = multiprocessing.heap.BufferWrapper(size)
1558 blocks.append(b)
1559 if len(blocks) > maxblocks:
1560 i = random.randrange(maxblocks)
1561 del blocks[i]
1562
1563 # get the heap object
1564 heap = multiprocessing.heap.BufferWrapper._heap
1565
1566 # verify the state of the heap
1567 all = []
1568 occupied = 0
1569 for L in heap._len_to_seq.values():
1570 for arena, start, stop in L:
1571 all.append((heap._arenas.index(arena), start, stop,
1572 stop-start, 'free'))
1573 for arena, start, stop in heap._allocated_blocks:
1574 all.append((heap._arenas.index(arena), start, stop,
1575 stop-start, 'occupied'))
1576 occupied += (stop-start)
1577
1578 all.sort()
1579
1580 for i in range(len(all)-1):
1581 (arena, start, stop) = all[i][:3]
1582 (narena, nstart, nstop) = all[i+1][:3]
1583 self.assertTrue((arena != narena and nstart == 0) or
1584 (stop == nstart))
1585
1586#
1587#
1588#
1589
Benjamin Petersondfd79492008-06-13 19:13:39 +00001590class _Foo(Structure):
1591 _fields_ = [
1592 ('x', c_int),
1593 ('y', c_double)
1594 ]
1595
1596class _TestSharedCTypes(BaseTestCase):
1597
1598 ALLOWED_TYPES = ('processes',)
1599
1600 def _double(self, x, y, foo, arr, string):
1601 x.value *= 2
1602 y.value *= 2
1603 foo.x *= 2
1604 foo.y *= 2
1605 string.value *= 2
1606 for i in range(len(arr)):
1607 arr[i] *= 2
1608
Nick Coghlan13623662010-04-10 14:24:36 +00001609 @unittest.skipIf(Value is None, "requires ctypes.Value")
Benjamin Petersondfd79492008-06-13 19:13:39 +00001610 def test_sharedctypes(self, lock=False):
Benjamin Petersondfd79492008-06-13 19:13:39 +00001611 x = Value('i', 7, lock=lock)
Georg Brandlbd564c32010-02-06 23:33:33 +00001612 y = Value(c_double, 1.0/3.0, lock=lock)
Benjamin Petersondfd79492008-06-13 19:13:39 +00001613 foo = Value(_Foo, 3, 2, lock=lock)
Georg Brandlbd564c32010-02-06 23:33:33 +00001614 arr = self.Array('d', range(10), lock=lock)
1615 string = self.Array('c', 20, lock=lock)
Benjamin Petersondfd79492008-06-13 19:13:39 +00001616 string.value = 'hello'
1617
1618 p = self.Process(target=self._double, args=(x, y, foo, arr, string))
1619 p.start()
1620 p.join()
1621
1622 self.assertEqual(x.value, 14)
1623 self.assertAlmostEqual(y.value, 2.0/3.0)
1624 self.assertEqual(foo.x, 6)
1625 self.assertAlmostEqual(foo.y, 4.0)
1626 for i in range(10):
1627 self.assertAlmostEqual(arr[i], i*2)
1628 self.assertEqual(string.value, latin('hellohello'))
1629
Nick Coghlan13623662010-04-10 14:24:36 +00001630 @unittest.skipIf(Value is None, "requires ctypes.Value")
Benjamin Petersondfd79492008-06-13 19:13:39 +00001631 def test_synchronize(self):
1632 self.test_sharedctypes(lock=True)
1633
Nick Coghlan13623662010-04-10 14:24:36 +00001634 @unittest.skipIf(ctypes_copy is None, "requires ctypes.copy")
Benjamin Petersondfd79492008-06-13 19:13:39 +00001635 def test_copy(self):
Benjamin Petersondfd79492008-06-13 19:13:39 +00001636 foo = _Foo(2, 5.0)
Nick Coghlan13623662010-04-10 14:24:36 +00001637 bar = ctypes_copy(foo)
Benjamin Petersondfd79492008-06-13 19:13:39 +00001638 foo.x = 0
1639 foo.y = 0
1640 self.assertEqual(bar.x, 2)
1641 self.assertAlmostEqual(bar.y, 5.0)
1642
1643#
1644#
1645#
1646
1647class _TestFinalize(BaseTestCase):
1648
1649 ALLOWED_TYPES = ('processes',)
1650
1651 def _test_finalize(self, conn):
1652 class Foo(object):
1653 pass
1654
1655 a = Foo()
1656 util.Finalize(a, conn.send, args=('a',))
1657 del a # triggers callback for a
1658
1659 b = Foo()
1660 close_b = util.Finalize(b, conn.send, args=('b',))
1661 close_b() # triggers callback for b
1662 close_b() # does nothing because callback has already been called
1663 del b # does nothing because callback has already been called
1664
1665 c = Foo()
1666 util.Finalize(c, conn.send, args=('c',))
1667
1668 d10 = Foo()
1669 util.Finalize(d10, conn.send, args=('d10',), exitpriority=1)
1670
1671 d01 = Foo()
1672 util.Finalize(d01, conn.send, args=('d01',), exitpriority=0)
1673 d02 = Foo()
1674 util.Finalize(d02, conn.send, args=('d02',), exitpriority=0)
1675 d03 = Foo()
1676 util.Finalize(d03, conn.send, args=('d03',), exitpriority=0)
1677
1678 util.Finalize(None, conn.send, args=('e',), exitpriority=-10)
1679
1680 util.Finalize(None, conn.send, args=('STOP',), exitpriority=-100)
1681
1682 # call mutliprocessing's cleanup function then exit process without
1683 # garbage collecting locals
1684 util._exit_function()
1685 conn.close()
1686 os._exit(0)
1687
1688 def test_finalize(self):
1689 conn, child_conn = self.Pipe()
1690
1691 p = self.Process(target=self._test_finalize, args=(child_conn,))
1692 p.start()
1693 p.join()
1694
1695 result = [obj for obj in iter(conn.recv, 'STOP')]
1696 self.assertEqual(result, ['a', 'b', 'd10', 'd03', 'd02', 'd01', 'e'])
1697
1698#
1699# Test that from ... import * works for each module
1700#
1701
1702class _TestImportStar(BaseTestCase):
1703
1704 ALLOWED_TYPES = ('processes',)
1705
1706 def test_import(self):
Florent Xicluna36b9fbb2010-03-24 19:33:25 +00001707 modules = [
Benjamin Petersondfd79492008-06-13 19:13:39 +00001708 'multiprocessing', 'multiprocessing.connection',
1709 'multiprocessing.heap', 'multiprocessing.managers',
1710 'multiprocessing.pool', 'multiprocessing.process',
Florent Xicluna36b9fbb2010-03-24 19:33:25 +00001711 'multiprocessing.reduction',
Benjamin Petersondfd79492008-06-13 19:13:39 +00001712 'multiprocessing.synchronize', 'multiprocessing.util'
Florent Xicluna36b9fbb2010-03-24 19:33:25 +00001713 ]
1714
1715 if c_int is not None:
1716 # This module requires _ctypes
1717 modules.append('multiprocessing.sharedctypes')
Benjamin Petersondfd79492008-06-13 19:13:39 +00001718
1719 for name in modules:
1720 __import__(name)
1721 mod = sys.modules[name]
1722
1723 for attr in getattr(mod, '__all__', ()):
1724 self.assertTrue(
1725 hasattr(mod, attr),
1726 '%r does not have attribute %r' % (mod, attr)
1727 )
1728
1729#
1730# Quick test that logging works -- does not test logging output
1731#
1732
1733class _TestLogging(BaseTestCase):
1734
1735 ALLOWED_TYPES = ('processes',)
1736
1737 def test_enable_logging(self):
1738 logger = multiprocessing.get_logger()
1739 logger.setLevel(util.SUBWARNING)
1740 self.assertTrue(logger is not None)
1741 logger.debug('this will not be printed')
1742 logger.info('nor will this')
1743 logger.setLevel(LOG_LEVEL)
1744
1745 def _test_level(self, conn):
1746 logger = multiprocessing.get_logger()
1747 conn.send(logger.getEffectiveLevel())
1748
1749 def test_level(self):
1750 LEVEL1 = 32
1751 LEVEL2 = 37
1752
1753 logger = multiprocessing.get_logger()
1754 root_logger = logging.getLogger()
1755 root_level = root_logger.level
1756
1757 reader, writer = multiprocessing.Pipe(duplex=False)
1758
1759 logger.setLevel(LEVEL1)
1760 self.Process(target=self._test_level, args=(writer,)).start()
1761 self.assertEqual(LEVEL1, reader.recv())
1762
1763 logger.setLevel(logging.NOTSET)
1764 root_logger.setLevel(LEVEL2)
1765 self.Process(target=self._test_level, args=(writer,)).start()
1766 self.assertEqual(LEVEL2, reader.recv())
1767
1768 root_logger.setLevel(root_level)
1769 logger.setLevel(level=LOG_LEVEL)
1770
Jesse Noller814d02d2009-11-21 14:38:23 +00001771
Jesse Noller9a03f2f2009-11-24 14:17:29 +00001772# class _TestLoggingProcessName(BaseTestCase):
1773#
1774# def handle(self, record):
1775# assert record.processName == multiprocessing.current_process().name
1776# self.__handled = True
1777#
1778# def test_logging(self):
1779# handler = logging.Handler()
1780# handler.handle = self.handle
1781# self.__handled = False
1782# # Bypass getLogger() and side-effects
1783# logger = logging.getLoggerClass()(
1784# 'multiprocessing.test.TestLoggingProcessName')
1785# logger.addHandler(handler)
1786# logger.propagate = False
1787#
1788# logger.warn('foo')
1789# assert self.__handled
Jesse Noller814d02d2009-11-21 14:38:23 +00001790
Benjamin Petersondfd79492008-06-13 19:13:39 +00001791#
Jesse Noller9a5b2ad2009-01-19 15:12:22 +00001792# Test to verify handle verification, see issue 3321
1793#
1794
1795class TestInvalidHandle(unittest.TestCase):
1796
Florent Xicluna36b9fbb2010-03-24 19:33:25 +00001797 @unittest.skipIf(WIN32, "skipped on Windows")
Jesse Noller9a5b2ad2009-01-19 15:12:22 +00001798 def test_invalid_handles(self):
Jesse Noller9a5b2ad2009-01-19 15:12:22 +00001799 conn = _multiprocessing.Connection(44977608)
1800 self.assertRaises(IOError, conn.poll)
1801 self.assertRaises(IOError, _multiprocessing.Connection, -1)
Florent Xicluna36b9fbb2010-03-24 19:33:25 +00001802
Jesse Noller9a5b2ad2009-01-19 15:12:22 +00001803#
Benjamin Petersondfd79492008-06-13 19:13:39 +00001804# Functions used to create test cases from the base ones in this module
1805#
1806
1807def get_attributes(Source, names):
1808 d = {}
1809 for name in names:
1810 obj = getattr(Source, name)
1811 if type(obj) == type(get_attributes):
1812 obj = staticmethod(obj)
1813 d[name] = obj
1814 return d
1815
1816def create_test_cases(Mixin, type):
1817 result = {}
1818 glob = globals()
Florent Xicluna36b9fbb2010-03-24 19:33:25 +00001819 Type = type.capitalize()
Benjamin Petersondfd79492008-06-13 19:13:39 +00001820
1821 for name in glob.keys():
1822 if name.startswith('_Test'):
1823 base = glob[name]
1824 if type in base.ALLOWED_TYPES:
1825 newname = 'With' + Type + name[1:]
1826 class Temp(base, unittest.TestCase, Mixin):
1827 pass
1828 result[newname] = Temp
1829 Temp.__name__ = newname
1830 Temp.__module__ = Mixin.__module__
1831 return result
1832
1833#
1834# Create test cases
1835#
1836
1837class ProcessesMixin(object):
1838 TYPE = 'processes'
1839 Process = multiprocessing.Process
1840 locals().update(get_attributes(multiprocessing, (
1841 'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
1842 'Condition', 'Event', 'Value', 'Array', 'RawValue',
1843 'RawArray', 'current_process', 'active_children', 'Pipe',
1844 'connection', 'JoinableQueue'
1845 )))
1846
1847testcases_processes = create_test_cases(ProcessesMixin, type='processes')
1848globals().update(testcases_processes)
1849
1850
1851class ManagerMixin(object):
1852 TYPE = 'manager'
1853 Process = multiprocessing.Process
1854 manager = object.__new__(multiprocessing.managers.SyncManager)
1855 locals().update(get_attributes(manager, (
1856 'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
1857 'Condition', 'Event', 'Value', 'Array', 'list', 'dict',
1858 'Namespace', 'JoinableQueue'
1859 )))
1860
1861testcases_manager = create_test_cases(ManagerMixin, type='manager')
1862globals().update(testcases_manager)
1863
1864
1865class ThreadsMixin(object):
1866 TYPE = 'threads'
1867 Process = multiprocessing.dummy.Process
1868 locals().update(get_attributes(multiprocessing.dummy, (
1869 'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
1870 'Condition', 'Event', 'Value', 'Array', 'current_process',
1871 'active_children', 'Pipe', 'connection', 'dict', 'list',
1872 'Namespace', 'JoinableQueue'
1873 )))
1874
1875testcases_threads = create_test_cases(ThreadsMixin, type='threads')
1876globals().update(testcases_threads)
1877
Neal Norwitz0c519b32008-08-25 01:50:24 +00001878class OtherTest(unittest.TestCase):
1879 # TODO: add more tests for deliver/answer challenge.
1880 def test_deliver_challenge_auth_failure(self):
1881 class _FakeConnection(object):
1882 def recv_bytes(self, size):
Neal Norwitz2a7767a2008-08-25 03:03:25 +00001883 return b'something bogus'
Neal Norwitz0c519b32008-08-25 01:50:24 +00001884 def send_bytes(self, data):
1885 pass
1886 self.assertRaises(multiprocessing.AuthenticationError,
1887 multiprocessing.connection.deliver_challenge,
1888 _FakeConnection(), b'abc')
1889
1890 def test_answer_challenge_auth_failure(self):
1891 class _FakeConnection(object):
1892 def __init__(self):
1893 self.count = 0
1894 def recv_bytes(self, size):
1895 self.count += 1
1896 if self.count == 1:
1897 return multiprocessing.connection.CHALLENGE
1898 elif self.count == 2:
Neal Norwitz2a7767a2008-08-25 03:03:25 +00001899 return b'something bogus'
1900 return b''
Neal Norwitz0c519b32008-08-25 01:50:24 +00001901 def send_bytes(self, data):
1902 pass
1903 self.assertRaises(multiprocessing.AuthenticationError,
1904 multiprocessing.connection.answer_challenge,
1905 _FakeConnection(), b'abc')
1906
Jesse Noller7152f6d2009-04-02 05:17:26 +00001907#
1908# Test Manager.start()/Pool.__init__() initializer feature - see issue 5585
1909#
1910
1911def initializer(ns):
1912 ns.test += 1
1913
1914class TestInitializers(unittest.TestCase):
1915 def setUp(self):
1916 self.mgr = multiprocessing.Manager()
1917 self.ns = self.mgr.Namespace()
1918 self.ns.test = 0
1919
1920 def tearDown(self):
1921 self.mgr.shutdown()
1922
1923 def test_manager_initializer(self):
1924 m = multiprocessing.managers.SyncManager()
1925 self.assertRaises(TypeError, m.start, 1)
1926 m.start(initializer, (self.ns,))
1927 self.assertEqual(self.ns.test, 1)
1928 m.shutdown()
1929
1930 def test_pool_initializer(self):
1931 self.assertRaises(TypeError, multiprocessing.Pool, initializer=1)
1932 p = multiprocessing.Pool(1, initializer, (self.ns,))
1933 p.close()
1934 p.join()
1935 self.assertEqual(self.ns.test, 1)
1936
Jesse Noller1b90efb2009-06-30 17:11:52 +00001937#
1938# Issue 5155, 5313, 5331: Test process in processes
1939# Verifies os.close(sys.stdin.fileno) vs. sys.stdin.close() behavior
1940#
1941
1942def _ThisSubProcess(q):
1943 try:
1944 item = q.get(block=False)
1945 except Queue.Empty:
1946 pass
1947
1948def _TestProcess(q):
1949 queue = multiprocessing.Queue()
1950 subProc = multiprocessing.Process(target=_ThisSubProcess, args=(queue,))
1951 subProc.start()
1952 subProc.join()
1953
1954def _afunc(x):
1955 return x*x
1956
1957def pool_in_process():
1958 pool = multiprocessing.Pool(processes=4)
1959 x = pool.map(_afunc, [1, 2, 3, 4, 5, 6, 7])
1960
1961class _file_like(object):
1962 def __init__(self, delegate):
1963 self._delegate = delegate
1964 self._pid = None
1965
1966 @property
1967 def cache(self):
1968 pid = os.getpid()
1969 # There are no race conditions since fork keeps only the running thread
1970 if pid != self._pid:
1971 self._pid = pid
1972 self._cache = []
1973 return self._cache
1974
1975 def write(self, data):
1976 self.cache.append(data)
1977
1978 def flush(self):
1979 self._delegate.write(''.join(self.cache))
1980 self._cache = []
1981
1982class TestStdinBadfiledescriptor(unittest.TestCase):
1983
1984 def test_queue_in_process(self):
1985 queue = multiprocessing.Queue()
1986 proc = multiprocessing.Process(target=_TestProcess, args=(queue,))
1987 proc.start()
1988 proc.join()
1989
1990 def test_pool_in_process(self):
1991 p = multiprocessing.Process(target=pool_in_process)
1992 p.start()
1993 p.join()
1994
1995 def test_flushing(self):
1996 sio = StringIO()
1997 flike = _file_like(sio)
1998 flike.write('foo')
1999 proc = multiprocessing.Process(target=lambda: flike.flush())
2000 flike.flush()
2001 assert sio.getvalue() == 'foo'
2002
2003testcases_other = [OtherTest, TestInvalidHandle, TestInitializers,
2004 TestStdinBadfiledescriptor]
Neal Norwitz0c519b32008-08-25 01:50:24 +00002005
Benjamin Petersondfd79492008-06-13 19:13:39 +00002006#
2007#
2008#
2009
2010def test_main(run=None):
Jesse Noller18623822008-06-18 13:29:52 +00002011 if sys.platform.startswith("linux"):
2012 try:
2013 lock = multiprocessing.RLock()
2014 except OSError:
Benjamin Petersonbec087f2009-03-26 21:10:30 +00002015 raise unittest.SkipTest("OSError raises on RLock creation, see issue 3111!")
Benjamin Petersoned77f2e2008-06-17 22:40:44 +00002016
Benjamin Petersondfd79492008-06-13 19:13:39 +00002017 if run is None:
2018 from test.test_support import run_unittest as run
2019
2020 util.get_temp_dir() # creates temp directory for use by all processes
2021
2022 multiprocessing.get_logger().setLevel(LOG_LEVEL)
2023
Jesse Noller146b7ab2008-07-02 16:44:09 +00002024 ProcessesMixin.pool = multiprocessing.Pool(4)
2025 ThreadsMixin.pool = multiprocessing.dummy.Pool(4)
2026 ManagerMixin.manager.__init__()
2027 ManagerMixin.manager.start()
2028 ManagerMixin.pool = ManagerMixin.manager.Pool(4)
Benjamin Petersondfd79492008-06-13 19:13:39 +00002029
2030 testcases = (
Jesse Noller146b7ab2008-07-02 16:44:09 +00002031 sorted(testcases_processes.values(), key=lambda tc:tc.__name__) +
2032 sorted(testcases_threads.values(), key=lambda tc:tc.__name__) +
Neal Norwitz0c519b32008-08-25 01:50:24 +00002033 sorted(testcases_manager.values(), key=lambda tc:tc.__name__) +
2034 testcases_other
Benjamin Petersondfd79492008-06-13 19:13:39 +00002035 )
2036
2037 loadTestsFromTestCase = unittest.defaultTestLoader.loadTestsFromTestCase
2038 suite = unittest.TestSuite(loadTestsFromTestCase(tc) for tc in testcases)
Nick Coghlan13623662010-04-10 14:24:36 +00002039 # (ncoghlan): Whether or not sys.exc_clear is executed by the threading
2040 # module during these tests is at least platform dependent and possibly
Nick Coghlan14459d52010-04-10 15:01:54 +00002041 # non-deterministic on any given platform. So we don't mind if the listed
Nick Coghlan13623662010-04-10 14:24:36 +00002042 # warnings aren't actually raised.
Florent Xicluna07627882010-03-21 01:14:24 +00002043 with test_support.check_py3k_warnings(
Nick Coghlan13623662010-04-10 14:24:36 +00002044 (".+__(get|set)slice__ has been removed", DeprecationWarning),
2045 (r"sys.exc_clear\(\) not supported", DeprecationWarning),
2046 quiet=True):
Florent Xicluna07627882010-03-21 01:14:24 +00002047 run(suite)
Benjamin Petersondfd79492008-06-13 19:13:39 +00002048
Jesse Noller146b7ab2008-07-02 16:44:09 +00002049 ThreadsMixin.pool.terminate()
2050 ProcessesMixin.pool.terminate()
2051 ManagerMixin.pool.terminate()
2052 ManagerMixin.manager.shutdown()
Benjamin Petersondfd79492008-06-13 19:13:39 +00002053
Jesse Noller146b7ab2008-07-02 16:44:09 +00002054 del ProcessesMixin.pool, ThreadsMixin.pool, ManagerMixin.pool
Benjamin Petersondfd79492008-06-13 19:13:39 +00002055
2056def main():
2057 test_main(unittest.TextTestRunner(verbosity=2).run)
2058
2059if __name__ == '__main__':
2060 main()