blob: 0e480a92e5d3e602e76e46b23c17caef48620e0a [file] [log] [blame]
Jesse Noller76cf55f2008-07-02 16:56:51 +00001#!/usr/bin/env python
2
Benjamin Petersondfd79492008-06-13 19:13:39 +00003#
4# Unit tests for the multiprocessing package
5#
6
7import unittest
Benjamin Petersondfd79492008-06-13 19:13:39 +00008import Queue
9import time
10import sys
11import os
12import gc
13import signal
14import array
Benjamin Petersondfd79492008-06-13 19:13:39 +000015import socket
16import random
17import logging
Antoine Pitroua1a8da82011-08-23 19:54:20 +020018import errno
Mark Dickinsonc4920e82009-11-20 19:30:22 +000019from test import test_support
Jesse Noller1b90efb2009-06-30 17:11:52 +000020from StringIO import StringIO
R. David Murray3db8a342009-03-30 23:05:48 +000021_multiprocessing = test_support.import_module('_multiprocessing')
Ezio Melottic2077b02011-03-16 12:34:31 +020022# import threading after _multiprocessing to raise a more relevant error
Victor Stinner613b4cf2010-04-27 21:56:26 +000023# message: "No module named _multiprocessing". _multiprocessing is not compiled
24# without thread support.
25import threading
R. David Murray3db8a342009-03-30 23:05:48 +000026
Jesse Noller37040cd2008-09-30 00:15:45 +000027# Work around broken sem_open implementations
R. David Murray3db8a342009-03-30 23:05:48 +000028test_support.import_module('multiprocessing.synchronize')
Jesse Noller37040cd2008-09-30 00:15:45 +000029
Benjamin Petersondfd79492008-06-13 19:13:39 +000030import multiprocessing.dummy
31import multiprocessing.connection
32import multiprocessing.managers
33import multiprocessing.heap
Benjamin Petersondfd79492008-06-13 19:13:39 +000034import multiprocessing.pool
Benjamin Petersondfd79492008-06-13 19:13:39 +000035
Antoine Pitroua1a8da82011-08-23 19:54:20 +020036from multiprocessing import util, reduction
Benjamin Petersondfd79492008-06-13 19:13:39 +000037
Brian Curtina06e9b82010-10-07 02:27:41 +000038try:
39 from multiprocessing.sharedctypes import Value, copy
40 HAS_SHAREDCTYPES = True
41except ImportError:
42 HAS_SHAREDCTYPES = False
43
Antoine Pitroua1a8da82011-08-23 19:54:20 +020044try:
45 import msvcrt
46except ImportError:
47 msvcrt = None
48
Benjamin Petersondfd79492008-06-13 19:13:39 +000049#
50#
51#
52
Benjamin Petersone79edf52008-07-13 18:34:58 +000053latin = str
Benjamin Petersondfd79492008-06-13 19:13:39 +000054
Benjamin Petersondfd79492008-06-13 19:13:39 +000055#
56# Constants
57#
58
59LOG_LEVEL = util.SUBWARNING
Jesse Noller654ade32010-01-27 03:05:57 +000060#LOG_LEVEL = logging.DEBUG
Benjamin Petersondfd79492008-06-13 19:13:39 +000061
# Short sleep interval (seconds) used throughout the tests.
DELTA = 0.1

# Setting this to True makes the tests take a lot longer and can sometimes
# cause non-serious failures, because some calls block a bit longer than
# expected.
CHECK_TIMINGS = False

# Timeouts handed to blocking calls: distinct values when timings are being
# verified, one short value otherwise.
TIMEOUT1, TIMEOUT2, TIMEOUT3 = (
    (0.82, 0.35, 1.4) if CHECK_TIMINGS else (0.1, 0.1, 0.1)
)
71
72HAVE_GETVALUE = not getattr(_multiprocessing,
73 'HAVE_BROKEN_SEM_GETVALUE', False)
74
Jesse Noller9a5b2ad2009-01-19 15:12:22 +000075WIN32 = (sys.platform == "win32")
76
# Value of the SC_OPEN_MAX system limit, or 256 when it cannot be queried.
try:
    MAXFD = os.sysconf("SC_OPEN_MAX")
except (AttributeError, ValueError, OSError):
    # os.sysconf may be missing on the platform (AttributeError), the name
    # may be unknown (ValueError) or the query itself may fail (OSError).
    # Anything else (e.g. KeyboardInterrupt) is deliberately not swallowed.
    MAXFD = 256
81
Benjamin Petersondfd79492008-06-13 19:13:39 +000082#
Florent Xicluna36b9fbb2010-03-24 19:33:25 +000083# Some tests require ctypes
84#
85
86try:
Nick Coghlan13623662010-04-10 14:24:36 +000087 from ctypes import Structure, c_int, c_double
Florent Xicluna36b9fbb2010-03-24 19:33:25 +000088except ImportError:
89 Structure = object
90 c_int = c_double = None
91
92#
Benjamin Petersondfd79492008-06-13 19:13:39 +000093# Creates a wrapper for a function which records the time it takes to finish
94#
95
class TimingWrapper(object):
    """Callable proxy that remembers how long the last call took.

    ``elapsed`` is None until the wrapper has been called once; afterwards
    it holds the wall-clock duration (seconds) of the most recent call,
    whether that call returned or raised.
    """

    def __init__(self, func):
        self.func = func
        self.elapsed = None

    def __call__(self, *args, **kwds):
        started = time.time()
        try:
            result = self.func(*args, **kwds)
        finally:
            # Recorded on both the normal and the exceptional path.
            self.elapsed = time.time() - started
        return result
108
109#
110# Base class for test cases
111#
112
class BaseTestCase(object):
    """Mixin with assertion helpers shared by the test classes below.

    It relies on the concrete class also providing the unittest assertion
    methods (assertEqual, assertAlmostEqual).
    """

    ALLOWED_TYPES = ('processes', 'manager', 'threads')

    def assertTimingAlmostEqual(self, a, b):
        # Timing comparisons are only enforced when CHECK_TIMINGS is on.
        if not CHECK_TIMINGS:
            return
        self.assertAlmostEqual(a, b, 1)

    def assertReturnsIfImplemented(self, value, func, *args):
        # Compare func(*args) with value, but tolerate func being
        # unimplemented (signalled by NotImplementedError).
        try:
            outcome = func(*args)
        except NotImplementedError:
            return None
        return self.assertEqual(value, outcome)

    # For the sanity of Windows users, rather than crashing or freezing in
    # multiple ways.
    def __reduce__(self, *args):
        raise NotImplementedError("shouldn't try to pickle a test case")

    __reduce_ex__ = __reduce__
135
Benjamin Petersondfd79492008-06-13 19:13:39 +0000136#
137# Return the value of a semaphore
138#
139
def get_value(self):
    """Return the counter of a semaphore-like object.

    Tries, in order, the ``get_value()`` method, then the
    ``_Semaphore__value`` attribute, then the ``_value`` attribute.  Raises
    NotImplementedError when none of them is available.
    """
    try:
        return self.get_value()
    except AttributeError:
        pass
    for attr_name in ('_Semaphore__value', '_value'):
        try:
            return getattr(self, attr_name)
        except AttributeError:
            continue
    raise NotImplementedError
151
152#
153# Testcases
154#
155
class _TestProcess(BaseTestCase):
    # Exercises the basic Process API: attributes of the current process,
    # the start/join/terminate lifecycle, active_children() and nested
    # process creation.

    ALLOWED_TYPES = ('processes', 'threads')

    def test_current(self):
        # Not applicable to the 'threads' flavour; report a skip rather
        # than a silent pass.
        if self.TYPE == 'threads':
            self.skipTest('test not appropriate for %s' % self.TYPE)

        current = self.current_process()
        authkey = current.authkey

        self.assertTrue(current.is_alive())
        self.assertTrue(not current.daemon)
        self.assertIsInstance(authkey, bytes)
        self.assertTrue(len(authkey) > 0)
        self.assertEqual(current.ident, os.getpid())
        self.assertEqual(current.exitcode, None)

    @classmethod
    def _test(cls, q, *args, **kwds):
        # Child-side target for test_process(): push what the child sees
        # onto q in the order the parent reads it back.
        current = cls.current_process()
        q.put(args)
        q.put(kwds)
        q.put(current.name)
        if cls.TYPE != 'threads':
            q.put(bytes(current.authkey))
            q.put(current.pid)

    def test_process(self):
        q = self.Queue(1)
        args = (q, 1, 2)
        kwargs = {'hello':23, 'bye':2.54}
        name = 'SomeProcess'
        p = self.Process(
            target=self._test, args=args, kwargs=kwargs, name=name
            )
        p.daemon = True
        current = self.current_process()

        # State before start().
        if self.TYPE != 'threads':
            self.assertEqual(p.authkey, current.authkey)
        self.assertEqual(p.is_alive(), False)
        self.assertEqual(p.daemon, True)
        self.assertNotIn(p, self.active_children())
        self.assertTrue(type(self.active_children()) is list)
        self.assertEqual(p.exitcode, None)

        p.start()

        # State while running.
        self.assertEqual(p.exitcode, None)
        self.assertEqual(p.is_alive(), True)
        self.assertIn(p, self.active_children())

        # The child echoes its arguments (minus the queue) and identity.
        self.assertEqual(q.get(), args[1:])
        self.assertEqual(q.get(), kwargs)
        self.assertEqual(q.get(), p.name)
        if self.TYPE != 'threads':
            self.assertEqual(q.get(), current.authkey)
            self.assertEqual(q.get(), p.pid)

        p.join()

        # State after a clean exit.
        self.assertEqual(p.exitcode, 0)
        self.assertEqual(p.is_alive(), False)
        self.assertNotIn(p, self.active_children())

    @classmethod
    def _test_terminate(cls):
        # Sleeps far longer than the test runs; the parent terminates it.
        time.sleep(1000)

    def test_terminate(self):
        # Not applicable to the 'threads' flavour; report a skip rather
        # than a silent pass.
        if self.TYPE == 'threads':
            self.skipTest('test not appropriate for %s' % self.TYPE)

        p = self.Process(target=self._test_terminate)
        p.daemon = True
        p.start()

        self.assertEqual(p.is_alive(), True)
        self.assertIn(p, self.active_children())
        self.assertEqual(p.exitcode, None)

        p.terminate()

        join = TimingWrapper(p.join)
        self.assertEqual(join(), None)
        self.assertTimingAlmostEqual(join.elapsed, 0.0)

        self.assertEqual(p.is_alive(), False)
        self.assertNotIn(p, self.active_children())

        p.join()

        # XXX sometimes get p.exitcode == 0 on Windows ...
        #self.assertEqual(p.exitcode, -signal.SIGTERM)

    def test_cpu_count(self):
        # cpu_count() may be unimplemented; treat that as one CPU.
        try:
            cpus = multiprocessing.cpu_count()
        except NotImplementedError:
            cpus = 1
        self.assertTrue(type(cpus) is int)
        self.assertTrue(cpus >= 1)

    def test_active_children(self):
        self.assertEqual(type(self.active_children()), list)

        p = self.Process(target=time.sleep, args=(DELTA,))
        self.assertNotIn(p, self.active_children())

        p.daemon = True
        p.start()
        self.assertIn(p, self.active_children())

        p.join()
        self.assertNotIn(p, self.active_children())

    @classmethod
    def _test_recursion(cls, wconn, id):
        # Send our own id, then spawn two children (one at a time) whose
        # ids extend ours, down to ids of length 2.
        wconn.send(id)
        if len(id) < 2:
            for i in range(2):
                p = cls.Process(
                    target=cls._test_recursion, args=(wconn, id+[i])
                    )
                p.start()
                p.join()

    def test_recursion(self):
        rconn, wconn = self.Pipe(duplex=False)
        self._test_recursion(wconn, [])

        time.sleep(DELTA)
        result = []
        while rconn.poll():
            result.append(rconn.recv())

        # Each child is joined before its sibling starts, so the ids
        # arrive in depth-first order.
        expected = [
            [],
              [0],
                [0, 0],
                [0, 1],
              [1],
                [1, 0],
                [1, 1]
            ]
        self.assertEqual(result, expected)
305
306#
307#
308#
309
310class _UpperCaser(multiprocessing.Process):
311
312 def __init__(self):
313 multiprocessing.Process.__init__(self)
314 self.child_conn, self.parent_conn = multiprocessing.Pipe()
315
316 def run(self):
317 self.parent_conn.close()
318 for s in iter(self.child_conn.recv, None):
319 self.child_conn.send(s.upper())
320 self.child_conn.close()
321
322 def submit(self, s):
323 assert type(s) is str
324 self.parent_conn.send(s)
325 return self.parent_conn.recv()
326
327 def stop(self):
328 self.parent_conn.send(None)
329 self.parent_conn.close()
330 self.child_conn.close()
331
class _TestSubclassingProcess(BaseTestCase):
    # Checks that a user-defined Process subclass can be started, talked to
    # and shut down.

    ALLOWED_TYPES = ('processes',)

    def test_subclassing(self):
        worker = _UpperCaser()
        worker.daemon = True
        worker.start()
        for text, shouted in (('hello', 'HELLO'), ('world', 'WORLD')):
            self.assertEqual(worker.submit(text), shouted)
        worker.stop()
        worker.join()
344
345#
346#
347#
348
def queue_empty(q):
    """Report whether q is empty, via empty() if present, else qsize()."""
    if not hasattr(q, 'empty'):
        return q.qsize() == 0
    return q.empty()
354
def queue_full(q, maxsize):
    """Report whether q is full, via full() if present, else qsize()."""
    if not hasattr(q, 'full'):
        return q.qsize() == maxsize
    return q.full()
360
361
class _TestQueue(BaseTestCase):
    # Tests for Queue / JoinableQueue: blocking and non-blocking put/get,
    # timeouts, use across a child process, qsize() and task_done().

    @classmethod
    def _test_put(cls, queue, child_can_start, parent_can_continue):
        # Child side of test_put(): once released, drain the six items the
        # parent queued, then signal back.
        child_can_start.wait()
        for i in range(6):
            queue.get()
        parent_can_continue.set()

    def test_put(self):
        MAXSIZE = 6
        queue = self.Queue(maxsize=MAXSIZE)
        child_can_start = self.Event()
        parent_can_continue = self.Event()

        proc = self.Process(
            target=self._test_put,
            args=(queue, child_can_start, parent_can_continue)
            )
        proc.daemon = True
        proc.start()

        self.assertEqual(queue_empty(queue), True)
        self.assertEqual(queue_full(queue, MAXSIZE), False)

        # Fill the queue through every calling convention of put().
        queue.put(1)
        queue.put(2, True)
        queue.put(3, True, None)
        queue.put(4, False)
        queue.put(5, False, None)
        queue.put_nowait(6)

        # the values may be in buffer but not yet in pipe so sleep a bit
        time.sleep(DELTA)

        self.assertEqual(queue_empty(queue), False)
        self.assertEqual(queue_full(queue, MAXSIZE), True)

        put = TimingWrapper(queue.put)
        put_nowait = TimingWrapper(queue.put_nowait)

        # Non-blocking puts on a full queue fail immediately ...
        self.assertRaises(Queue.Full, put, 7, False)
        self.assertTimingAlmostEqual(put.elapsed, 0)

        self.assertRaises(Queue.Full, put, 7, False, None)
        self.assertTimingAlmostEqual(put.elapsed, 0)

        self.assertRaises(Queue.Full, put_nowait, 7)
        self.assertTimingAlmostEqual(put_nowait.elapsed, 0)

        # ... while blocking puts fail only once their timeout expires.
        self.assertRaises(Queue.Full, put, 7, True, TIMEOUT1)
        self.assertTimingAlmostEqual(put.elapsed, TIMEOUT1)

        self.assertRaises(Queue.Full, put, 7, False, TIMEOUT2)
        self.assertTimingAlmostEqual(put.elapsed, 0)

        self.assertRaises(Queue.Full, put, 7, True, timeout=TIMEOUT3)
        self.assertTimingAlmostEqual(put.elapsed, TIMEOUT3)

        child_can_start.set()
        parent_can_continue.wait()

        self.assertEqual(queue_empty(queue), True)
        self.assertEqual(queue_full(queue, MAXSIZE), False)

        proc.join()

    @classmethod
    def _test_get(cls, queue, child_can_start, parent_can_continue):
        # Child side of test_get(): once released, queue four items and
        # signal back.
        child_can_start.wait()
        #queue.put(1)
        queue.put(2)
        queue.put(3)
        queue.put(4)
        queue.put(5)
        parent_can_continue.set()

    def test_get(self):
        queue = self.Queue()
        child_can_start = self.Event()
        parent_can_continue = self.Event()

        proc = self.Process(
            target=self._test_get,
            args=(queue, child_can_start, parent_can_continue)
            )
        proc.daemon = True
        proc.start()

        self.assertEqual(queue_empty(queue), True)

        child_can_start.set()
        parent_can_continue.wait()

        time.sleep(DELTA)
        self.assertEqual(queue_empty(queue), False)

        # Hangs unexpectedly, remove for now
        #self.assertEqual(queue.get(), 1)
        self.assertEqual(queue.get(True, None), 2)
        self.assertEqual(queue.get(True), 3)
        self.assertEqual(queue.get(timeout=1), 4)
        self.assertEqual(queue.get_nowait(), 5)

        self.assertEqual(queue_empty(queue), True)

        get = TimingWrapper(queue.get)
        get_nowait = TimingWrapper(queue.get_nowait)

        # Non-blocking gets on an empty queue fail immediately ...
        self.assertRaises(Queue.Empty, get, False)
        self.assertTimingAlmostEqual(get.elapsed, 0)

        self.assertRaises(Queue.Empty, get, False, None)
        self.assertTimingAlmostEqual(get.elapsed, 0)

        self.assertRaises(Queue.Empty, get_nowait)
        self.assertTimingAlmostEqual(get_nowait.elapsed, 0)

        # ... while blocking gets fail only once their timeout expires.
        self.assertRaises(Queue.Empty, get, True, TIMEOUT1)
        self.assertTimingAlmostEqual(get.elapsed, TIMEOUT1)

        self.assertRaises(Queue.Empty, get, False, TIMEOUT2)
        self.assertTimingAlmostEqual(get.elapsed, 0)

        self.assertRaises(Queue.Empty, get, timeout=TIMEOUT3)
        self.assertTimingAlmostEqual(get.elapsed, TIMEOUT3)

        proc.join()

    @classmethod
    def _test_fork(cls, queue):
        for i in range(10, 20):
            queue.put(i)
        # note that at this point the items may only be buffered, so the
        # process cannot shutdown until the feeder thread has finished
        # pushing items onto the pipe.

    def test_fork(self):
        # Old versions of Queue would fail to create a new feeder
        # thread for a forked process if the original process had its
        # own feeder thread.  This test checks that this no longer
        # happens.

        queue = self.Queue()

        # put items on queue so that main process starts a feeder thread
        for i in range(10):
            queue.put(i)

        # wait to make sure thread starts before we fork a new process
        time.sleep(DELTA)

        # fork process
        p = self.Process(target=self._test_fork, args=(queue,))
        p.daemon = True
        p.start()

        # check that all expected items are in the queue
        for i in range(20):
            self.assertEqual(queue.get(), i)
        self.assertRaises(Queue.Empty, queue.get, False)

        p.join()

    def test_qsize(self):
        q = self.Queue()
        try:
            self.assertEqual(q.qsize(), 0)
        except NotImplementedError:
            # Report the missing feature as a skip instead of letting the
            # test pass without having checked anything.
            self.skipTest('qsize method not implemented')
        q.put(1)
        self.assertEqual(q.qsize(), 1)
        q.put(5)
        self.assertEqual(q.qsize(), 2)
        q.get()
        self.assertEqual(q.qsize(), 1)
        q.get()
        self.assertEqual(q.qsize(), 0)

    @classmethod
    def _test_task_done(cls, q):
        # Worker loop: acknowledge each item after a short delay; a None
        # item ends the loop.
        for obj in iter(q.get, None):
            time.sleep(DELTA)
            q.task_done()

    def test_task_done(self):
        queue = self.JoinableQueue()

        if sys.version_info < (2, 5) and not hasattr(queue, 'task_done'):
            self.skipTest("requires 'queue.task_done()' method")

        workers = [self.Process(target=self._test_task_done, args=(queue,))
                   for i in xrange(4)]

        for p in workers:
            p.daemon = True
            p.start()

        for i in xrange(10):
            queue.put(i)

        # Returns only once every queued item has been acknowledged.
        queue.join()

        # One shutdown sentinel per worker.
        for p in workers:
            queue.put(None)

        for p in workers:
            p.join()
571
572#
573#
574#
575
class _TestLock(BaseTestCase):
    # Basic acquire/release behaviour of Lock and RLock.

    def test_lock(self):
        lk = self.Lock()
        # The first acquire succeeds; a non-blocking second attempt fails.
        self.assertEqual(lk.acquire(), True)
        self.assertEqual(lk.acquire(False), False)
        self.assertEqual(lk.release(), None)
        # Releasing a lock that is not held is an error.
        unheld_errors = (ValueError, threading.ThreadError)
        self.assertRaises(unheld_errors, lk.release)

    def test_rlock(self):
        rl = self.RLock()
        depth = 3
        # A recursive lock may be taken repeatedly by the same owner ...
        for _ in range(depth):
            self.assertEqual(rl.acquire(), True)
        # ... and must be released the same number of times.
        for _ in range(depth):
            self.assertEqual(rl.release(), None)
        unheld_errors = (AssertionError, RuntimeError)
        self.assertRaises(unheld_errors, rl.release)

    def test_lock_context(self):
        # A lock is usable as a context manager.
        with self.Lock():
            pass
598
Benjamin Petersondfd79492008-06-13 19:13:39 +0000599
class _TestSemaphore(BaseTestCase):
    # Tests for Semaphore / BoundedSemaphore counting and acquire timeouts.

    def _test_semaphore(self, sem):
        # Shared scenario for a semaphore created with an initial value of
        # 2: take it down to 0, fail a non-blocking acquire, release back
        # up to 2.
        self.assertReturnsIfImplemented(2, get_value, sem)
        self.assertEqual(sem.acquire(), True)
        self.assertReturnsIfImplemented(1, get_value, sem)
        self.assertEqual(sem.acquire(), True)
        self.assertReturnsIfImplemented(0, get_value, sem)
        self.assertEqual(sem.acquire(False), False)
        self.assertReturnsIfImplemented(0, get_value, sem)
        self.assertEqual(sem.release(), None)
        self.assertReturnsIfImplemented(1, get_value, sem)
        self.assertEqual(sem.release(), None)
        self.assertReturnsIfImplemented(2, get_value, sem)

    def test_semaphore(self):
        sem = self.Semaphore(2)
        self._test_semaphore(sem)
        # An unbounded semaphore may be released past its initial value.
        self.assertEqual(sem.release(), None)
        self.assertReturnsIfImplemented(3, get_value, sem)
        self.assertEqual(sem.release(), None)
        self.assertReturnsIfImplemented(4, get_value, sem)

    def test_bounded_semaphore(self):
        sem = self.BoundedSemaphore(2)
        self._test_semaphore(sem)
        # Currently fails on OS/X
        #if HAVE_GETVALUE:
        #    self.assertRaises(ValueError, sem.release)
        #    self.assertReturnsIfImplemented(2, get_value, sem)

    def test_timeout(self):
        # Only run for the 'processes' flavour; report a skip rather than
        # a silent pass for the others.
        if self.TYPE != 'processes':
            self.skipTest('test not appropriate for %s' % self.TYPE)

        sem = self.Semaphore(0)
        acquire = TimingWrapper(sem.acquire)

        # Non-blocking attempts return at once, whatever the timeout.
        self.assertEqual(acquire(False), False)
        self.assertTimingAlmostEqual(acquire.elapsed, 0.0)

        self.assertEqual(acquire(False, None), False)
        self.assertTimingAlmostEqual(acquire.elapsed, 0.0)

        self.assertEqual(acquire(False, TIMEOUT1), False)
        self.assertTimingAlmostEqual(acquire.elapsed, 0)

        # Blocking attempts give up after the requested timeout.
        self.assertEqual(acquire(True, TIMEOUT2), False)
        self.assertTimingAlmostEqual(acquire.elapsed, TIMEOUT2)

        self.assertEqual(acquire(timeout=TIMEOUT3), False)
        self.assertTimingAlmostEqual(acquire.elapsed, TIMEOUT3)
652
653
class _TestCondition(BaseTestCase):
    # Tests for Condition: notify(), notify_all() and wait() timeouts, with
    # waiters running both as self.Process workers and as plain threads.

    @classmethod
    def f(cls, cond, sleeping, woken, timeout=None):
        # Waiter body: announce that we are about to sleep, wait on the
        # condition, then announce that we woke up (or timed out).
        cond.acquire()
        sleeping.release()
        cond.wait(timeout)
        woken.release()
        cond.release()

    def check_invariant(self, cond):
        # this is only supposed to succeed when there are no sleepers
        if self.TYPE == 'processes':
            try:
                sleepers = (cond._sleeping_count.get_value() -
                            cond._woken_count.get_value())
                self.assertEqual(sleepers, 0)
                self.assertEqual(cond._wait_semaphore.get_value(), 0)
            except NotImplementedError:
                pass

    def test_notify(self):
        cond = self.Condition()
        sleeping = self.Semaphore(0)
        woken = self.Semaphore(0)

        # Keep separate handles for the two waiters so that both can be
        # joined at the end of the test.
        proc = self.Process(target=self.f, args=(cond, sleeping, woken))
        proc.daemon = True
        proc.start()

        thread = threading.Thread(target=self.f,
                                  args=(cond, sleeping, woken))
        thread.daemon = True
        thread.start()

        # wait for both children to start sleeping
        sleeping.acquire()
        sleeping.acquire()

        # check no process/thread has woken up
        time.sleep(DELTA)
        self.assertReturnsIfImplemented(0, get_value, woken)

        # wake up one process/thread
        cond.acquire()
        cond.notify()
        cond.release()

        # check one process/thread has woken up
        time.sleep(DELTA)
        self.assertReturnsIfImplemented(1, get_value, woken)

        # wake up another
        cond.acquire()
        cond.notify()
        cond.release()

        # check other has woken up
        time.sleep(DELTA)
        self.assertReturnsIfImplemented(2, get_value, woken)

        # check state is not mucked up
        self.check_invariant(cond)

        # both waiters have been notified, so neither join should block
        proc.join()
        thread.join()

    def test_notify_all(self):
        cond = self.Condition()
        sleeping = self.Semaphore(0)
        woken = self.Semaphore(0)

        # start some threads/processes which will timeout
        for i in range(3):
            p = self.Process(target=self.f,
                             args=(cond, sleeping, woken, TIMEOUT1))
            p.daemon = True
            p.start()

            t = threading.Thread(target=self.f,
                                 args=(cond, sleeping, woken, TIMEOUT1))
            t.daemon = True
            t.start()

        # wait for them all to sleep
        for i in xrange(6):
            sleeping.acquire()

        # check they have all timed out
        for i in xrange(6):
            woken.acquire()
        self.assertReturnsIfImplemented(0, get_value, woken)

        # check state is not mucked up
        self.check_invariant(cond)

        # start some more threads/processes
        for i in range(3):
            p = self.Process(target=self.f, args=(cond, sleeping, woken))
            p.daemon = True
            p.start()

            t = threading.Thread(target=self.f, args=(cond, sleeping, woken))
            t.daemon = True
            t.start()

        # wait for them to all sleep
        for i in xrange(6):
            sleeping.acquire()

        # check no process/thread has woken up
        time.sleep(DELTA)
        self.assertReturnsIfImplemented(0, get_value, woken)

        # wake them all up
        cond.acquire()
        cond.notify_all()
        cond.release()

        # check they have all woken
        time.sleep(DELTA)
        self.assertReturnsIfImplemented(6, get_value, woken)

        # check state is not mucked up
        self.check_invariant(cond)

    def test_timeout(self):
        # wait() with nobody notifying returns None after the timeout.
        cond = self.Condition()
        wait = TimingWrapper(cond.wait)
        cond.acquire()
        res = wait(TIMEOUT1)
        cond.release()
        self.assertEqual(res, None)
        self.assertTimingAlmostEqual(wait.elapsed, TIMEOUT1)
785
786
class _TestEvent(BaseTestCase):
    # Tests for Event: is_set(), wait() with and without timeout, clear(),
    # and being set from another process/thread.

    @classmethod
    def _test_event(cls, event):
        # Worker body: set the event after a short delay.
        time.sleep(TIMEOUT2)
        event.set()

    def test_event(self):
        event = self.Event()
        wait = TimingWrapper(event.wait)

        # A fresh event is unset, so wait() reports False: immediately for
        # a zero timeout, after the timeout otherwise.
        self.assertEqual(event.is_set(), False)
        self.assertEqual(wait(0.0), False)
        self.assertTimingAlmostEqual(wait.elapsed, 0.0)
        self.assertEqual(wait(TIMEOUT1), False)
        self.assertTimingAlmostEqual(wait.elapsed, TIMEOUT1)

        event.set()

        # Once set, wait() returns True without blocking.
        self.assertEqual(event.is_set(), True)
        self.assertEqual(wait(), True)
        self.assertTimingAlmostEqual(wait.elapsed, 0.0)
        self.assertEqual(wait(TIMEOUT1), True)
        self.assertTimingAlmostEqual(wait.elapsed, 0.0)
        # self.assertEqual(event.is_set(), True)

        event.clear()

        #self.assertEqual(event.is_set(), False)

        # A cleared event blocks wait() until the worker sets it again.
        p = self.Process(target=self._test_event, args=(event,))
        p.daemon = True
        p.start()
        self.assertEqual(wait(), True)

        # The worker has nothing left to do after set(); reap it so it
        # does not outlive the test.
        p.join()
Benjamin Petersondfd79492008-06-13 19:13:39 +0000827
828#
829#
830#
831
class _TestValue(BaseTestCase):
    # Tests for the shared Value / RawValue wrappers.

    ALLOWED_TYPES = ('processes',)

    # (typecode, initial value, value written by the child)
    codes_values = [
        ('i', 4343, 24234),
        ('d', 3.625, -4.25),
        ('h', -232, 234),
        ('c', latin('x'), latin('y'))
        ]

    def setUp(self):
        if HAS_SHAREDCTYPES:
            return
        self.skipTest("requires multiprocessing.sharedctypes")

    @classmethod
    def _test(cls, values):
        # Child side of test_value(): overwrite each shared value with the
        # third entry of its codes_values row.
        for shared, spec in zip(values, cls.codes_values):
            shared.value = spec[2]

    def test_value(self, raw=False):
        factory = self.RawValue if raw else self.Value
        values = [factory(code, value)
                  for code, value, _ in self.codes_values]

        # Initial values are visible in the parent.
        for shared, spec in zip(values, self.codes_values):
            self.assertEqual(shared.value, spec[1])

        proc = self.Process(target=self._test, args=(values,))
        proc.daemon = True
        proc.start()
        proc.join()

        # Writes made by the child are visible in the parent.
        for shared, spec in zip(values, self.codes_values):
            self.assertEqual(shared.value, spec[2])

    def test_rawvalue(self):
        self.test_value(True)

    def test_getobj_getlock(self):
        # With the default lock both accessors are callable.
        val1 = self.Value('i', 5)
        val1.get_lock()
        val1.get_obj()

        # Same when lock=None is passed explicitly.
        val2 = self.Value('i', 5, lock=None)
        val2.get_lock()
        val2.get_obj()

        # A caller-supplied lock is the one handed back by get_lock().
        lock = self.Lock()
        val3 = self.Value('i', 5, lock=lock)
        lock3 = val3.get_lock()
        val3.get_obj()
        self.assertEqual(lock, lock3)

        # lock=False gives an object without the accessors.
        arr4 = self.Value('i', 5, lock=False)
        for accessor in ('get_lock', 'get_obj'):
            self.assertFalse(hasattr(arr4, accessor))

        # Anything that is not a usable lock is rejected.
        self.assertRaises(AttributeError, self.Value, 'i', 5, lock='navalue')

        # RawValue objects have no accessors either.
        arr5 = self.RawValue('i', 5)
        for accessor in ('get_lock', 'get_obj'):
            self.assertFalse(hasattr(arr5, accessor))
899
Benjamin Petersondfd79492008-06-13 19:13:39 +0000900
class _TestArray(BaseTestCase):
    # Exercises shared Array / RawArray objects: indexing, slicing, slice
    # assignment, mutation from a child process and the lock accessors.

    ALLOWED_TYPES = ('processes',)

    @classmethod
    def f(cls, seq):
        # Turn seq into its running (prefix) sum, in place.
        for pos in range(1, len(seq)):
            seq[pos] += seq[pos - 1]

    @unittest.skipIf(c_int is None, "requires _ctypes")
    def test_array(self, raw=False):
        seq = [680, 626, 934, 821, 150, 233, 548, 982, 714, 831]
        make_array = self.RawArray if raw else self.Array
        arr = make_array('i', seq)

        self.assertEqual(len(arr), len(seq))
        self.assertEqual(arr[3], seq[3])
        self.assertEqual(list(arr[2:7]), list(seq[2:7]))

        # Overwrite the same slice of both sequences from an array.array.
        replacement = array.array('i', [1, 2, 3, 4])
        arr[4:8] = replacement
        seq[4:8] = replacement

        self.assertEqual(list(arr[:]), seq)

        # Apply the prefix sum locally to the list ...
        self.f(seq)

        # ... and in a child process to the shared array; both must agree.
        child = self.Process(target=self.f, args=(arr,))
        child.daemon = True
        child.start()
        child.join()

        self.assertEqual(list(arr[:]), seq)

    @unittest.skipIf(c_int is None, "requires _ctypes")
    def test_array_from_size(self):
        size = 10
        # Test for zeroing (see issue #11675).
        # Repeating the allocation makes it more likely that the 2nd and 3rd
        # arrays are carved out of previously used, non-zero memory.
        for _ in range(3):
            arr = self.Array('i', size)
            self.assertEqual(len(arr), size)
            self.assertEqual(list(arr), [0] * size)
            arr[:] = range(10)
            self.assertEqual(list(arr), range(10))
            del arr

    @unittest.skipIf(c_int is None, "requires _ctypes")
    def test_rawarray(self):
        self.test_array(raw=True)

    @unittest.skipIf(c_int is None, "requires _ctypes")
    def test_array_accepts_long(self):
        # A size given as a (Python 2) long must be accepted like an int.
        size = long(10)
        arr = self.Array('i', size)
        self.assertEqual(len(arr), 10)
        raw_arr = self.RawArray('i', size)
        self.assertEqual(len(raw_arr), 10)

    @unittest.skipIf(c_int is None, "requires _ctypes")
    def test_getobj_getlock_obj(self):
        # Default lock: both accessors must simply work.
        arr1 = self.Array('i', range(10))
        arr1.get_lock()
        arr1.get_obj()

        # lock=None: both accessors must still work.
        arr2 = self.Array('i', range(10), lock=None)
        arr2.get_lock()
        arr2.get_obj()

        # An explicitly supplied lock is the one handed back.
        lock = self.Lock()
        arr3 = self.Array('i', range(10), lock=lock)
        lock3 = arr3.get_lock()
        arr3.get_obj()
        self.assertEqual(lock, lock3)

        # lock=False yields an object without the accessors.
        arr4 = self.Array('i', range(10), lock=False)
        self.assertFalse(hasattr(arr4, 'get_lock'))
        self.assertFalse(hasattr(arr4, 'get_obj'))
        self.assertRaises(AttributeError,
                          self.Array, 'i', range(10), lock='notalock')

        # RawArray has no accessors either.
        arr5 = self.RawArray('i', range(10))
        self.assertFalse(hasattr(arr5, 'get_lock'))
        self.assertFalse(hasattr(arr5, 'get_obj'))
Benjamin Petersondfd79492008-06-13 19:13:39 +0000986
987#
988#
989#
990
class _TestContainers(BaseTestCase):
    # Exercises the manager-backed list, dict and Namespace proxies.

    ALLOWED_TYPES = ('manager',)

    def test_list(self):
        a = self.list(range(10))
        self.assertEqual(a[:], range(10))

        b = self.list()
        self.assertEqual(b[:], [])

        b.extend(range(5))
        self.assertEqual(b[:], range(5))

        self.assertEqual(b[2], 2)
        self.assertEqual(b[2:10], [2, 3, 4])

        b *= 2
        doubled = [0, 1, 2, 3, 4, 0, 1, 2, 3, 4]
        self.assertEqual(b[:], doubled)

        self.assertEqual(b + [5, 6], doubled + [5, 6])

        self.assertEqual(a[:], range(10))

        # A shared list built from proxies holds the referents' contents.
        d = [a, b]
        e = self.list(d)
        self.assertEqual(
            e[:],
            [[0, 1, 2, 3, 4, 5, 6, 7, 8, 9], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4]]
            )

        # Mutating 'a' afterwards is visible through the containing list.
        f = self.list([a])
        a.append('hello')
        self.assertEqual(f[:], [[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 'hello']])

    def test_dict(self):
        d = self.dict()
        indices = range(65, 70)
        for code in indices:
            d[code] = chr(code)
        letters = [chr(code) for code in indices]
        self.assertEqual(d.copy(), dict(zip(indices, letters)))
        self.assertEqual(sorted(d.keys()), indices)
        self.assertEqual(sorted(d.values()), letters)
        self.assertEqual(sorted(d.items()), zip(indices, letters))

    def test_namespace(self):
        n = self.Namespace()
        n.name = 'Bob'
        n.job = 'Builder'
        n._hidden = 'hidden'
        self.assertEqual((n.name, n.job), ('Bob', 'Builder'))
        del n.job
        # Underscore-prefixed attributes are left out of the string form.
        self.assertEqual(str(n), "Namespace(name='Bob')")
        self.assertTrue(hasattr(n, 'name'))
        self.assertFalse(hasattr(n, 'job'))
1046
1047#
1048#
1049#
1050
def sqr(x, wait=0.0):
    """Return ``x`` squared after sleeping for ``wait`` seconds."""
    time.sleep(wait)
    squared = x * x
    return squared
class _TestPool(BaseTestCase):
    # Exercises the Pool API (apply/map/imap and their async variants)
    # through the shared pool provided as self.pool.

    def test_apply(self):
        papply = self.pool.apply
        self.assertEqual(papply(sqr, (5,)), sqr(5))
        self.assertEqual(papply(sqr, (), {'x': 3}), sqr(x=3))

    def test_map(self):
        pmap = self.pool.map
        self.assertEqual(pmap(sqr, range(10)), map(sqr, range(10)))
        self.assertEqual(pmap(sqr, range(100), chunksize=20),
                         map(sqr, range(100)))

    def test_map_chunksize(self):
        # An empty iterable combined with an explicit chunksize must not
        # leave the async result waiting forever.
        try:
            self.pool.map_async(sqr, [], chunksize=1).get(timeout=TIMEOUT1)
        except multiprocessing.TimeoutError:
            self.fail("pool.map_async with chunksize stalled on null list")

    def test_async(self):
        res = self.pool.apply_async(sqr, (7, TIMEOUT1,))
        get = TimingWrapper(res.get)
        self.assertEqual(get(), 49)
        self.assertTimingAlmostEqual(get.elapsed, TIMEOUT1)

    def test_async_timeout(self):
        # The task sleeps longer than the timeout given to get().
        res = self.pool.apply_async(sqr, (6, TIMEOUT2 + 0.2))
        get = TimingWrapper(res.get)
        self.assertRaises(multiprocessing.TimeoutError, get, timeout=TIMEOUT2)
        self.assertTimingAlmostEqual(get.elapsed, TIMEOUT2)

    def test_imap(self):
        it = self.pool.imap(sqr, range(10))
        self.assertEqual(list(it), map(sqr, range(10)))

        it = self.pool.imap(sqr, range(10))
        for i in range(10):
            self.assertEqual(it.next(), i*i)
        self.assertRaises(StopIteration, it.next)

        it = self.pool.imap(sqr, range(1000), chunksize=100)
        for i in range(1000):
            self.assertEqual(it.next(), i*i)
        self.assertRaises(StopIteration, it.next)

    def test_imap_unordered(self):
        # Results may arrive in any order, so compare them sorted.
        it = self.pool.imap_unordered(sqr, range(1000))
        self.assertEqual(sorted(it), map(sqr, range(1000)))

        it = self.pool.imap_unordered(sqr, range(1000), chunksize=53)
        self.assertEqual(sorted(it), map(sqr, range(1000)))

    def test_make_pool(self):
        self.assertRaises(ValueError, multiprocessing.Pool, -1)
        self.assertRaises(ValueError, multiprocessing.Pool, 0)

        p = multiprocessing.Pool(3)
        try:
            self.assertEqual(3, len(p._pool))
        finally:
            # Always reap the workers, even when the assertion fails.
            p.close()
            p.join()

    def test_terminate(self):
        if self.TYPE == 'manager':
            # On Unix a forked process increfs each shared object to
            # which its parent process held a reference.  If the
            # forked process gets terminated then there is likely to
            # be a reference leak.  So to prevent
            # _TestZZZNumberOfObjects from failing we skip this test
            # when using a manager.
            return

        # Queue far more work than could finish, then terminate at once.
        self.pool.map_async(time.sleep, [0.1] * 10000, chunksize=1)
        self.pool.terminate()
        join = TimingWrapper(self.pool.join)
        join()
        # assertLess reports the measured time if the bound is exceeded.
        self.assertLess(join.elapsed, 0.2)
Jesse Noller654ade32010-01-27 03:05:57 +00001132
class _TestPoolWorkerLifetime(BaseTestCase):
    # Checks that workers of a Pool created with maxtasksperchild are
    # replaced by new processes once they have run their quota of tasks.

    ALLOWED_TYPES = ('processes', )

    def test_pool_worker_lifetime(self):
        p = multiprocessing.Pool(3, maxtasksperchild=10)
        try:
            self.assertEqual(3, len(p._pool))
            origworkerpids = [w.pid for w in p._pool]
            # Run many tasks so each worker gets replaced (hopefully)
            results = [p.apply_async(sqr, (i, )) for i in range(100)]
            # Fetch the results and verify we got the right answers,
            # also ensuring all the tasks have completed.
            for (j, res) in enumerate(results):
                self.assertEqual(res.get(), sqr(j))
            # Refill the pool
            p._repopulate_pool()
            # Wait until all workers are alive
            # (countdown * DELTA = 5 seconds max startup process time)
            countdown = 50
            while countdown and not all(w.is_alive() for w in p._pool):
                countdown -= 1
                time.sleep(DELTA)
            finalworkerpids = [w.pid for w in p._pool]
            # All pids should be assigned.  See issue #7805.
            self.assertNotIn(None, origworkerpids)
            self.assertNotIn(None, finalworkerpids)
            # Finally, check that the worker pids have changed
            self.assertNotEqual(sorted(origworkerpids),
                                sorted(finalworkerpids))
        finally:
            # Shut the pool down even if an assertion above failed, so
            # no worker processes outlive the test.
            p.close()
            p.join()
1164
Benjamin Petersondfd79492008-06-13 19:13:39 +00001165#
1166# Test that manager has expected number of shared objects left
1167#
1168
class _TestZZZNumberOfObjects(BaseTestCase):
    # Because test cases are sorted alphabetically, this one will get
    # run after all the other tests for the manager.  It tests that
    # there have been no "reference leaks" for the manager's shared
    # objects.  Note the comment in _TestPool.test_terminate().
    ALLOWED_TYPES = ('manager',)

    def test_number_of_objects(self):
        EXPECTED_NUMBER = 1                # the pool object is still alive
        multiprocessing.active_children()  # discard dead process objs
        gc.collect()                       # do garbage collection
        refs = self.manager._number_of_objects()
        # Fetch the debug info right after the count so that what gets
        # printed on a mismatch describes the state that was counted.
        debug_info = self.manager._debug_info()
        if refs != EXPECTED_NUMBER:
            # Print it once; a second _debug_info() round trip would
            # only duplicate the output.
            print(debug_info)

        self.assertEqual(refs, EXPECTED_NUMBER)
1187
1188#
1189# Test of creating a customized manager class
1190#
1191
1192from multiprocessing.managers import BaseManager, BaseProxy, RemoteError
1193
class FooBar(object):
    # Plain class served through MyManager: one ordinary method, one
    # method that raises, and one underscore-prefixed method.

    def f(self):
        return 'f()'

    def g(self):
        raise ValueError()

    def _h(self):
        return '_h()'
1201
def baz():
    """Yield the squares of 0 through 9, in increasing order."""
    for n in range(10):
        yield n * n
1205
class IteratorProxy(BaseProxy):
    # Proxy that behaves as an iterator by forwarding both spellings of
    # the "next item" method to the referent.
    _exposed_ = ('next', '__next__')

    def __iter__(self):
        # The proxy is its own iterator.
        return self

    def next(self):
        return self._callmethod('next')

    def __next__(self):
        return self._callmethod('__next__')
1214
class MyManager(BaseManager):
    # Customized manager; its shared types are registered just below.
    pass

# 'Foo' uses the default exposure, 'Bar' exposes exactly f and _h, and
# 'baz' hands out its generator through IteratorProxy.
MyManager.register('Foo', callable=FooBar)
MyManager.register('Bar', callable=FooBar, exposed=('f', '_h'))
MyManager.register('baz', callable=baz, proxytype=IteratorProxy)
1221
1222
class _TestMyManager(BaseTestCase):
    # Checks which methods the MyManager proxies expose and that calls
    # (direct and through _callmethod) reach the referent.

    ALLOWED_TYPES = ('manager',)

    def test_mymanager(self):
        manager = MyManager()
        manager.start()

        foo = manager.Foo()
        bar = manager.Bar()
        baz_proxy = manager.baz()

        candidates = ('f', 'g', '_h')
        foo_methods = [name for name in candidates if hasattr(foo, name)]
        bar_methods = [name for name in candidates if hasattr(bar, name)]

        self.assertEqual(foo_methods, ['f', 'g'])
        self.assertEqual(bar_methods, ['f', '_h'])

        # Foo: f and g are callable; _h is refused by the server.
        self.assertEqual(foo.f(), 'f()')
        self.assertRaises(ValueError, foo.g)
        self.assertEqual(foo._callmethod('f'), 'f()')
        self.assertRaises(RemoteError, foo._callmethod, '_h')

        # Bar: exactly the explicitly exposed methods are callable.
        self.assertEqual(bar.f(), 'f()')
        self.assertEqual(bar._h(), '_h()')
        self.assertEqual(bar._callmethod('f'), 'f()')
        self.assertEqual(bar._callmethod('_h'), '_h()')

        self.assertEqual(list(baz_proxy), [i*i for i in range(10)])

        manager.shutdown()
1254
1255#
1256# Test of connecting to a remote server and using xmlrpclib for serialization
1257#
1258
# Single module-level queue served by QueueManager.
_queue = Queue.Queue()


def get_queue():
    """Return the module-level queue (the same object on every call)."""
    return _queue
1262
class QueueManager(BaseManager):
    '''manager class used by server process'''

# The server side supplies the callable that produces the shared queue.
QueueManager.register('get_queue', callable=get_queue)
1266
class QueueManager2(BaseManager):
    '''manager class which specifies the same interface as QueueManager'''

# Registered without a callable: only the typeid is declared here.
QueueManager2.register('get_queue')
1270
1271
1272SERIALIZER = 'xmlrpclib'
1273
class _TestRemoteManager(BaseTestCase):
    # Connects to a manager server from a second manager object and from
    # a child process, using xmlrpclib instead of pickle as serializer.

    ALLOWED_TYPES = ('manager',)

    @classmethod
    def _putter(cls, address, authkey):
        # Child side: connect to the running server and put one item on
        # the shared queue.
        manager = QueueManager2(
            address=address, authkey=authkey, serializer=SERIALIZER
            )
        manager.connect()
        queue = manager.get_queue()
        queue.put(('hello world', None, True, 2.25))

    def test_remote(self):
        authkey = os.urandom(32)

        manager = QueueManager(
            address=('localhost', 0), authkey=authkey, serializer=SERIALIZER
            )
        manager.start()

        p = self.Process(target=self._putter, args=(manager.address, authkey))
        p.daemon = True
        p.start()

        manager2 = QueueManager2(
            address=manager.address, authkey=authkey, serializer=SERIALIZER
            )
        manager2.connect()
        queue = manager2.get_queue()

        received = queue.get()
        # The item has arrived, so the child has done its work; reap it
        # instead of leaving an unjoined process behind.
        p.join()

        # Note that xmlrpclib will deserialize object as a list not a tuple
        self.assertEqual(received, ['hello world', None, True, 2.25])

        # Because we are using xmlrpclib for serialization instead of
        # pickle this will cause a serialization error.
        self.assertRaises(Exception, queue.put, time.sleep)

        # Make queue finalizer run before the server is stopped
        del queue
        manager.shutdown()
1315
class _TestManagerRestart(BaseTestCase):
    # Checks that a manager can be shut down and a new one started
    # straight away on the very same address.

    @classmethod
    def _putter(cls, address, authkey):
        # Child side: connect to the running server and put one item on
        # the shared queue.
        manager = QueueManager(
            address=address, authkey=authkey, serializer=SERIALIZER)
        manager.connect()
        queue = manager.get_queue()
        queue.put('hello world')

    def test_rapid_restart(self):
        authkey = os.urandom(32)
        manager = QueueManager(
            address=('localhost', 0), authkey=authkey, serializer=SERIALIZER)
        srvr = manager.get_server()
        addr = srvr.address
        # Close the connection.Listener socket which gets opened as a part
        # of manager.get_server(). It's not needed for the test.
        srvr.listener.close()
        manager.start()

        p = self.Process(target=self._putter, args=(manager.address, authkey))
        p.daemon = True
        p.start()
        queue = manager.get_queue()
        received = queue.get()
        # The item has arrived, so the child has done its work; reap it
        # before the server it is connected to gets shut down.
        p.join()
        self.assertEqual(received, 'hello world')
        del queue
        manager.shutdown()

        # Start a fresh manager on the address that was just released.
        manager = QueueManager(
            address=addr, authkey=authkey, serializer=SERIALIZER)
        manager.start()
        manager.shutdown()
Jesse Noller459a6482009-03-30 15:50:42 +00001348
Benjamin Petersondfd79492008-06-13 19:13:39 +00001349#
1350#
1351#
1352
1353SENTINEL = latin('')
1354
class _TestConnection(BaseTestCase):
    # Exercises Pipe connection objects: object and byte transfer,
    # polling, one-way pipes, and passing file descriptors/handles to a
    # child process.

    ALLOWED_TYPES = ('processes', 'threads')

    @classmethod
    def _echo(cls, conn):
        # Child side: send every received message straight back until
        # the sentinel arrives, then close the connection.
        for msg in iter(conn.recv_bytes, SENTINEL):
            conn.send_bytes(msg)
        conn.close()

    def test_connection(self):
        conn, child_conn = self.Pipe()

        p = self.Process(target=self._echo, args=(child_conn,))
        p.daemon = True
        p.start()

        seq = [1, 2.25, None]
        msg = latin('hello world')
        longmsg = msg * 10
        arr = array.array('i', range(4))

        if self.TYPE == 'processes':
            self.assertEqual(type(conn.fileno()), int)

        self.assertEqual(conn.send(seq), None)
        self.assertEqual(conn.recv(), seq)

        self.assertEqual(conn.send_bytes(msg), None)
        self.assertEqual(conn.recv_bytes(), msg)

        if self.TYPE == 'processes':
            # Receive into the start of a larger, zero-filled buffer.
            buf = array.array('i', [0]*10)
            expected = list(arr) + [0] * (10 - len(arr))
            self.assertEqual(conn.send_bytes(arr), None)
            self.assertEqual(conn.recv_bytes_into(buf),
                             len(arr) * buf.itemsize)
            self.assertEqual(list(buf), expected)

            # Receive at an offset of three items into the buffer.
            buf = array.array('i', [0]*10)
            expected = [0] * 3 + list(arr) + [0] * (10 - 3 - len(arr))
            self.assertEqual(conn.send_bytes(arr), None)
            self.assertEqual(conn.recv_bytes_into(buf, 3 * buf.itemsize),
                             len(arr) * buf.itemsize)
            self.assertEqual(list(buf), expected)

            # A buffer that is too small raises BufferTooShort, and the
            # exception carries the complete message.
            buf = bytearray(latin(' ' * 40))
            self.assertEqual(conn.send_bytes(longmsg), None)
            try:
                res = conn.recv_bytes_into(buf)
            except multiprocessing.BufferTooShort as e:
                self.assertEqual(e.args, (longmsg,))
            else:
                self.fail('expected BufferTooShort, got %s' % res)

        poll = TimingWrapper(conn.poll)

        self.assertEqual(poll(), False)
        self.assertTimingAlmostEqual(poll.elapsed, 0)

        self.assertEqual(poll(TIMEOUT1), False)
        self.assertTimingAlmostEqual(poll.elapsed, TIMEOUT1)

        conn.send(None)

        self.assertEqual(poll(TIMEOUT1), True)
        self.assertTimingAlmostEqual(poll.elapsed, 0)

        self.assertEqual(conn.recv(), None)

        really_big_msg = latin('X') * (1024 * 1024 * 16)   # 16Mb
        conn.send_bytes(really_big_msg)
        self.assertEqual(conn.recv_bytes(), really_big_msg)

        conn.send_bytes(SENTINEL)                          # tell child to quit
        child_conn.close()

        if self.TYPE == 'processes':
            self.assertEqual(conn.readable, True)
            self.assertEqual(conn.writable, True)
            self.assertRaises(EOFError, conn.recv)
            self.assertRaises(EOFError, conn.recv_bytes)

        p.join()

    def test_duplex_false(self):
        reader, writer = self.Pipe(duplex=False)
        self.assertEqual(writer.send(1), None)
        self.assertEqual(reader.recv(), 1)
        if self.TYPE == 'processes':
            # Each end of a one-way pipe works in one direction only.
            self.assertEqual(reader.readable, True)
            self.assertEqual(reader.writable, False)
            self.assertEqual(writer.readable, False)
            self.assertEqual(writer.writable, True)
            self.assertRaises(IOError, reader.send, 2)
            self.assertRaises(IOError, writer.recv)
            self.assertRaises(IOError, writer.poll)

    def test_spawn_close(self):
        # We test that a pipe connection can be closed by parent
        # process immediately after child is spawned.  On Windows this
        # would have sometimes failed on old versions because
        # child_conn would be closed before the child got a chance to
        # duplicate it.
        conn, child_conn = self.Pipe()

        p = self.Process(target=self._echo, args=(child_conn,))
        p.daemon = True
        p.start()
        child_conn.close()    # this might complete before child initializes

        msg = latin('hello')
        conn.send_bytes(msg)
        self.assertEqual(conn.recv_bytes(), msg)

        conn.send_bytes(SENTINEL)
        conn.close()
        p.join()

    def test_sendbytes(self):
        if self.TYPE != 'processes':
            # Report a skip (like the fd-transfer tests below) rather
            # than silently passing without having tested anything.
            self.skipTest("test not appropriate for %s" % self.TYPE)

        msg = latin('abcdefghijklmnopqrstuvwxyz')
        a, b = self.Pipe()

        a.send_bytes(msg)
        self.assertEqual(b.recv_bytes(), msg)

        # offset only
        a.send_bytes(msg, 5)
        self.assertEqual(b.recv_bytes(), msg[5:])

        # offset and size
        a.send_bytes(msg, 7, 8)
        self.assertEqual(b.recv_bytes(), msg[7:7+8])

        # an offset equal to the length sends an empty message
        a.send_bytes(msg, 26)
        self.assertEqual(b.recv_bytes(), latin(''))

        a.send_bytes(msg, 26, 0)
        self.assertEqual(b.recv_bytes(), latin(''))

        # out-of-range or negative offset/size combinations are rejected
        self.assertRaises(ValueError, a.send_bytes, msg, 27)

        self.assertRaises(ValueError, a.send_bytes, msg, 22, 5)

        self.assertRaises(ValueError, a.send_bytes, msg, 26, 1)

        self.assertRaises(ValueError, a.send_bytes, msg, -1)

        self.assertRaises(ValueError, a.send_bytes, msg, 4, -1)

    @classmethod
    def _is_fd_assigned(cls, fd):
        # True if fd refers to an open file; False if fstat() reports
        # EBADF.  Any other OSError is propagated.
        try:
            os.fstat(fd)
        except OSError as e:
            if e.errno == errno.EBADF:
                return False
            raise
        else:
            return True

    @classmethod
    def _writefd(cls, conn, data, create_dummy_fds=False):
        # Child side: receive a handle over conn and write data to it.
        if create_dummy_fds:
            # Occupy every free descriptor below 256 first.
            for i in range(0, 256):
                if not cls._is_fd_assigned(i):
                    os.dup2(conn.fileno(), i)
        fd = reduction.recv_handle(conn)
        if msvcrt:
            fd = msvcrt.open_osfhandle(fd, os.O_WRONLY)
        os.write(fd, data)
        os.close(fd)

    def test_fd_transfer(self):
        if self.TYPE != 'processes':
            self.skipTest("only makes sense with processes")
        conn, child_conn = self.Pipe(duplex=True)

        p = self.Process(target=self._writefd, args=(child_conn, b"foo"))
        p.daemon = True
        p.start()
        # Do not leave the scratch file behind, whatever the outcome.
        self.addCleanup(test_support.unlink, test_support.TESTFN)
        with open(test_support.TESTFN, "wb") as f:
            fd = f.fileno()
            if msvcrt:
                fd = msvcrt.get_osfhandle(fd)
            reduction.send_handle(conn, fd, p.pid)
        p.join()
        with open(test_support.TESTFN, "rb") as f:
            self.assertEqual(f.read(), b"foo")

    @unittest.skipIf(sys.platform == "win32",
                     "test semantics don't make sense on Windows")
    @unittest.skipIf(MAXFD <= 256,
                     "largest assignable fd number is too small")
    @unittest.skipUnless(hasattr(os, "dup2"),
                         "test needs os.dup2()")
    def test_large_fd_transfer(self):
        # With fd > 256 (issue #11657)
        if self.TYPE != 'processes':
            self.skipTest("only makes sense with processes")
        conn, child_conn = self.Pipe(duplex=True)

        p = self.Process(target=self._writefd, args=(child_conn, b"bar", True))
        p.daemon = True
        p.start()
        # Do not leave the scratch file behind, whatever the outcome.
        self.addCleanup(test_support.unlink, test_support.TESTFN)
        with open(test_support.TESTFN, "wb") as f:
            fd = f.fileno()
            # Find a free descriptor number of at least 256 to send.
            for newfd in range(256, MAXFD):
                if not self._is_fd_assigned(newfd):
                    break
            else:
                self.fail("could not find an unassigned large file descriptor")
            os.dup2(fd, newfd)
            try:
                reduction.send_handle(conn, newfd, p.pid)
            finally:
                os.close(newfd)
        p.join()
        with open(test_support.TESTFN, "rb") as f:
            self.assertEqual(f.read(), b"bar")

    @classmethod
    def _send_data_without_fd(cls, conn):
        # Child side: write one raw byte to the connection's descriptor,
        # with no handle accompanying it.
        os.write(conn.fileno(), b"\0")

    @unittest.skipIf(sys.platform == "win32", "doesn't make sense on Windows")
    def test_missing_fd_transfer(self):
        # Check that exception is raised when received data is not
        # accompanied by a file descriptor in ancillary data.
        if self.TYPE != 'processes':
            self.skipTest("only makes sense with processes")
        conn, child_conn = self.Pipe(duplex=True)

        p = self.Process(target=self._send_data_without_fd, args=(child_conn,))
        p.daemon = True
        p.start()
        self.assertRaises(RuntimeError, reduction.recv_handle, conn)
        p.join()
Antoine Pitroua1a8da82011-08-23 19:54:20 +02001594
class _TestListenerClient(BaseTestCase):
    # Checks that a Client in a child can connect to a Listener in the
    # parent for every available connection family.

    ALLOWED_TYPES = ('processes', 'threads')

    @classmethod
    def _test(cls, address):
        # Child side: connect, send one greeting, disconnect.
        client = cls.connection.Client(address)
        client.send('hello')
        client.close()

    def test_listener_client(self):
        for family in self.connection.families:
            listener = self.connection.Listener(family=family)
            child = self.Process(target=self._test, args=(listener.address,))
            child.daemon = True
            child.start()
            conn = listener.accept()
            self.assertEqual(conn.recv(), 'hello')
            child.join()
            listener.close()
Benjamin Petersondfd79492008-06-13 19:13:39 +00001615#
1616# Test of sending connection and socket objects between processes
1617#
Benjamin Petersonda3a1b12008-06-16 20:52:48 +00001618"""
Benjamin Petersondfd79492008-06-13 19:13:39 +00001619class _TestPicklingConnections(BaseTestCase):
1620
1621 ALLOWED_TYPES = ('processes',)
1622
1623 def _listener(self, conn, families):
1624 for fam in families:
1625 l = self.connection.Listener(family=fam)
1626 conn.send(l.address)
1627 new_conn = l.accept()
1628 conn.send(new_conn)
1629
1630 if self.TYPE == 'processes':
1631 l = socket.socket()
1632 l.bind(('localhost', 0))
1633 conn.send(l.getsockname())
1634 l.listen(1)
1635 new_conn, addr = l.accept()
1636 conn.send(new_conn)
1637
1638 conn.recv()
1639
1640 def _remote(self, conn):
1641 for (address, msg) in iter(conn.recv, None):
1642 client = self.connection.Client(address)
1643 client.send(msg.upper())
1644 client.close()
1645
1646 if self.TYPE == 'processes':
1647 address, msg = conn.recv()
1648 client = socket.socket()
1649 client.connect(address)
1650 client.sendall(msg.upper())
1651 client.close()
1652
1653 conn.close()
1654
1655 def test_pickling(self):
1656 try:
1657 multiprocessing.allow_connection_pickling()
1658 except ImportError:
1659 return
1660
1661 families = self.connection.families
1662
1663 lconn, lconn0 = self.Pipe()
1664 lp = self.Process(target=self._listener, args=(lconn0, families))
Jesus Cea6f6016b2011-09-09 20:26:57 +02001665 lp.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +00001666 lp.start()
1667 lconn0.close()
1668
1669 rconn, rconn0 = self.Pipe()
1670 rp = self.Process(target=self._remote, args=(rconn0,))
Jesus Cea6f6016b2011-09-09 20:26:57 +02001671 rp.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +00001672 rp.start()
1673 rconn0.close()
1674
1675 for fam in families:
1676 msg = ('This connection uses family %s' % fam).encode('ascii')
1677 address = lconn.recv()
1678 rconn.send((address, msg))
1679 new_conn = lconn.recv()
1680 self.assertEqual(new_conn.recv(), msg.upper())
1681
1682 rconn.send(None)
1683
1684 if self.TYPE == 'processes':
1685 msg = latin('This connection uses a normal socket')
1686 address = lconn.recv()
1687 rconn.send((address, msg))
1688 if hasattr(socket, 'fromfd'):
1689 new_conn = lconn.recv()
1690 self.assertEqual(new_conn.recv(100), msg.upper())
1691 else:
1692 # XXX On Windows with Py2.6 need to backport fromfd()
1693 discard = lconn.recv_bytes()
1694
1695 lconn.send(None)
1696
1697 rconn.close()
1698 lconn.close()
1699
1700 lp.join()
1701 rp.join()
Benjamin Petersonda3a1b12008-06-16 20:52:48 +00001702"""
Benjamin Petersondfd79492008-06-13 19:13:39 +00001703#
1704#
1705#
1706
class _TestHeap(BaseTestCase):
    # Exercises the shared-memory heap behind BufferWrapper: internal
    # bookkeeping consistency, and freeing of blocks from the garbage
    # collector.

    ALLOWED_TYPES = ('processes',)

    def test_heap(self):
        iterations = 5000
        maxblocks = 50
        blocks = []

        # create and destroy lots of blocks of different sizes
        for _ in xrange(iterations):
            size = int(random.lognormvariate(0, 1) * 1000)
            b = multiprocessing.heap.BufferWrapper(size)
            blocks.append(b)
            if len(blocks) > maxblocks:
                # Drop a random block; a separate name keeps the loop
                # variable intact.
                victim = random.randrange(maxblocks)
                del blocks[victim]

        # get the heap object
        heap = multiprocessing.heap.BufferWrapper._heap

        # verify the state of the heap
        layout = []
        heap._lock.acquire()
        self.addCleanup(heap._lock.release)
        for free_blocks in heap._len_to_seq.values():
            for arena, start, stop in free_blocks:
                layout.append((heap._arenas.index(arena), start, stop,
                               stop-start, 'free'))
        for arena, start, stop in heap._allocated_blocks:
            layout.append((heap._arenas.index(arena), start, stop,
                           stop-start, 'occupied'))

        layout.sort()

        # Every block must either end exactly where the next one starts
        # or be followed by the first block (offset 0) of another arena.
        for current, following in zip(layout, layout[1:]):
            (arena, start, stop) = current[:3]
            (narena, nstart, nstop) = following[:3]
            self.assertTrue((arena != narena and nstart == 0) or
                            (stop == nstart))

    def test_free_from_gc(self):
        # Check that freeing of blocks by the garbage collector doesn't deadlock
        # (issue #12352).
        # Make sure the GC is enabled, and set lower collection thresholds to
        # make collections more frequent (and increase the probability of
        # deadlock).
        if not gc.isenabled():
            gc.enable()
            self.addCleanup(gc.disable)
        thresholds = gc.get_threshold()
        self.addCleanup(gc.set_threshold, *thresholds)
        gc.set_threshold(10)

        # perform numerous block allocations, with cyclic references to make
        # sure objects are collected asynchronously by the gc
        for _ in range(5000):
            a = multiprocessing.heap.BufferWrapper(1)
            b = multiprocessing.heap.BufferWrapper(1)
            # circular references
            a.buddy = b
            b.buddy = a
1771
Benjamin Petersondfd79492008-06-13 19:13:39 +00001772#
1773#
1774#
1775
Benjamin Petersondfd79492008-06-13 19:13:39 +00001776class _Foo(Structure):
1777 _fields_ = [
1778 ('x', c_int),
1779 ('y', c_double)
1780 ]
1781
class _TestSharedCTypes(BaseTestCase):
    # Checks that shared ctypes values, structures and arrays modified
    # in a child process are seen as modified by the parent.

    ALLOWED_TYPES = ('processes',)

    def setUp(self):
        if not HAS_SHAREDCTYPES:
            self.skipTest("requires multiprocessing.sharedctypes")

    @classmethod
    def _double(cls, x, y, foo, arr, string):
        # Child side: double every shared object in place.
        x.value *= 2
        y.value *= 2
        foo.x *= 2
        foo.y *= 2
        string.value *= 2
        for idx in range(len(arr)):
            arr[idx] *= 2

    def test_sharedctypes(self, lock=False):
        x = Value('i', 7, lock=lock)
        y = Value(c_double, 1.0/3.0, lock=lock)
        foo = Value(_Foo, 3, 2, lock=lock)
        arr = self.Array('d', range(10), lock=lock)
        string = self.Array('c', 20, lock=lock)
        string.value = latin('hello')

        shared = (x, y, foo, arr, string)
        child = self.Process(target=self._double, args=shared)
        child.daemon = True
        child.start()
        child.join()

        self.assertEqual(x.value, 14)
        self.assertAlmostEqual(y.value, 2.0/3.0)
        self.assertEqual(foo.x, 6)
        self.assertAlmostEqual(foo.y, 4.0)
        for i in range(10):
            self.assertAlmostEqual(arr[i], i*2)
        self.assertEqual(string.value, latin('hellohello'))

    def test_synchronize(self):
        # Same scenario, with lock=True for every shared object.
        self.test_sharedctypes(lock=True)

    def test_copy(self):
        # The copy must not follow later changes to the original.
        foo = _Foo(2, 5.0)
        bar = copy(foo)
        foo.x = 0
        foo.y = 0
        self.assertEqual(bar.x, 2)
        self.assertAlmostEqual(bar.y, 5.0)
1831
1832#
1833#
1834#
1835
class _TestFinalize(BaseTestCase):
    # Checks when, and in which order, util.Finalize callbacks run.

    ALLOWED_TYPES = ('processes',)

    @classmethod
    def _test_finalize(cls, conn):
        # Child side: every callback reports its tag through conn.send.
        # The registration order below is significant for the expected
        # result and must not be changed.
        class Foo(object):
            pass

        a = Foo()
        util.Finalize(a, conn.send, args=('a',))
        del a           # triggers callback for a

        b = Foo()
        close_b = util.Finalize(b, conn.send, args=('b',))
        close_b()       # triggers callback for b
        close_b()       # does nothing because callback has already been called
        del b           # does nothing because callback has already been called

        # No exit priority is given for c.
        c = Foo()
        util.Finalize(c, conn.send, args=('c',))

        d10 = Foo()
        util.Finalize(d10, conn.send, args=('d10',), exitpriority=1)

        # Three finalizers sharing the same exit priority.
        d01 = Foo()
        util.Finalize(d01, conn.send, args=('d01',), exitpriority=0)
        d02 = Foo()
        util.Finalize(d02, conn.send, args=('d02',), exitpriority=0)
        d03 = Foo()
        util.Finalize(d03, conn.send, args=('d03',), exitpriority=0)

        # Finalizers that are not tied to any object.
        util.Finalize(None, conn.send, args=('e',), exitpriority=-10)

        util.Finalize(None, conn.send, args=('STOP',), exitpriority=-100)

        # call multiprocessing's cleanup function then exit process without
        # garbage collecting locals
        util._exit_function()
        conn.close()
        os._exit(0)

    def test_finalize(self):
        conn, child_conn = self.Pipe()

        child = self.Process(target=self._test_finalize, args=(child_conn,))
        child.daemon = True
        child.start()
        child.join()

        # Collect everything the child sent before the 'STOP' marker.
        result = list(iter(conn.recv, 'STOP'))
        self.assertEqual(result, ['a', 'b', 'd10', 'd03', 'd02', 'd01', 'e'])
1888
1889#
1890# Test that from ... import * works for each module
1891#
1892
class _TestImportStar(BaseTestCase):
    # Every name a multiprocessing module lists in __all__ must really be
    # an attribute of that module, otherwise "from ... import *" fails.

    ALLOWED_TYPES = ('processes',)

    def test_import(self):
        module_names = [
            'multiprocessing',
            'multiprocessing.connection',
            'multiprocessing.heap',
            'multiprocessing.managers',
            'multiprocessing.pool',
            'multiprocessing.process',
            'multiprocessing.reduction',
            'multiprocessing.synchronize',
            'multiprocessing.util'
            ]

        # This module requires _ctypes
        if c_int is not None:
            module_names.append('multiprocessing.sharedctypes')

        for module_name in module_names:
            __import__(module_name)
            module = sys.modules[module_name]

            # Modules without __all__ export nothing to check.
            exported = getattr(module, '__all__', ())
            for attr in exported:
                self.assertTrue(
                    hasattr(module, attr),
                    '%r does not have attribute %r' % (module, attr)
                    )
1919
1920#
1921# Quick test that logging works -- does not test logging output
1922#
1923
class _TestLogging(BaseTestCase):
    # Quick checks that multiprocessing's logger can be configured and that
    # its effective level is what a child process sees.  Logging output
    # itself is not inspected.

    ALLOWED_TYPES = ('processes',)

    def test_enable_logging(self):
        logger = multiprocessing.get_logger()
        # Check the logger before using it, so a missing logger is reported
        # as an assertion failure rather than an AttributeError.
        self.assertTrue(logger is not None)
        logger.setLevel(util.SUBWARNING)
        try:
            logger.debug('this will not be printed')
            logger.info('nor will this')
        finally:
            # Always restore the level so later tests are unaffected.
            logger.setLevel(LOG_LEVEL)

    @classmethod
    def _test_level(cls, conn):
        # Runs in the child: report the effective level of the
        # multiprocessing logger back to the parent.
        logger = multiprocessing.get_logger()
        conn.send(logger.getEffectiveLevel())

    def test_level(self):
        LEVEL1 = 32
        LEVEL2 = 37

        logger = multiprocessing.get_logger()
        root_logger = logging.getLogger()
        root_level = root_logger.level

        reader, writer = multiprocessing.Pipe(duplex=False)
        try:
            # Level set directly on the multiprocessing logger.
            logger.setLevel(LEVEL1)
            p = self.Process(target=self._test_level, args=(writer,))
            p.daemon = True
            p.start()
            self.assertEqual(LEVEL1, reader.recv())
            p.join()    # reap the child instead of leaving it behind

            # With the multiprocessing logger at NOTSET the child is
            # expected to report the root logger's level.
            logger.setLevel(logging.NOTSET)
            root_logger.setLevel(LEVEL2)
            p = self.Process(target=self._test_level, args=(writer,))
            p.daemon = True
            p.start()
            self.assertEqual(LEVEL2, reader.recv())
            p.join()
        finally:
            # Restore global logging state and release the pipe even when
            # an assertion above fails.
            root_logger.setLevel(root_level)
            logger.setLevel(level=LOG_LEVEL)
            reader.close()
            writer.close()
1966
Jesse Noller814d02d2009-11-21 14:38:23 +00001967
Jesse Noller9a03f2f2009-11-24 14:17:29 +00001968# class _TestLoggingProcessName(BaseTestCase):
1969#
1970# def handle(self, record):
1971# assert record.processName == multiprocessing.current_process().name
1972# self.__handled = True
1973#
1974# def test_logging(self):
1975# handler = logging.Handler()
1976# handler.handle = self.handle
1977# self.__handled = False
1978# # Bypass getLogger() and side-effects
1979# logger = logging.getLoggerClass()(
1980# 'multiprocessing.test.TestLoggingProcessName')
1981# logger.addHandler(handler)
1982# logger.propagate = False
1983#
1984# logger.warn('foo')
1985# assert self.__handled
Jesse Noller814d02d2009-11-21 14:38:23 +00001986
Benjamin Petersondfd79492008-06-13 19:13:39 +00001987#
Jesse Noller9a5b2ad2009-01-19 15:12:22 +00001988# Test to verify handle verification, see issue 3321
1989#
1990
class TestInvalidHandle(unittest.TestCase):
    # Handle verification for _multiprocessing.Connection, see issue 3321.

    @unittest.skipIf(WIN32, "skipped on Windows")
    def test_invalid_handles(self):
        # A handle value that is not expected to be open: the connection
        # can be constructed but polling it must fail.
        bogus_handle = 44977608
        conn = _multiprocessing.Connection(bogus_handle)
        self.assertRaises(IOError, conn.poll)
        # A negative handle must be rejected at construction time.
        self.assertRaises(IOError, _multiprocessing.Connection, -1)
Florent Xicluna36b9fbb2010-03-24 19:33:25 +00001998
Jesse Noller9a5b2ad2009-01-19 15:12:22 +00001999#
Benjamin Petersondfd79492008-06-13 19:13:39 +00002000# Functions used to create test cases from the base ones in this module
2001#
2002
def get_attributes(Source, names):
    """Return a dict mapping each of `names` to that attribute of `Source`.

    Attributes that are plain Python functions are wrapped in
    staticmethod(); every other value is stored unchanged.
    """
    function_type = type(get_attributes)
    attributes = {}
    for name in names:
        value = getattr(Source, name)
        if type(value) == function_type:
            attributes[name] = staticmethod(value)
        else:
            attributes[name] = value
    return attributes
2011
def create_test_cases(Mixin, type):
    """Build concrete TestCase classes from the module's `_Test*` bases.

    For every module-level name starting with '_Test' whose ALLOWED_TYPES
    contains `type`, a class deriving from that base, unittest.TestCase and
    `Mixin` is created.  It is named 'With' + type.capitalize() + the base
    name without its leading underscore, and its __module__ is copied from
    `Mixin`.  Returns a dict mapping the new names to the new classes.
    """
    module_globals = globals()
    prefix = 'With' + type.capitalize()
    cases = {}

    for name in module_globals.keys():
        if not name.startswith('_Test'):
            continue
        base = module_globals[name]
        if type not in base.ALLOWED_TYPES:
            continue

        newname = prefix + name[1:]

        class Temp(base, unittest.TestCase, Mixin):
            pass

        Temp.__name__ = newname
        Temp.__module__ = Mixin.__module__
        cases[newname] = Temp

    return cases
2028
2029#
2030# Create test cases
2031#
2032
class ProcessesMixin(object):
    # Binds the generic test cases to the process-based implementations
    # exported by the multiprocessing package.
    TYPE = 'processes'
    Process = multiprocessing.Process
    # Copy the listed multiprocessing attributes into the class namespace.
    locals().update(get_attributes(multiprocessing, (
        'Queue',
        'Lock', 'RLock',
        'Semaphore', 'BoundedSemaphore',
        'Condition', 'Event',
        'Value', 'Array', 'RawValue', 'RawArray',
        'current_process', 'active_children',
        'Pipe', 'connection',
        'JoinableQueue'
        )))

# Generate the 'processes' test cases and publish them as module globals.
testcases_processes = create_test_cases(ProcessesMixin, type='processes')
globals().update(testcases_processes)
2045
2046
class ManagerMixin(object):
    # Binds the generic test cases to the proxy factories of a SyncManager.
    TYPE = 'manager'
    Process = multiprocessing.Process
    # The manager is allocated without running __init__; test_main()
    # calls __init__() and start() on it before the suite runs.
    manager = object.__new__(multiprocessing.managers.SyncManager)
    # Copy the listed manager attributes into the class namespace.
    locals().update(get_attributes(manager, (
        'Queue',
        'Lock', 'RLock',
        'Semaphore', 'BoundedSemaphore',
        'Condition', 'Event',
        'Value', 'Array',
        'list', 'dict', 'Namespace',
        'JoinableQueue'
        )))

# Generate the 'manager' test cases and publish them as module globals.
testcases_manager = create_test_cases(ManagerMixin, type='manager')
globals().update(testcases_manager)
2059
2060
class ThreadsMixin(object):
    # Binds the generic test cases to the thread-backed implementations
    # exported by multiprocessing.dummy.
    TYPE = 'threads'
    Process = multiprocessing.dummy.Process
    # Copy the listed multiprocessing.dummy attributes into the class
    # namespace.
    locals().update(get_attributes(multiprocessing.dummy, (
        'Queue',
        'Lock', 'RLock',
        'Semaphore', 'BoundedSemaphore',
        'Condition', 'Event',
        'Value', 'Array',
        'current_process', 'active_children',
        'Pipe', 'connection',
        'dict', 'list', 'Namespace',
        'JoinableQueue'
        )))

# Generate the 'threads' test cases and publish them as module globals.
testcases_threads = create_test_cases(ThreadsMixin, type='threads')
globals().update(testcases_threads)
2073
class OtherTest(unittest.TestCase):
    # TODO: add more tests for deliver/answer challenge.
    def test_deliver_challenge_auth_failure(self):
        # The peer answers the challenge with garbage, so delivering the
        # challenge must end in AuthenticationError.
        class _FakeConnection(object):
            def send_bytes(self, data):
                pass
            def recv_bytes(self, size):
                return b'something bogus'

        bogus_peer = _FakeConnection()
        self.assertRaises(multiprocessing.AuthenticationError,
                          multiprocessing.connection.deliver_challenge,
                          bogus_peer, b'abc')

    def test_answer_challenge_auth_failure(self):
        # The peer first sends the CHALLENGE marker, then garbage, and
        # empty data on any further read.
        class _FakeConnection(object):
            def __init__(self):
                self.replies = [
                    multiprocessing.connection.CHALLENGE,
                    b'something bogus',
                    ]
            def send_bytes(self, data):
                pass
            def recv_bytes(self, size):
                if self.replies:
                    return self.replies.pop(0)
                return b''

        bogus_peer = _FakeConnection()
        self.assertRaises(multiprocessing.AuthenticationError,
                          multiprocessing.connection.answer_challenge,
                          bogus_peer, b'abc')
2102
Jesse Noller7152f6d2009-04-02 05:17:26 +00002103#
2104# Test Manager.start()/Pool.__init__() initializer feature - see issue 5585
2105#
2106
def initializer(ns):
    """Add one to `ns.test`, so a test can tell that the initializer ran."""
    ns.test = ns.test + 1
2109
class TestInitializers(unittest.TestCase):
    # Tests the initializer argument of Manager.start() and Pool(), see
    # issue 5585.  A manager-backed Namespace carries a counter that the
    # module-level initializer() increments.

    def setUp(self):
        self.mgr = multiprocessing.Manager()
        self.ns = self.mgr.Namespace()
        self.ns.test = 0

    def tearDown(self):
        self.mgr.shutdown()

    def test_manager_initializer(self):
        m = multiprocessing.managers.SyncManager()
        # A non-callable initializer must be rejected.
        self.assertRaises(TypeError, m.start, 1)
        m.start(initializer, (self.ns,))
        try:
            self.assertEqual(self.ns.test, 1)
        finally:
            # Shut the second manager down even if the assertion fails, so
            # its server process is not leaked.
            m.shutdown()

    def test_pool_initializer(self):
        # A non-callable initializer must be rejected.
        self.assertRaises(TypeError, multiprocessing.Pool, initializer=1)
        p = multiprocessing.Pool(1, initializer, (self.ns,))
        p.close()
        p.join()
        self.assertEqual(self.ns.test, 1)
2132
Jesse Noller1b90efb2009-06-30 17:11:52 +00002133#
2134# Issue 5155, 5313, 5331: Test process in processes
2135# Verifies os.close(sys.stdin.fileno) vs. sys.stdin.close() behavior
2136#
2137
2138def _ThisSubProcess(q):
2139 try:
2140 item = q.get(block=False)
2141 except Queue.Empty:
2142 pass
2143
def _TestProcess(q):
    """Start a daemonic grandchild running _ThisSubProcess and wait for it.

    The grandchild gets a freshly created queue; the `q` argument itself is
    not used.
    """
    inner_queue = multiprocessing.Queue()
    grandchild = multiprocessing.Process(target=_ThisSubProcess,
                                         args=(inner_queue,))
    grandchild.daemon = True
    grandchild.start()
    grandchild.join()
2150
2151def _afunc(x):
2152 return x*x
2153
def pool_in_process():
    """Create a four-worker pool and map _afunc over a short list.

    The mapped result is discarded.
    """
    workers = multiprocessing.Pool(processes=4)
    workers.map(_afunc, [1, 2, 3, 4, 5, 6, 7])
2157
2158class _file_like(object):
2159 def __init__(self, delegate):
2160 self._delegate = delegate
2161 self._pid = None
2162
2163 @property
2164 def cache(self):
2165 pid = os.getpid()
2166 # There are no race conditions since fork keeps only the running thread
2167 if pid != self._pid:
2168 self._pid = pid
2169 self._cache = []
2170 return self._cache
2171
2172 def write(self, data):
2173 self.cache.append(data)
2174
2175 def flush(self):
2176 self._delegate.write(''.join(self.cache))
2177 self._cache = []
2178
class TestStdinBadfiledescriptor(unittest.TestCase):
    # Issues 5155, 5313 and 5331: starting processes (and pools) from
    # within a child process.

    def test_queue_in_process(self):
        queue = multiprocessing.Queue()
        proc = multiprocessing.Process(target=_TestProcess, args=(queue,))
        proc.start()
        proc.join()

    def test_pool_in_process(self):
        p = multiprocessing.Process(target=pool_in_process)
        p.start()
        p.join()

    def test_flushing(self):
        sio = StringIO()
        flike = _file_like(sio)
        flike.write('foo')
        flike.flush()
        # assertEqual rather than a bare assert: it still runs under
        # "python -O" and reports both values on failure.
        self.assertEqual(sio.getvalue(), 'foo')
2199
# Stand-alone test cases that are not generated from the _Test* bases.
testcases_other = [
    OtherTest,
    TestInvalidHandle,
    TestInitializers,
    TestStdinBadfiledescriptor,
    ]
Neal Norwitz0c519b32008-08-25 01:50:24 +00002202
Benjamin Petersondfd79492008-06-13 19:13:39 +00002203#
2204#
2205#
2206
def test_main(run=None):
    """Build and run the complete multiprocessing test suite.

    `run` is a callable that takes the assembled unittest.TestSuite; when
    it is None, test.test_support.run_unittest is used.  On Linux,
    unittest.SkipTest is raised if an RLock cannot be created (issue 3111).

    The pools and the manager shared by the mixin classes are created
    before the suite runs and are always torn down afterwards, even when
    `run` raises.
    """
    if sys.platform.startswith("linux"):
        try:
            multiprocessing.RLock()
        except OSError:
            raise unittest.SkipTest("OSError raises on RLock creation, see issue 3111!")

    if run is None:
        from test.test_support import run_unittest as run

    util.get_temp_dir()     # creates temp directory for use by all processes

    multiprocessing.get_logger().setLevel(LOG_LEVEL)

    ProcessesMixin.pool = multiprocessing.Pool(4)
    ThreadsMixin.pool = multiprocessing.dummy.Pool(4)
    ManagerMixin.manager.__init__()
    ManagerMixin.manager.start()
    ManagerMixin.pool = ManagerMixin.manager.Pool(4)

    try:
        testcases = (
            sorted(testcases_processes.values(), key=lambda tc:tc.__name__) +
            sorted(testcases_threads.values(), key=lambda tc:tc.__name__) +
            sorted(testcases_manager.values(), key=lambda tc:tc.__name__) +
            testcases_other
            )

        loadTestsFromTestCase = unittest.defaultTestLoader.loadTestsFromTestCase
        suite = unittest.TestSuite(loadTestsFromTestCase(tc) for tc in testcases)
        # (ncoghlan): Whether or not sys.exc_clear is executed by the threading
        # module during these tests is at least platform dependent and possibly
        # non-deterministic on any given platform. So we don't mind if the listed
        # warnings aren't actually raised.
        with test_support.check_py3k_warnings(
                (".+__(get|set)slice__ has been removed", DeprecationWarning),
                (r"sys.exc_clear\(\) not supported", DeprecationWarning),
                quiet=True):
            run(suite)
    finally:
        # Tear down the shared pools and the manager even if the run
        # raised, so no worker or server processes are left behind.
        ThreadsMixin.pool.terminate()
        ProcessesMixin.pool.terminate()
        ManagerMixin.pool.terminate()
        ManagerMixin.manager.shutdown()

        del ProcessesMixin.pool, ThreadsMixin.pool, ManagerMixin.pool
Benjamin Petersondfd79492008-06-13 19:13:39 +00002252
def main():
    """Run the whole suite through a verbose TextTestRunner."""
    runner = unittest.TextTestRunner(verbosity=2)
    test_main(runner.run)

if __name__ == '__main__':
    main()