#!/usr/bin/env python

#
# Unit tests for the multiprocessing package
#
6
7import unittest
Benjamin Petersondfd79492008-06-13 19:13:39 +00008import Queue
9import time
10import sys
11import os
12import gc
13import signal
14import array
Benjamin Petersondfd79492008-06-13 19:13:39 +000015import socket
16import random
17import logging
Antoine Pitroua1a8da82011-08-23 19:54:20 +020018import errno
Mark Dickinsonc4920e82009-11-20 19:30:22 +000019from test import test_support
Jesse Noller1b90efb2009-06-30 17:11:52 +000020from StringIO import StringIO
R. David Murray3db8a342009-03-30 23:05:48 +000021_multiprocessing = test_support.import_module('_multiprocessing')
Ezio Melottic2077b02011-03-16 12:34:31 +020022# import threading after _multiprocessing to raise a more relevant error
Victor Stinner613b4cf2010-04-27 21:56:26 +000023# message: "No module named _multiprocessing". _multiprocessing is not compiled
24# without thread support.
25import threading
R. David Murray3db8a342009-03-30 23:05:48 +000026
Jesse Noller37040cd2008-09-30 00:15:45 +000027# Work around broken sem_open implementations
R. David Murray3db8a342009-03-30 23:05:48 +000028test_support.import_module('multiprocessing.synchronize')
Jesse Noller37040cd2008-09-30 00:15:45 +000029
Benjamin Petersondfd79492008-06-13 19:13:39 +000030import multiprocessing.dummy
31import multiprocessing.connection
32import multiprocessing.managers
33import multiprocessing.heap
Benjamin Petersondfd79492008-06-13 19:13:39 +000034import multiprocessing.pool
Benjamin Petersondfd79492008-06-13 19:13:39 +000035
Charles-François Natalif8413b22011-09-21 18:44:49 +020036from multiprocessing import util
37
38try:
39 from multiprocessing import reduction
40 HAS_REDUCTION = True
41except ImportError:
42 HAS_REDUCTION = False
Benjamin Petersondfd79492008-06-13 19:13:39 +000043
Brian Curtina06e9b82010-10-07 02:27:41 +000044try:
45 from multiprocessing.sharedctypes import Value, copy
46 HAS_SHAREDCTYPES = True
47except ImportError:
48 HAS_SHAREDCTYPES = False
49
Antoine Pitroua1a8da82011-08-23 19:54:20 +020050try:
51 import msvcrt
52except ImportError:
53 msvcrt = None
54
Benjamin Petersondfd79492008-06-13 19:13:39 +000055#
56#
57#
58
latin = str

#
# Constants
#

LOG_LEVEL = util.SUBWARNING
#LOG_LEVEL = logging.DEBUG

# Short delay, in seconds, that tests sleep for to let other processes or
# threads make progress.
DELTA = 0.1
CHECK_TIMINGS = False     # making true makes tests take a lot longer
                          # and can sometimes cause some non-serious
                          # failures because some calls block a bit
                          # longer than expected
if CHECK_TIMINGS:
    TIMEOUT1, TIMEOUT2, TIMEOUT3 = 0.82, 0.35, 1.4
else:
    TIMEOUT1, TIMEOUT2, TIMEOUT3 = 0.1, 0.1, 0.1

HAVE_GETVALUE = not getattr(_multiprocessing,
                            'HAVE_BROKEN_SEM_GETVALUE', False)

WIN32 = (sys.platform == "win32")

# Fall back to 256 when the limit cannot be queried.  Only the failures
# os.sysconf can legitimately produce are caught (function missing on this
# platform, unknown name, value unsupported by the host), so that
# KeyboardInterrupt/SystemExit are no longer swallowed.
try:
    MAXFD = os.sysconf("SC_OPEN_MAX")
except (AttributeError, ValueError, OSError):
    MAXFD = 256
87
Benjamin Petersondfd79492008-06-13 19:13:39 +000088#
Florent Xicluna36b9fbb2010-03-24 19:33:25 +000089# Some tests require ctypes
90#
91
92try:
Nick Coghlan13623662010-04-10 14:24:36 +000093 from ctypes import Structure, c_int, c_double
Florent Xicluna36b9fbb2010-03-24 19:33:25 +000094except ImportError:
95 Structure = object
96 c_int = c_double = None
97
#
# Creates a wrapper for a function which records the time it takes to finish
#

class TimingWrapper(object):
    """Callable proxy for ``func`` that remembers how long the last call took.

    ``elapsed`` is ``None`` until the first call.  Afterwards it holds the
    duration in seconds (measured with ``time.time()``) of the most recent
    call, and it is updated even when ``func`` raises.
    """

    def __init__(self, func):
        self.func = func
        self.elapsed = None

    def __call__(self, *args, **kwds):
        started = time.time()
        try:
            return self.func(*args, **kwds)
        finally:
            # Runs on both normal return and exception, so callers can time
            # calls that are expected to fail (e.g. via assertRaises).
            self.elapsed = time.time() - started
114
#
# Base class for test cases
#

class BaseTestCase(object):
    """Mixin with helpers shared by the test case classes in this file.

    It derives from ``object`` rather than ``unittest.TestCase``, yet its
    helpers call assertion methods on ``self``, so it has to be combined
    with a class that provides them.
    """

    # Flavours a test class applies to; subclasses narrow this tuple.
    # NOTE(review): presumably consumed by test-generation code elsewhere in
    # this file -- confirm against that code.
    ALLOWED_TYPES = ('processes', 'manager', 'threads')

    def assertTimingAlmostEqual(self, a, b):
        # Timings are compared (to one decimal place) only when
        # CHECK_TIMINGS is enabled; otherwise this is a no-op.
        if CHECK_TIMINGS:
            self.assertAlmostEqual(a, b, 1)

    def assertReturnsIfImplemented(self, value, func, *args):
        # Assert that func(*args) == value, but treat NotImplementedError
        # raised by func as "nothing to check" instead of a failure.
        try:
            res = func(*args)
        except NotImplementedError:
            pass
        else:
            return self.assertEqual(value, res)

    # For the sanity of Windows users, rather than crashing or freezing in
    # multiple ways.
    def __reduce__(self, *args):
        raise NotImplementedError("shouldn't try to pickle a test case")

    __reduce_ex__ = __reduce__
141
#
# Return the value of a semaphore
#

def get_value(self):
    """Return the current counter of the semaphore-like object ``self``.

    Lookup order: a ``get_value()`` method, then the ``_Semaphore__value``
    attribute, then the ``_value`` attribute.  Raises NotImplementedError
    when none of them is available.  A NotImplementedError raised by
    ``self.get_value()`` itself propagates to the caller.
    """
    try:
        return self.get_value()
    except AttributeError:
        pass
    # Fall back to the private attributes, tried in the original order.
    for attr in ('_Semaphore__value', '_value'):
        try:
            return getattr(self, attr)
        except AttributeError:
            pass
    raise NotImplementedError
157
#
# Testcases
#

class _TestProcess(BaseTestCase):
    # Tests for creating, inspecting, terminating and nesting Process
    # objects (or their thread-based stand-ins).

    ALLOWED_TYPES = ('processes', 'threads')

    def test_current(self):
        # Report a skip instead of silently passing for the thread flavour.
        if self.TYPE == 'threads':
            self.skipTest('test not appropriate for %s' % self.TYPE)

        current = self.current_process()
        authkey = current.authkey

        self.assertTrue(current.is_alive())
        self.assertFalse(current.daemon)
        self.assertIsInstance(authkey, bytes)
        self.assertGreater(len(authkey), 0)
        self.assertEqual(current.ident, os.getpid())
        self.assertEqual(current.exitcode, None)

    @classmethod
    def _test(cls, q, *args, **kwds):
        # Target run by test_process: report back the arguments it was
        # called with and the identity of the process/thread running it.
        current = cls.current_process()
        q.put(args)
        q.put(kwds)
        q.put(current.name)
        if cls.TYPE != 'threads':
            q.put(bytes(current.authkey))
            q.put(current.pid)

    def test_process(self):
        q = self.Queue(1)
        args = (q, 1, 2)
        kwargs = {'hello':23, 'bye':2.54}
        name = 'SomeProcess'
        p = self.Process(
            target=self._test, args=args, kwargs=kwargs, name=name
            )
        p.daemon = True
        current = self.current_process()

        # state before start()
        if self.TYPE != 'threads':
            self.assertEqual(p.authkey, current.authkey)
        self.assertEqual(p.is_alive(), False)
        self.assertEqual(p.daemon, True)
        self.assertNotIn(p, self.active_children())
        self.assertIs(type(self.active_children()), list)
        self.assertEqual(p.exitcode, None)

        p.start()

        # state while running
        self.assertEqual(p.exitcode, None)
        self.assertEqual(p.is_alive(), True)
        self.assertIn(p, self.active_children())

        # values reported by _test, in the order it puts them; the queue
        # itself (args[0]) is consumed by the `q` parameter
        self.assertEqual(q.get(), args[1:])
        self.assertEqual(q.get(), kwargs)
        self.assertEqual(q.get(), p.name)
        if self.TYPE != 'threads':
            self.assertEqual(q.get(), current.authkey)
            self.assertEqual(q.get(), p.pid)

        p.join()

        # state after a clean exit
        self.assertEqual(p.exitcode, 0)
        self.assertEqual(p.is_alive(), False)
        self.assertNotIn(p, self.active_children())

    @classmethod
    def _test_terminate(cls):
        # Sleep far longer than the test runs so that the process can only
        # end by being terminated.
        time.sleep(1000)

    def test_terminate(self):
        # Report a skip instead of silently passing for the thread flavour.
        if self.TYPE == 'threads':
            self.skipTest('test not appropriate for %s' % self.TYPE)

        p = self.Process(target=self._test_terminate)
        p.daemon = True
        p.start()

        self.assertEqual(p.is_alive(), True)
        self.assertIn(p, self.active_children())
        self.assertEqual(p.exitcode, None)

        p.terminate()

        join = TimingWrapper(p.join)
        self.assertEqual(join(), None)
        self.assertTimingAlmostEqual(join.elapsed, 0.0)

        self.assertEqual(p.is_alive(), False)
        self.assertNotIn(p, self.active_children())

        p.join()

        # XXX sometimes get p.exitcode == 0 on Windows ...
        #self.assertEqual(p.exitcode, -signal.SIGTERM)

    def test_cpu_count(self):
        try:
            cpus = multiprocessing.cpu_count()
        except NotImplementedError:
            cpus = 1
        self.assertIs(type(cpus), int)
        self.assertGreaterEqual(cpus, 1)

    def test_active_children(self):
        self.assertEqual(type(self.active_children()), list)

        p = self.Process(target=time.sleep, args=(DELTA,))
        self.assertNotIn(p, self.active_children())

        p.daemon = True
        p.start()
        self.assertIn(p, self.active_children())

        p.join()
        self.assertNotIn(p, self.active_children())

    @classmethod
    def _test_recursion(cls, wconn, id):
        # Send `id` (a list of ints recording the path taken so far), then
        # spawn two children one after the other until the path is two
        # levels deep.
        # NOTE(review): `forking` is imported but not used here; possibly a
        # smoke check that the module imports -- confirm before removing.
        from multiprocessing import forking
        wconn.send(id)
        if len(id) < 2:
            for i in range(2):
                p = cls.Process(
                    target=cls._test_recursion, args=(wconn, id+[i])
                    )
                p.start()
                p.join()

    def test_recursion(self):
        rconn, wconn = self.Pipe(duplex=False)
        self._test_recursion(wconn, [])

        time.sleep(DELTA)
        result = []
        while rconn.poll():
            result.append(rconn.recv())

        # Each child is joined before its sibling starts, so the ids arrive
        # in depth-first order.
        expected = [
            [],
              [0],
                [0, 0],
                [0, 1],
              [1],
                [1, 0],
                [1, 1]
            ]
        self.assertEqual(result, expected)
311
312#
313#
314#
315
class _UpperCaser(multiprocessing.Process):
    """Process that upper-cases the strings sent to it over a duplex pipe.

    The parent talks through ``parent_conn`` (``submit``/``stop``); the
    ``run`` method, executed in the child, serves ``child_conn``.
    """

    def __init__(self):
        multiprocessing.Process.__init__(self)
        self.child_conn, self.parent_conn = multiprocessing.Pipe()

    def run(self):
        # The child only uses its own end of the pipe.
        self.parent_conn.close()
        # Answer requests until the None sentinel sent by stop() arrives.
        for s in iter(self.child_conn.recv, None):
            self.child_conn.send(s.upper())
        self.child_conn.close()

    def submit(self, s):
        # Send `s` to the child and block until its reply comes back.
        assert type(s) is str
        self.parent_conn.send(s)
        return self.parent_conn.recv()

    def stop(self):
        # Ask the child to leave its loop, then close both pipe ends held
        # by the calling process.
        self.parent_conn.send(None)
        self.parent_conn.close()
        self.child_conn.close()
337
class _TestSubclassingProcess(BaseTestCase):
    # Checks that a multiprocessing.Process subclass overriding run() works
    # end to end, using _UpperCaser as the subclass.

    ALLOWED_TYPES = ('processes',)

    def test_subclassing(self):
        uppercaser = _UpperCaser()
        uppercaser.daemon = True
        uppercaser.start()
        self.assertEqual(uppercaser.submit('hello'), 'HELLO')
        self.assertEqual(uppercaser.submit('world'), 'WORLD')
        # stop() sends the sentinel that lets run() return, so join() ends.
        uppercaser.stop()
        uppercaser.join()
350
351#
352#
353#
354
def queue_empty(q):
    """Return whether queue ``q`` is empty.

    Uses ``q.empty()`` when the queue has that method and falls back to
    comparing ``q.qsize()`` with zero otherwise.
    """
    if hasattr(q, 'empty'):
        return q.empty()
    return q.qsize() == 0
360
def queue_full(q, maxsize):
    """Return whether queue ``q`` is full.

    Uses ``q.full()`` when the queue has that method and falls back to
    comparing ``q.qsize()`` with ``maxsize`` otherwise.
    """
    if hasattr(q, 'full'):
        return q.full()
    return q.qsize() == maxsize
366
367
class _TestQueue(BaseTestCase):
    # Tests for the blocking/non-blocking put()/get() API, queue use across
    # a newly started process, qsize() and JoinableQueue.task_done().

    @classmethod
    def _test_put(cls, queue, child_can_start, parent_can_continue):
        # Child side of test_put: once released, drain the six items the
        # parent queued (MAXSIZE in test_put), then signal completion.
        child_can_start.wait()
        for _ in range(6):
            queue.get()
        parent_can_continue.set()

    def test_put(self):
        MAXSIZE = 6
        queue = self.Queue(maxsize=MAXSIZE)
        child_can_start = self.Event()
        parent_can_continue = self.Event()

        proc = self.Process(
            target=self._test_put,
            args=(queue, child_can_start, parent_can_continue)
            )
        proc.daemon = True
        proc.start()

        self.assertEqual(queue_empty(queue), True)
        self.assertEqual(queue_full(queue, MAXSIZE), False)

        # fill the queue using every supported calling convention
        queue.put(1)
        queue.put(2, True)
        queue.put(3, True, None)
        queue.put(4, False)
        queue.put(5, False, None)
        queue.put_nowait(6)

        # the values may be in buffer but not yet in pipe so sleep a bit
        time.sleep(DELTA)

        self.assertEqual(queue_empty(queue), False)
        self.assertEqual(queue_full(queue, MAXSIZE), True)

        put = TimingWrapper(queue.put)
        put_nowait = TimingWrapper(queue.put_nowait)

        # non-blocking puts on a full queue fail immediately ...
        self.assertRaises(Queue.Full, put, 7, False)
        self.assertTimingAlmostEqual(put.elapsed, 0)

        self.assertRaises(Queue.Full, put, 7, False, None)
        self.assertTimingAlmostEqual(put.elapsed, 0)

        self.assertRaises(Queue.Full, put_nowait, 7)
        self.assertTimingAlmostEqual(put_nowait.elapsed, 0)

        # ... blocking puts fail once their timeout has elapsed ...
        self.assertRaises(Queue.Full, put, 7, True, TIMEOUT1)
        self.assertTimingAlmostEqual(put.elapsed, TIMEOUT1)

        # ... and the timeout is ignored when block is False
        self.assertRaises(Queue.Full, put, 7, False, TIMEOUT2)
        self.assertTimingAlmostEqual(put.elapsed, 0)

        self.assertRaises(Queue.Full, put, 7, True, timeout=TIMEOUT3)
        self.assertTimingAlmostEqual(put.elapsed, TIMEOUT3)

        # let the child drain the queue and wait until it has finished
        child_can_start.set()
        parent_can_continue.wait()

        self.assertEqual(queue_empty(queue), True)
        self.assertEqual(queue_full(queue, MAXSIZE), False)

        proc.join()

    @classmethod
    def _test_get(cls, queue, child_can_start, parent_can_continue):
        # Child side of test_get: once released, queue the values the
        # parent expects to read back, then signal completion.
        child_can_start.wait()
        #queue.put(1)
        queue.put(2)
        queue.put(3)
        queue.put(4)
        queue.put(5)
        parent_can_continue.set()

    def test_get(self):
        queue = self.Queue()
        child_can_start = self.Event()
        parent_can_continue = self.Event()

        proc = self.Process(
            target=self._test_get,
            args=(queue, child_can_start, parent_can_continue)
            )
        proc.daemon = True
        proc.start()

        self.assertEqual(queue_empty(queue), True)

        child_can_start.set()
        parent_can_continue.wait()

        time.sleep(DELTA)
        self.assertEqual(queue_empty(queue), False)

        # Hangs unexpectedly, remove for now
        #self.assertEqual(queue.get(), 1)
        self.assertEqual(queue.get(True, None), 2)
        self.assertEqual(queue.get(True), 3)
        self.assertEqual(queue.get(timeout=1), 4)
        self.assertEqual(queue.get_nowait(), 5)

        self.assertEqual(queue_empty(queue), True)

        get = TimingWrapper(queue.get)
        get_nowait = TimingWrapper(queue.get_nowait)

        # non-blocking gets on an empty queue fail immediately ...
        self.assertRaises(Queue.Empty, get, False)
        self.assertTimingAlmostEqual(get.elapsed, 0)

        self.assertRaises(Queue.Empty, get, False, None)
        self.assertTimingAlmostEqual(get.elapsed, 0)

        self.assertRaises(Queue.Empty, get_nowait)
        self.assertTimingAlmostEqual(get_nowait.elapsed, 0)

        # ... blocking gets fail once their timeout has elapsed ...
        self.assertRaises(Queue.Empty, get, True, TIMEOUT1)
        self.assertTimingAlmostEqual(get.elapsed, TIMEOUT1)

        # ... and the timeout is ignored when block is False
        self.assertRaises(Queue.Empty, get, False, TIMEOUT2)
        self.assertTimingAlmostEqual(get.elapsed, 0)

        self.assertRaises(Queue.Empty, get, timeout=TIMEOUT3)
        self.assertTimingAlmostEqual(get.elapsed, TIMEOUT3)

        proc.join()

    @classmethod
    def _test_fork(cls, queue):
        for i in range(10, 20):
            queue.put(i)
        # note that at this point the items may only be buffered, so the
        # process cannot shutdown until the feeder thread has finished
        # pushing items onto the pipe.

    def test_fork(self):
        # Old versions of Queue would fail to create a new feeder
        # thread for a forked process if the original process had its
        # own feeder thread.  This test checks that this no longer
        # happens.

        queue = self.Queue()

        # put items on queue so that main process starts a feeder thread
        for i in range(10):
            queue.put(i)

        # wait to make sure thread starts before we fork a new process
        time.sleep(DELTA)

        # fork process
        p = self.Process(target=self._test_fork, args=(queue,))
        p.daemon = True
        p.start()

        # check that all expected items are in the queue
        for i in range(20):
            self.assertEqual(queue.get(), i)
        self.assertRaises(Queue.Empty, queue.get, False)

        p.join()

    def test_qsize(self):
        q = self.Queue()
        # qsize() may raise NotImplementedError; nothing to test then.
        try:
            self.assertEqual(q.qsize(), 0)
        except NotImplementedError:
            return
        q.put(1)
        self.assertEqual(q.qsize(), 1)
        q.put(5)
        self.assertEqual(q.qsize(), 2)
        q.get()
        self.assertEqual(q.qsize(), 1)
        q.get()
        self.assertEqual(q.qsize(), 0)

    @classmethod
    def _test_task_done(cls, q):
        # Worker loop: acknowledge every item until the None sentinel.
        for _ in iter(q.get, None):
            time.sleep(DELTA)
            q.task_done()

    def test_task_done(self):
        queue = self.JoinableQueue()

        if sys.version_info < (2, 5) and not hasattr(queue, 'task_done'):
            self.skipTest("requires 'queue.task_done()' method")

        workers = [self.Process(target=self._test_task_done, args=(queue,))
                   for _ in xrange(4)]

        for p in workers:
            p.daemon = True
            p.start()

        for i in xrange(10):
            queue.put(i)

        # returns once task_done() has been called for every item put
        queue.join()

        # one sentinel per worker so that each of them leaves its loop
        for _ in workers:
            queue.put(None)

        for p in workers:
            p.join()
577
578#
579#
580#
581
class _TestLock(BaseTestCase):
    # Tests for Lock and RLock acquire/release semantics.

    def test_lock(self):
        lock = self.Lock()
        self.assertEqual(lock.acquire(), True)
        # a non-blocking acquire of a lock that is already held fails
        self.assertEqual(lock.acquire(False), False)
        self.assertEqual(lock.release(), None)
        # releasing an unlocked lock is an error; two exception types are
        # accepted here
        self.assertRaises((ValueError, threading.ThreadError), lock.release)

    def test_rlock(self):
        lock = self.RLock()
        # the lock can be re-acquired by its owner ...
        self.assertEqual(lock.acquire(), True)
        self.assertEqual(lock.acquire(), True)
        self.assertEqual(lock.acquire(), True)
        # ... and is released once per acquire
        self.assertEqual(lock.release(), None)
        self.assertEqual(lock.release(), None)
        self.assertEqual(lock.release(), None)
        # one release too many is an error
        self.assertRaises((AssertionError, RuntimeError), lock.release)

    def test_lock_context(self):
        # a lock must be usable as a context manager
        with self.Lock():
            pass
604
Benjamin Petersondfd79492008-06-13 19:13:39 +0000605
class _TestSemaphore(BaseTestCase):
    # Tests for Semaphore and BoundedSemaphore counting and for
    # acquire() timeouts.

    def _test_semaphore(self, sem):
        # `sem` must start with a value of 2.  Walk the counter down to 0
        # and back up to 2, checking the value wherever it can be read.
        self.assertReturnsIfImplemented(2, get_value, sem)
        self.assertEqual(sem.acquire(), True)
        self.assertReturnsIfImplemented(1, get_value, sem)
        self.assertEqual(sem.acquire(), True)
        self.assertReturnsIfImplemented(0, get_value, sem)
        self.assertEqual(sem.acquire(False), False)
        self.assertReturnsIfImplemented(0, get_value, sem)
        self.assertEqual(sem.release(), None)
        self.assertReturnsIfImplemented(1, get_value, sem)
        self.assertEqual(sem.release(), None)
        self.assertReturnsIfImplemented(2, get_value, sem)

    def test_semaphore(self):
        sem = self.Semaphore(2)
        self._test_semaphore(sem)
        # an unbounded semaphore may be released beyond its initial value
        self.assertEqual(sem.release(), None)
        self.assertReturnsIfImplemented(3, get_value, sem)
        self.assertEqual(sem.release(), None)
        self.assertReturnsIfImplemented(4, get_value, sem)

    def test_bounded_semaphore(self):
        sem = self.BoundedSemaphore(2)
        self._test_semaphore(sem)
        # Currently fails on OS/X
        #if HAVE_GETVALUE:
        #    self.assertRaises(ValueError, sem.release)
        #    self.assertReturnsIfImplemented(2, get_value, sem)

    def test_timeout(self):
        # Report a skip instead of silently passing for other flavours.
        if self.TYPE != 'processes':
            self.skipTest('test not appropriate for %s' % self.TYPE)

        sem = self.Semaphore(0)
        acquire = TimingWrapper(sem.acquire)

        # non-blocking acquires fail immediately, whatever the timeout
        self.assertEqual(acquire(False), False)
        self.assertTimingAlmostEqual(acquire.elapsed, 0.0)

        self.assertEqual(acquire(False, None), False)
        self.assertTimingAlmostEqual(acquire.elapsed, 0.0)

        self.assertEqual(acquire(False, TIMEOUT1), False)
        self.assertTimingAlmostEqual(acquire.elapsed, 0)

        # blocking acquires fail once the timeout has elapsed
        self.assertEqual(acquire(True, TIMEOUT2), False)
        self.assertTimingAlmostEqual(acquire.elapsed, TIMEOUT2)

        self.assertEqual(acquire(timeout=TIMEOUT3), False)
        self.assertTimingAlmostEqual(acquire.elapsed, TIMEOUT3)
658
659
class _TestCondition(BaseTestCase):
    # Tests for Condition.wait()/notify()/notify_all() with a mix of
    # self.Process workers and plain threads waiting on one condition.

    @classmethod
    def f(cls, cond, sleeping, woken, timeout=None):
        # Worker: announce through `sleeping` that it is about to wait,
        # wait on `cond` (optionally with a timeout), then announce
        # through `woken` that the wait has returned.
        cond.acquire()
        sleeping.release()
        cond.wait(timeout)
        woken.release()
        cond.release()

    def check_invariant(self, cond):
        # this is only supposed to succeed when there are no sleepers
        if self.TYPE == 'processes':
            try:
                sleepers = (cond._sleeping_count.get_value() -
                            cond._woken_count.get_value())
                self.assertEqual(sleepers, 0)
                self.assertEqual(cond._wait_semaphore.get_value(), 0)
            except NotImplementedError:
                pass

    def test_notify(self):
        cond = self.Condition()
        sleeping = self.Semaphore(0)
        woken = self.Semaphore(0)

        # The two workers get distinct names so that both can be joined at
        # the end; reusing one name left the first worker unjoined.
        p = self.Process(target=self.f, args=(cond, sleeping, woken))
        p.daemon = True
        p.start()

        t = threading.Thread(target=self.f, args=(cond, sleeping, woken))
        t.daemon = True
        t.start()

        # wait for both children to start sleeping
        sleeping.acquire()
        sleeping.acquire()

        # check no process/thread has woken up
        time.sleep(DELTA)
        self.assertReturnsIfImplemented(0, get_value, woken)

        # wake up one process/thread
        cond.acquire()
        cond.notify()
        cond.release()

        # check one process/thread has woken up
        time.sleep(DELTA)
        self.assertReturnsIfImplemented(1, get_value, woken)

        # wake up another
        cond.acquire()
        cond.notify()
        cond.release()

        # check other has woken up
        time.sleep(DELTA)
        self.assertReturnsIfImplemented(2, get_value, woken)

        # check state is not mucked up
        self.check_invariant(cond)

        # both workers have been notified, so both joins return
        p.join()
        t.join()

    def test_notify_all(self):
        cond = self.Condition()
        sleeping = self.Semaphore(0)
        woken = self.Semaphore(0)

        # every worker started below, joined at the end of the test
        workers = []

        # start some threads/processes which will timeout
        for _ in range(3):
            p = self.Process(target=self.f,
                             args=(cond, sleeping, woken, TIMEOUT1))
            p.daemon = True
            p.start()
            workers.append(p)

            t = threading.Thread(target=self.f,
                                 args=(cond, sleeping, woken, TIMEOUT1))
            t.daemon = True
            t.start()
            workers.append(t)

        # wait for them all to sleep
        for _ in xrange(6):
            sleeping.acquire()

        # check they have all timed out
        for _ in xrange(6):
            woken.acquire()
        self.assertReturnsIfImplemented(0, get_value, woken)

        # check state is not mucked up
        self.check_invariant(cond)

        # start some more threads/processes
        for _ in range(3):
            p = self.Process(target=self.f, args=(cond, sleeping, woken))
            p.daemon = True
            p.start()
            workers.append(p)

            t = threading.Thread(target=self.f, args=(cond, sleeping, woken))
            t.daemon = True
            t.start()
            workers.append(t)

        # wait for them to all sleep
        for _ in xrange(6):
            sleeping.acquire()

        # check no process/thread has woken up
        time.sleep(DELTA)
        self.assertReturnsIfImplemented(0, get_value, woken)

        # wake them all up
        cond.acquire()
        cond.notify_all()
        cond.release()

        # check they have all woken
        time.sleep(DELTA)
        self.assertReturnsIfImplemented(6, get_value, woken)

        # check state is not mucked up
        self.check_invariant(cond)

        # the first batch timed out and the second was notified, so every
        # worker is finishing and can be joined
        for w in workers:
            w.join()

    def test_timeout(self):
        cond = self.Condition()
        wait = TimingWrapper(cond.wait)
        cond.acquire()
        res = wait(TIMEOUT1)
        cond.release()
        self.assertEqual(res, None)
        self.assertTimingAlmostEqual(wait.elapsed, TIMEOUT1)
791
792
class _TestEvent(BaseTestCase):
    # Tests for Event.set()/clear()/is_set()/wait().

    @classmethod
    def _test_event(cls, event):
        # Worker: set the event after a short delay.
        time.sleep(TIMEOUT2)
        event.set()

    def test_event(self):
        event = self.Event()
        wait = TimingWrapper(event.wait)

        # Historical note: this check was once removed because of API
        # shear with threading._Event objects (is_set == isSet).  It is
        # active again.
        self.assertEqual(event.is_set(), False)

        # Historical note: these checks were once removed because
        # threading.Event.wait() returns the value of the __flag instead
        # of None (API shear with the semaphore backed mp.Event).  They
        # are active again and expect the flag value.
        self.assertEqual(wait(0.0), False)
        self.assertTimingAlmostEqual(wait.elapsed, 0.0)
        self.assertEqual(wait(TIMEOUT1), False)
        self.assertTimingAlmostEqual(wait.elapsed, TIMEOUT1)

        event.set()

        # See note above on the API differences
        self.assertEqual(event.is_set(), True)
        self.assertEqual(wait(), True)
        self.assertTimingAlmostEqual(wait.elapsed, 0.0)
        self.assertEqual(wait(TIMEOUT1), True)
        self.assertTimingAlmostEqual(wait.elapsed, 0.0)
        # self.assertEqual(event.is_set(), True)

        event.clear()

        #self.assertEqual(event.is_set(), False)

        p = self.Process(target=self._test_event, args=(event,))
        p.daemon = True
        p.start()
        self.assertEqual(wait(), True)
        # The worker has set the event, its last action, so it is about to
        # exit; join it rather than leaving it to daemon cleanup.
        p.join()
Benjamin Petersondfd79492008-06-13 19:13:39 +0000833
834#
835#
836#
837
class _TestValue(BaseTestCase):
    # Tests for Value and RawValue objects.

    ALLOWED_TYPES = ('processes',)

    # (typecode, initial value, value assigned by the child process)
    codes_values = [
        ('i', 4343, 24234),
        ('d', 3.625, -4.25),
        ('h', -232, 234),
        ('c', latin('x'), latin('y'))
        ]

    def setUp(self):
        if not HAS_SHAREDCTYPES:
            self.skipTest("requires multiprocessing.sharedctypes")

    @classmethod
    def _test(cls, values):
        # Runs in the child: overwrite each value with the third element
        # of its codes_values entry.
        for sv, cv in zip(values, cls.codes_values):
            sv.value = cv[2]

    def test_value(self, raw=False):
        # `raw` selects RawValue instead of Value; see test_rawvalue.
        if raw:
            values = [self.RawValue(code, value)
                      for code, value, _ in self.codes_values]
        else:
            values = [self.Value(code, value)
                      for code, value, _ in self.codes_values]

        for sv, cv in zip(values, self.codes_values):
            self.assertEqual(sv.value, cv[1])

        proc = self.Process(target=self._test, args=(values,))
        proc.daemon = True
        proc.start()
        proc.join()

        # the child's assignments must be visible in this process
        for sv, cv in zip(values, self.codes_values):
            self.assertEqual(sv.value, cv[2])

    def test_rawvalue(self):
        self.test_value(raw=True)

    def test_getobj_getlock(self):
        # with the default lock argument both accessors must be callable
        val1 = self.Value('i', 5)
        val1.get_lock()
        val1.get_obj()

        # the same holds for an explicit lock=None
        val2 = self.Value('i', 5, lock=None)
        val2.get_lock()
        val2.get_obj()

        # a lock that is passed in is the one handed back by get_lock()
        lock = self.Lock()
        val3 = self.Value('i', 5, lock=lock)
        lock3 = val3.get_lock()
        val3.get_obj()
        self.assertEqual(lock, lock3)

        # lock=False gives an object without either accessor
        val4 = self.Value('i', 5, lock=False)
        self.assertFalse(hasattr(val4, 'get_lock'))
        self.assertFalse(hasattr(val4, 'get_obj'))

        # anything else that is not a lock is rejected
        self.assertRaises(AttributeError, self.Value, 'i', 5, lock='navalue')

        # RawValue objects have neither accessor
        val5 = self.RawValue('i', 5)
        self.assertFalse(hasattr(val5, 'get_lock'))
        self.assertFalse(hasattr(val5, 'get_obj'))
905
Benjamin Petersondfd79492008-06-13 19:13:39 +0000906
class _TestArray(BaseTestCase):
    """Tests for the shared arrays created by Array() and RawArray()."""

    ALLOWED_TYPES = ('processes',)

    @classmethod
    def f(cls, seq):
        """Replace *seq* in place by its running (prefix) sums."""
        for idx in range(1, len(seq)):
            seq[idx] += seq[idx - 1]

    @unittest.skipIf(c_int is None, "requires _ctypes")
    def test_array(self, raw=False):
        # test_rawarray() re-runs this scenario with raw=True.
        seq = [680, 626, 934, 821, 150, 233, 548, 982, 714, 831]
        make_array = self.RawArray if raw else self.Array
        arr = make_array('i', seq)

        # Length, indexing and slicing must mirror the source list.
        self.assertEqual(len(arr), len(seq))
        self.assertEqual(arr[3], seq[3])
        self.assertEqual(list(arr[2:7]), list(seq[2:7]))

        # Apply the same slice assignment to the shared array and the list.
        patch = array.array('i', [1, 2, 3, 4])
        arr[4:8] = patch
        seq[4:8] = patch

        self.assertEqual(list(arr[:]), seq)

        # Compute the expected result locally on the list ...
        self.f(seq)

        # ... and let a child process do the same work on the shared array.
        child = self.Process(target=self.f, args=(arr,))
        child.daemon = True
        child.start()
        child.join()

        self.assertEqual(list(arr[:]), seq)

    @unittest.skipIf(c_int is None, "requires _ctypes")
    def test_array_from_size(self):
        size = 10
        zeros = [0] * size
        # Test for zeroing (see issue #11675).
        # The repetition below strengthens the test by increasing the chances
        # of previously allocated non-zero memory being used for the new array
        # on the 2nd and 3rd loops.
        for _ in range(3):
            arr = self.Array('i', size)
            self.assertEqual(len(arr), size)
            self.assertEqual(list(arr), zeros)
            arr[:] = range(10)
            self.assertEqual(list(arr), range(10))
            del arr

    @unittest.skipIf(c_int is None, "requires _ctypes")
    def test_rawarray(self):
        self.test_array(raw=True)

    @unittest.skipIf(c_int is None, "requires _ctypes")
    def test_array_accepts_long(self):
        # A Python 2 long must be accepted as the array size.
        size = long(10)
        arr = self.Array('i', size)
        self.assertEqual(len(arr), 10)
        raw_arr = self.RawArray('i', size)
        self.assertEqual(len(raw_arr), 10)

    @unittest.skipIf(c_int is None, "requires _ctypes")
    def test_getobj_getlock_obj(self):
        data = range(10)

        # Default locking: both accessors must be callable.
        arr1 = self.Array('i', data)
        arr1.get_lock()
        arr1.get_obj()

        # lock=None: both accessors must be callable as well.
        arr2 = self.Array('i', data, lock=None)
        arr2.get_lock()
        arr2.get_obj()

        # An explicitly supplied lock must be the one handed back.
        lock = self.Lock()
        arr3 = self.Array('i', data, lock=lock)
        lock3 = arr3.get_lock()
        arr3.get_obj()
        self.assertEqual(lock, lock3)

        # lock=False yields an object without the accessors.
        arr4 = self.Array('i', data, lock=False)
        self.assertFalse(hasattr(arr4, 'get_lock'))
        self.assertFalse(hasattr(arr4, 'get_obj'))
        self.assertRaises(AttributeError,
                          self.Array, 'i', data, lock='notalock')

        # Raw arrays have no accessors either.
        arr5 = self.RawArray('i', data)
        self.assertFalse(hasattr(arr5, 'get_lock'))
        self.assertFalse(hasattr(arr5, 'get_obj'))
Benjamin Petersondfd79492008-06-13 19:13:39 +0000992
993#
994#
995#
996
class _TestContainers(BaseTestCase):
    """Tests for the list, dict and Namespace proxies of a manager."""

    ALLOWED_TYPES = ('manager',)

    def test_list(self):
        a = self.list(range(10))
        self.assertEqual(a[:], range(10))

        b = self.list()
        self.assertEqual(b[:], [])

        b.extend(range(5))
        self.assertEqual(b[:], range(5))

        self.assertEqual(b[2], 2)
        self.assertEqual(b[2:10], [2, 3, 4])

        b *= 2
        self.assertEqual(b[:], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4])

        self.assertEqual(b + [5, 6], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4, 5, 6])

        # Operations on b must not have touched a.
        self.assertEqual(a[:], range(10))

        # A shared list may itself contain shared lists.
        d = [a, b]
        e = self.list(d)
        self.assertEqual(
            e[:],
            [[0, 1, 2, 3, 4, 5, 6, 7, 8, 9], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4]]
            )

        # A later change to the nested list is visible through the outer one.
        f = self.list([a])
        a.append('hello')
        self.assertEqual(f[:], [[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 'hello']])

    def test_dict(self):
        d = self.dict()
        indices = range(65, 70)
        for i in indices:
            d[i] = chr(i)
        self.assertEqual(d.copy(), dict((i, chr(i)) for i in indices))
        self.assertEqual(sorted(d.keys()), indices)
        self.assertEqual(sorted(d.values()), [chr(i) for i in indices])
        self.assertEqual(sorted(d.items()), [(i, chr(i)) for i in indices])

    def test_namespace(self):
        n = self.Namespace()
        n.name = 'Bob'
        n.job = 'Builder'
        n._hidden = 'hidden'
        self.assertEqual((n.name, n.job), ('Bob', 'Builder'))
        del n.job
        # The expected string shows neither the deleted attribute nor the
        # underscore-prefixed one.
        self.assertEqual(str(n), "Namespace(name='Bob')")
        self.assertTrue(hasattr(n, 'name'))
        # assertFalse matches the idiom used by the other tests in this file.
        self.assertFalse(hasattr(n, 'job'))
1052
1053#
1054#
1055#
1056
def sqr(x, wait=0.0):
    """Sleep for *wait* seconds, then return the square of *x*."""
    time.sleep(wait)
    squared = x * x
    return squared
class _TestPool(BaseTestCase):
    """Tests for the apply/map/imap family of Pool methods."""

    def test_apply(self):
        papply = self.pool.apply
        self.assertEqual(papply(sqr, (5,)), sqr(5))
        self.assertEqual(papply(sqr, (), {'x': 3}), sqr(x=3))

    def test_map(self):
        pmap = self.pool.map
        self.assertEqual(pmap(sqr, range(10)), map(sqr, range(10)))
        self.assertEqual(pmap(sqr, range(100), chunksize=20),
                         map(sqr, range(100)))

    def test_map_chunksize(self):
        # map_async() on an empty iterable with an explicit chunksize must
        # complete rather than stall.
        try:
            self.pool.map_async(sqr, [], chunksize=1).get(timeout=TIMEOUT1)
        except multiprocessing.TimeoutError:
            self.fail("pool.map_async with chunksize stalled on null list")

    def test_async(self):
        res = self.pool.apply_async(sqr, (7, TIMEOUT1,))
        get = TimingWrapper(res.get)
        self.assertEqual(get(), 49)
        self.assertTimingAlmostEqual(get.elapsed, TIMEOUT1)

    def test_async_timeout(self):
        # The task sleeps longer than the timeout passed to get().
        res = self.pool.apply_async(sqr, (6, TIMEOUT2 + 0.2))
        get = TimingWrapper(res.get)
        self.assertRaises(multiprocessing.TimeoutError, get, timeout=TIMEOUT2)
        self.assertTimingAlmostEqual(get.elapsed, TIMEOUT2)

    def test_imap(self):
        it = self.pool.imap(sqr, range(10))
        self.assertEqual(list(it), map(sqr, range(10)))

        it = self.pool.imap(sqr, range(10))
        for i in range(10):
            self.assertEqual(it.next(), i*i)
        self.assertRaises(StopIteration, it.next)

        it = self.pool.imap(sqr, range(1000), chunksize=100)
        for i in range(1000):
            self.assertEqual(it.next(), i*i)
        self.assertRaises(StopIteration, it.next)

    def test_imap_unordered(self):
        # Results may arrive in any order, so compare after sorting.
        it = self.pool.imap_unordered(sqr, range(1000))
        self.assertEqual(sorted(it), map(sqr, range(1000)))

        it = self.pool.imap_unordered(sqr, range(1000), chunksize=53)
        self.assertEqual(sorted(it), map(sqr, range(1000)))

    def test_make_pool(self):
        self.assertRaises(ValueError, multiprocessing.Pool, -1)
        self.assertRaises(ValueError, multiprocessing.Pool, 0)

        p = multiprocessing.Pool(3)
        self.assertEqual(3, len(p._pool))
        p.close()
        p.join()

    def test_terminate(self):
        if self.TYPE == 'manager':
            # On Unix a forked process increfs each shared object to
            # which its parent process held a reference.  If the
            # forked process gets terminated then there is likely to
            # be a reference leak.  So to prevent
            # _TestZZZNumberOfObjects from failing we skip this test
            # when using a manager.
            #
            # Report this as a skip instead of returning silently, so the
            # test is not counted as having passed.
            self.skipTest("terminating workers would leak references to "
                          "the manager's shared objects")

        # Queue far more work than can finish, then terminate right away.
        self.pool.map_async(time.sleep, [0.1] * 10000, chunksize=1)
        self.pool.terminate()
        join = TimingWrapper(self.pool.join)
        join()
        # assertLess reports both values when the check fails.
        self.assertLess(join.elapsed, 0.2)
Jesse Noller654ade32010-01-27 03:05:57 +00001138
class _TestPoolWorkerLifetime(BaseTestCase):
    """Tests for pools created with the maxtasksperchild option."""

    ALLOWED_TYPES = ('processes', )

    def test_pool_worker_lifetime(self):
        pool = multiprocessing.Pool(3, maxtasksperchild=10)
        self.assertEqual(3, len(pool._pool))
        origworkerpids = [worker.pid for worker in pool._pool]
        # Run many tasks so each worker gets replaced (hopefully)
        results = [pool.apply_async(sqr, (i, )) for i in range(100)]
        # Fetch the results and verify we got the right answers,
        # also ensuring all the tasks have completed.
        for arg, res in enumerate(results):
            self.assertEqual(res.get(), sqr(arg))
        # Refill the pool
        pool._repopulate_pool()
        # Wait until all workers are alive
        # (50 attempts * DELTA = 5 seconds max startup process time)
        for _ in range(50):
            if all(worker.is_alive() for worker in pool._pool):
                break
            time.sleep(DELTA)
        finalworkerpids = [worker.pid for worker in pool._pool]
        # All pids should be assigned.  See issue #7805.
        self.assertNotIn(None, origworkerpids)
        self.assertNotIn(None, finalworkerpids)
        # Finally, check that the worker pids have changed
        self.assertNotEqual(sorted(origworkerpids), sorted(finalworkerpids))
        pool.close()
        pool.join()

    def test_pool_worker_lifetime_early_close(self):
        # Issue #10332: closing a pool whose workers have limited lifetimes
        # before all the tasks completed would make join() hang.
        pool = multiprocessing.Pool(3, maxtasksperchild=1)
        results = [pool.apply_async(sqr, (i, 0.3)) for i in range(6)]
        pool.close()
        pool.join()
        # check the results
        for arg, res in enumerate(results):
            self.assertEqual(res.get(), sqr(arg))
1183
1184
Benjamin Petersondfd79492008-06-13 19:13:39 +00001185#
1186# Test that manager has expected number of shared objects left
1187#
1188
class _TestZZZNumberOfObjects(BaseTestCase):
    # Because test cases are sorted alphabetically, this one will get
    # run after all the other tests for the manager.  It tests that
    # there have been no "reference leaks" for the manager's shared
    # objects.  Note the comment in _TestPool.test_terminate().
    ALLOWED_TYPES = ('manager',)

    def test_number_of_objects(self):
        EXPECTED_NUMBER = 1                # the pool object is still alive
        multiprocessing.active_children()  # discard dead process objs
        gc.collect()                       # do garbage collection
        refs = self.manager._number_of_objects()
        # Fetch the debug info right after the count so that what gets
        # printed on failure belongs to the count being reported.
        debug_info = self.manager._debug_info()
        if refs != EXPECTED_NUMBER:
            # Printed once only; asking the manager a second time just
            # duplicated the output.
            print(debug_info)

        self.assertEqual(refs, EXPECTED_NUMBER)
1207
1208#
1209# Test of creating a customized manager class
1210#
1211
1212from multiprocessing.managers import BaseManager, BaseProxy, RemoteError
1213
class FooBar(object):
    """Plain class registered with MyManager as both 'Foo' and 'Bar'."""

    def f(self):
        """Return the string 'f()'."""
        return 'f()'

    def g(self):
        """Always raise ValueError."""
        raise ValueError

    def _h(self):
        """Return the string '_h()'."""
        return '_h()'
1221
def baz():
    """Generate the squares of 0 through 9."""
    for n in range(10):
        yield n * n
1225
class IteratorProxy(BaseProxy):
    """Proxy that forwards the iterator protocol to its referent."""

    _exposed_ = ('next', '__next__')

    def __iter__(self):
        # The proxy acts as its own iterator.
        return self

    def next(self):
        # Forward the 'next' spelling of the iterator protocol.
        return self._callmethod('next')

    def __next__(self):
        # Forward the '__next__' spelling of the iterator protocol.
        return self._callmethod('__next__')
1234
class MyManager(BaseManager):
    """Customized manager used by _TestMyManager."""


# 'Foo' uses the default exposure, 'Bar' exposes an explicit method list,
# and 'baz' hands out its generator through IteratorProxy.
MyManager.register('Foo', callable=FooBar)
MyManager.register('Bar', callable=FooBar, exposed=('f', '_h'))
MyManager.register('baz', callable=baz, proxytype=IteratorProxy)
1241
1242
class _TestMyManager(BaseTestCase):
    """Tests for the typeids registered on MyManager."""

    ALLOWED_TYPES = ('manager',)

    def test_mymanager(self):
        manager = MyManager()
        manager.start()

        foo = manager.Foo()
        bar = manager.Bar()
        squares = manager.baz()

        # Find out which of the candidate methods each proxy exposes.
        candidates = ('f', 'g', '_h')
        foo_methods = [name for name in candidates if hasattr(foo, name)]
        bar_methods = [name for name in candidates if hasattr(bar, name)]

        self.assertEqual(foo_methods, ['f', 'g'])
        self.assertEqual(bar_methods, ['f', '_h'])

        # 'Foo' proxy: '_h' is not exposed, so calling it is a RemoteError.
        self.assertEqual(foo.f(), 'f()')
        self.assertRaises(ValueError, foo.g)
        self.assertEqual(foo._callmethod('f'), 'f()')
        self.assertRaises(RemoteError, foo._callmethod, '_h')

        # 'Bar' proxy: registered with exposed=('f', '_h').
        self.assertEqual(bar.f(), 'f()')
        self.assertEqual(bar._h(), '_h()')
        self.assertEqual(bar._callmethod('f'), 'f()')
        self.assertEqual(bar._callmethod('_h'), '_h()')

        # The 'baz' proxy can be iterated like the generator it wraps.
        self.assertEqual(list(squares), [i*i for i in range(10)])

        manager.shutdown()
1274
1275#
1276# Test of connecting to a remote server and using xmlrpclib for serialization
1277#
1278
1279_queue = Queue.Queue()
1280def get_queue():
1281 return _queue
1282
class QueueManager(BaseManager):
    '''manager class used by server process'''


# The server side supplies the callable that produces the queue.
QueueManager.register('get_queue', callable=get_queue)


class QueueManager2(BaseManager):
    '''manager class which specifies the same interface as QueueManager'''


# Registers the typeid only, without a callable.
QueueManager2.register('get_queue')
1290
1291
1292SERIALIZER = 'xmlrpclib'
1293
class _TestRemoteManager(BaseTestCase):
    """Test connecting to a running manager, with xmlrpclib serialization."""

    ALLOWED_TYPES = ('manager',)

    @classmethod
    def _putter(cls, address, authkey):
        """Connect to the manager at *address* and put one tuple on its queue."""
        manager = QueueManager2(
            address=address, authkey=authkey, serializer=SERIALIZER
            )
        manager.connect()
        queue = manager.get_queue()
        queue.put(('hello world', None, True, 2.25))

    def test_remote(self):
        authkey = os.urandom(32)

        manager = QueueManager(
            address=('localhost', 0), authkey=authkey, serializer=SERIALIZER
            )
        manager.start()

        p = self.Process(target=self._putter, args=(manager.address, authkey))
        p.daemon = True
        p.start()

        manager2 = QueueManager2(
            address=manager.address, authkey=authkey, serializer=SERIALIZER
            )
        manager2.connect()
        queue = manager2.get_queue()

        # Note that xmlrpclib will deserialize object as a list not a tuple
        self.assertEqual(queue.get(), ['hello world', None, True, 2.25])

        # The item has arrived, so the child has finished its put(); join it
        # here rather than leaving it unjoined while the manager shuts down.
        p.join()

        # Because we are using xmlrpclib for serialization instead of
        # pickle this will cause a serialization error.
        self.assertRaises(Exception, queue.put, time.sleep)

        # Make queue finalizer run before the server is stopped
        del queue
        manager.shutdown()
1335
class _TestManagerRestart(BaseTestCase):
    """Test restarting a manager on an address that was just in use."""

    @classmethod
    def _putter(cls, address, authkey):
        """Connect to the manager at *address* and put one string on its queue."""
        manager = QueueManager(
            address=address, authkey=authkey, serializer=SERIALIZER)
        manager.connect()
        queue = manager.get_queue()
        queue.put('hello world')

    def test_rapid_restart(self):
        authkey = os.urandom(32)
        manager = QueueManager(
            address=('localhost', 0), authkey=authkey, serializer=SERIALIZER)
        srvr = manager.get_server()
        addr = srvr.address
        # Close the connection.Listener socket which gets opened as a part
        # of manager.get_server(). It's not needed for the test.
        srvr.listener.close()
        manager.start()

        p = self.Process(target=self._putter, args=(manager.address, authkey))
        p.daemon = True
        p.start()
        queue = manager.get_queue()
        self.assertEqual(queue.get(), 'hello world')
        # The item has arrived, so the child has finished its put(); join it
        # so it is not still running when the manager is shut down and a new
        # one is started below.
        p.join()
        del queue
        manager.shutdown()
        # Start a second manager on the address recorded above.
        manager = QueueManager(
            address=addr, authkey=authkey, serializer=SERIALIZER)
        manager.start()
        manager.shutdown()
Jesse Noller459a6482009-03-30 15:50:42 +00001368
Benjamin Petersondfd79492008-06-13 19:13:39 +00001369#
1370#
1371#
1372
1373SENTINEL = latin('')
1374
class _TestConnection(BaseTestCase):
    """Tests for the connection objects returned by Pipe()."""

    ALLOWED_TYPES = ('processes', 'threads')

    @classmethod
    def _echo(cls, conn):
        """Send back every byte message received until SENTINEL arrives."""
        for msg in iter(conn.recv_bytes, SENTINEL):
            conn.send_bytes(msg)
        conn.close()

    def test_connection(self):
        conn, child_conn = self.Pipe()

        p = self.Process(target=self._echo, args=(child_conn,))
        p.daemon = True
        p.start()

        seq = [1, 2.25, None]
        msg = latin('hello world')
        longmsg = msg * 10
        arr = array.array('i', range(4))

        if self.TYPE == 'processes':
            self.assertEqual(type(conn.fileno()), int)

        self.assertEqual(conn.send(seq), None)
        self.assertEqual(conn.recv(), seq)

        self.assertEqual(conn.send_bytes(msg), None)
        self.assertEqual(conn.recv_bytes(), msg)

        if self.TYPE == 'processes':
            # Receive into the start of a zeroed buffer.
            buf = array.array('i', [0]*10)
            expected = list(arr) + [0] * (10 - len(arr))
            self.assertEqual(conn.send_bytes(arr), None)
            self.assertEqual(conn.recv_bytes_into(buf),
                             len(arr) * buf.itemsize)
            self.assertEqual(list(buf), expected)

            # Receive at an offset of three items.
            buf = array.array('i', [0]*10)
            expected = [0] * 3 + list(arr) + [0] * (10 - 3 - len(arr))
            self.assertEqual(conn.send_bytes(arr), None)
            self.assertEqual(conn.recv_bytes_into(buf, 3 * buf.itemsize),
                             len(arr) * buf.itemsize)
            self.assertEqual(list(buf), expected)

            # A buffer that is too small must raise BufferTooShort carrying
            # the complete message.
            buf = bytearray(latin(' ' * 40))
            self.assertEqual(conn.send_bytes(longmsg), None)
            try:
                res = conn.recv_bytes_into(buf)
            except multiprocessing.BufferTooShort as e:
                self.assertEqual(e.args, (longmsg,))
            else:
                self.fail('expected BufferTooShort, got %s' % res)

        poll = TimingWrapper(conn.poll)

        self.assertEqual(poll(), False)
        self.assertTimingAlmostEqual(poll.elapsed, 0)

        self.assertEqual(poll(TIMEOUT1), False)
        self.assertTimingAlmostEqual(poll.elapsed, TIMEOUT1)

        conn.send(None)

        self.assertEqual(poll(TIMEOUT1), True)
        self.assertTimingAlmostEqual(poll.elapsed, 0)

        self.assertEqual(conn.recv(), None)

        really_big_msg = latin('X') * (1024 * 1024 * 16)   # 16Mb
        conn.send_bytes(really_big_msg)
        self.assertEqual(conn.recv_bytes(), really_big_msg)

        conn.send_bytes(SENTINEL)                          # tell child to quit
        child_conn.close()

        if self.TYPE == 'processes':
            self.assertEqual(conn.readable, True)
            self.assertEqual(conn.writable, True)
            self.assertRaises(EOFError, conn.recv)
            self.assertRaises(EOFError, conn.recv_bytes)

        p.join()

    def test_duplex_false(self):
        reader, writer = self.Pipe(duplex=False)
        self.assertEqual(writer.send(1), None)
        self.assertEqual(reader.recv(), 1)
        if self.TYPE == 'processes':
            self.assertEqual(reader.readable, True)
            self.assertEqual(reader.writable, False)
            self.assertEqual(writer.readable, False)
            self.assertEqual(writer.writable, True)
            self.assertRaises(IOError, reader.send, 2)
            self.assertRaises(IOError, writer.recv)
            self.assertRaises(IOError, writer.poll)

    def test_spawn_close(self):
        # We test that a pipe connection can be closed by parent
        # process immediately after child is spawned.  On Windows this
        # would have sometimes failed on old versions because
        # child_conn would be closed before the child got a chance to
        # duplicate it.
        conn, child_conn = self.Pipe()

        p = self.Process(target=self._echo, args=(child_conn,))
        p.daemon = True
        p.start()
        child_conn.close()    # this might complete before child initializes

        msg = latin('hello')
        conn.send_bytes(msg)
        self.assertEqual(conn.recv_bytes(), msg)

        conn.send_bytes(SENTINEL)
        conn.close()
        p.join()

    def test_sendbytes(self):
        if self.TYPE != 'processes':
            # Skip explicitly, as the other process-only tests in this
            # class do, instead of returning and being counted as passed.
            self.skipTest("only makes sense with processes")

        msg = latin('abcdefghijklmnopqrstuvwxyz')
        a, b = self.Pipe()

        a.send_bytes(msg)
        self.assertEqual(b.recv_bytes(), msg)

        # Offset only.
        a.send_bytes(msg, 5)
        self.assertEqual(b.recv_bytes(), msg[5:])

        # Offset and size.
        a.send_bytes(msg, 7, 8)
        self.assertEqual(b.recv_bytes(), msg[7:7+8])

        # An offset equal to the message length sends an empty message.
        a.send_bytes(msg, 26)
        self.assertEqual(b.recv_bytes(), latin(''))

        a.send_bytes(msg, 26, 0)
        self.assertEqual(b.recv_bytes(), latin(''))

        # Out-of-range or negative offsets and sizes are rejected.
        self.assertRaises(ValueError, a.send_bytes, msg, 27)

        self.assertRaises(ValueError, a.send_bytes, msg, 22, 5)

        self.assertRaises(ValueError, a.send_bytes, msg, 26, 1)

        self.assertRaises(ValueError, a.send_bytes, msg, -1)

        self.assertRaises(ValueError, a.send_bytes, msg, 4, -1)

    @classmethod
    def _is_fd_assigned(cls, fd):
        """Return False if os.fstat(fd) fails with EBADF, True if it succeeds.

        Any other OSError is re-raised.
        """
        try:
            os.fstat(fd)
        except OSError as e:
            if e.errno == errno.EBADF:
                return False
            raise
        else:
            return True

    @classmethod
    def _writefd(cls, conn, data, create_dummy_fds=False):
        """Receive a handle over *conn*, write *data* to it and close it.

        With *create_dummy_fds* true, every unassigned descriptor below 256
        is first filled with a duplicate of the connection's descriptor.
        """
        if create_dummy_fds:
            for i in range(0, 256):
                if not cls._is_fd_assigned(i):
                    os.dup2(conn.fileno(), i)
        fd = reduction.recv_handle(conn)
        if msvcrt:
            fd = msvcrt.open_osfhandle(fd, os.O_WRONLY)
        os.write(fd, data)
        os.close(fd)

    @unittest.skipUnless(HAS_REDUCTION, "test needs multiprocessing.reduction")
    def test_fd_transfer(self):
        if self.TYPE != 'processes':
            self.skipTest("only makes sense with processes")
        conn, child_conn = self.Pipe(duplex=True)

        p = self.Process(target=self._writefd, args=(child_conn, b"foo"))
        p.daemon = True
        p.start()
        # Remove the scratch file when the test ends, pass or fail.
        self.addCleanup(test_support.unlink, test_support.TESTFN)
        with open(test_support.TESTFN, "wb") as f:
            fd = f.fileno()
            if msvcrt:
                fd = msvcrt.get_osfhandle(fd)
            reduction.send_handle(conn, fd, p.pid)
        p.join()
        with open(test_support.TESTFN, "rb") as f:
            self.assertEqual(f.read(), b"foo")

    @unittest.skipUnless(HAS_REDUCTION, "test needs multiprocessing.reduction")
    @unittest.skipIf(sys.platform == "win32",
                     "test semantics don't make sense on Windows")
    @unittest.skipIf(MAXFD <= 256,
                     "largest assignable fd number is too small")
    @unittest.skipUnless(hasattr(os, "dup2"),
                         "test needs os.dup2()")
    def test_large_fd_transfer(self):
        # With fd > 256 (issue #11657)
        if self.TYPE != 'processes':
            self.skipTest("only makes sense with processes")
        conn, child_conn = self.Pipe(duplex=True)

        p = self.Process(target=self._writefd, args=(child_conn, b"bar", True))
        p.daemon = True
        p.start()
        # Remove the scratch file when the test ends, pass or fail.
        self.addCleanup(test_support.unlink, test_support.TESTFN)
        with open(test_support.TESTFN, "wb") as f:
            fd = f.fileno()
            # Pick the first unassigned descriptor number at or above 256.
            for newfd in range(256, MAXFD):
                if not self._is_fd_assigned(newfd):
                    break
            else:
                self.fail("could not find an unassigned large file descriptor")
            os.dup2(fd, newfd)
            try:
                reduction.send_handle(conn, newfd, p.pid)
            finally:
                os.close(newfd)
        p.join()
        with open(test_support.TESTFN, "rb") as f:
            self.assertEqual(f.read(), b"bar")

    @classmethod
    def _send_data_without_fd(cls, conn):
        """Write a single NUL byte straight to the connection's descriptor."""
        os.write(conn.fileno(), b"\0")

    @unittest.skipUnless(HAS_REDUCTION, "test needs multiprocessing.reduction")
    @unittest.skipIf(sys.platform == "win32", "doesn't make sense on Windows")
    def test_missing_fd_transfer(self):
        # Check that exception is raised when received data is not
        # accompanied by a file descriptor in ancillary data.
        if self.TYPE != 'processes':
            self.skipTest("only makes sense with processes")
        conn, child_conn = self.Pipe(duplex=True)

        p = self.Process(target=self._send_data_without_fd, args=(child_conn,))
        p.daemon = True
        p.start()
        self.assertRaises(RuntimeError, reduction.recv_handle, conn)
        p.join()
Antoine Pitroua1a8da82011-08-23 19:54:20 +02001617
class _TestListenerClient(BaseTestCase):
    """Test a Listener/Client round trip for every connection family."""

    ALLOWED_TYPES = ('processes', 'threads')

    @classmethod
    def _test(cls, address):
        """Connect to *address*, send 'hello' and close the connection."""
        client = cls.connection.Client(address)
        client.send('hello')
        client.close()

    def test_listener_client(self):
        for family in self.connection.families:
            listener = self.connection.Listener(family=family)
            child = self.Process(target=self._test,
                                 args=(listener.address,))
            child.daemon = True
            child.start()
            # Accept the child's connection and read its greeting.
            conn = listener.accept()
            self.assertEqual(conn.recv(), 'hello')
            child.join()
            listener.close()
Benjamin Petersondfd79492008-06-13 19:13:39 +00001638#
1639# Test of sending connection and socket objects between processes
1640#
Benjamin Petersonda3a1b12008-06-16 20:52:48 +00001641"""
Benjamin Petersondfd79492008-06-13 19:13:39 +00001642class _TestPicklingConnections(BaseTestCase):
1643
1644 ALLOWED_TYPES = ('processes',)
1645
1646 def _listener(self, conn, families):
1647 for fam in families:
1648 l = self.connection.Listener(family=fam)
1649 conn.send(l.address)
1650 new_conn = l.accept()
1651 conn.send(new_conn)
1652
1653 if self.TYPE == 'processes':
1654 l = socket.socket()
1655 l.bind(('localhost', 0))
1656 conn.send(l.getsockname())
1657 l.listen(1)
1658 new_conn, addr = l.accept()
1659 conn.send(new_conn)
1660
1661 conn.recv()
1662
1663 def _remote(self, conn):
1664 for (address, msg) in iter(conn.recv, None):
1665 client = self.connection.Client(address)
1666 client.send(msg.upper())
1667 client.close()
1668
1669 if self.TYPE == 'processes':
1670 address, msg = conn.recv()
1671 client = socket.socket()
1672 client.connect(address)
1673 client.sendall(msg.upper())
1674 client.close()
1675
1676 conn.close()
1677
1678 def test_pickling(self):
1679 try:
1680 multiprocessing.allow_connection_pickling()
1681 except ImportError:
1682 return
1683
1684 families = self.connection.families
1685
1686 lconn, lconn0 = self.Pipe()
1687 lp = self.Process(target=self._listener, args=(lconn0, families))
Jesus Cea6f6016b2011-09-09 20:26:57 +02001688 lp.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +00001689 lp.start()
1690 lconn0.close()
1691
1692 rconn, rconn0 = self.Pipe()
1693 rp = self.Process(target=self._remote, args=(rconn0,))
Jesus Cea6f6016b2011-09-09 20:26:57 +02001694 rp.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +00001695 rp.start()
1696 rconn0.close()
1697
1698 for fam in families:
1699 msg = ('This connection uses family %s' % fam).encode('ascii')
1700 address = lconn.recv()
1701 rconn.send((address, msg))
1702 new_conn = lconn.recv()
1703 self.assertEqual(new_conn.recv(), msg.upper())
1704
1705 rconn.send(None)
1706
1707 if self.TYPE == 'processes':
1708 msg = latin('This connection uses a normal socket')
1709 address = lconn.recv()
1710 rconn.send((address, msg))
1711 if hasattr(socket, 'fromfd'):
1712 new_conn = lconn.recv()
1713 self.assertEqual(new_conn.recv(100), msg.upper())
1714 else:
1715 # XXX On Windows with Py2.6 need to backport fromfd()
1716 discard = lconn.recv_bytes()
1717
1718 lconn.send(None)
1719
1720 rconn.close()
1721 lconn.close()
1722
1723 lp.join()
1724 rp.join()
Benjamin Petersonda3a1b12008-06-16 20:52:48 +00001725"""
Benjamin Petersondfd79492008-06-13 19:13:39 +00001726#
1727#
1728#
1729
class _TestHeap(BaseTestCase):
    """Tests for the block allocator in multiprocessing.heap."""

    ALLOWED_TYPES = ('processes',)

    def test_heap(self):
        iterations = 5000
        maxblocks = 50
        blocks = []

        # create and destroy lots of blocks of different sizes
        for _ in xrange(iterations):
            size = int(random.lognormvariate(0, 1) * 1000)
            blocks.append(multiprocessing.heap.BufferWrapper(size))
            if len(blocks) > maxblocks:
                victim = random.randrange(maxblocks)
                del blocks[victim]

        # get the heap object
        heap = multiprocessing.heap.BufferWrapper._heap

        # verify the state of the heap
        layout = []
        occupied = 0
        heap._lock.acquire()
        self.addCleanup(heap._lock.release)
        for free_list in heap._len_to_seq.values():
            for arena, start, stop in free_list:
                layout.append((heap._arenas.index(arena), start, stop,
                               stop - start, 'free'))
        for arena, start, stop in heap._allocated_blocks:
            layout.append((heap._arenas.index(arena), start, stop,
                           stop - start, 'occupied'))
            occupied += stop - start

        layout.sort()

        # Each block must either end where the next one starts, or the next
        # one must start at offset 0 of a different arena.
        for current, following in zip(layout, layout[1:]):
            arena, start, stop = current[:3]
            narena, nstart, nstop = following[:3]
            self.assertTrue((arena != narena and nstart == 0) or
                            (stop == nstart))

    def test_free_from_gc(self):
        # Check that freeing of blocks by the garbage collector doesn't deadlock
        # (issue #12352).
        # Make sure the GC is enabled, and set lower collection thresholds to
        # make collections more frequent (and increase the probability of
        # deadlock).
        if not gc.isenabled():
            gc.enable()
            self.addCleanup(gc.disable)
        saved_thresholds = gc.get_threshold()
        self.addCleanup(gc.set_threshold, *saved_thresholds)
        gc.set_threshold(10)

        # perform numerous block allocations, with cyclic references to make
        # sure objects are collected asynchronously by the gc
        for _ in range(5000):
            first = multiprocessing.heap.BufferWrapper(1)
            second = multiprocessing.heap.BufferWrapper(1)
            # circular references
            first.buddy = second
            second.buddy = first
1794
Benjamin Petersondfd79492008-06-13 19:13:39 +00001795#
1796#
1797#
1798
Benjamin Petersondfd79492008-06-13 19:13:39 +00001799class _Foo(Structure):
1800 _fields_ = [
1801 ('x', c_int),
1802 ('y', c_double)
1803 ]
1804
class _TestSharedCTypes(BaseTestCase):
    """Tests for shared ctypes values, structures and arrays."""

    ALLOWED_TYPES = ('processes',)

    def setUp(self):
        if not HAS_SHAREDCTYPES:
            self.skipTest("requires multiprocessing.sharedctypes")

    @classmethod
    def _double(cls, x, y, foo, arr, string):
        """Double every argument in place; run in the child process."""
        x.value *= 2
        y.value *= 2
        foo.x *= 2
        foo.y *= 2
        string.value *= 2
        for idx in range(len(arr)):
            arr[idx] *= 2

    def test_sharedctypes(self, lock=False):
        # test_synchronize() re-runs this scenario with lock=True.
        x = Value('i', 7, lock=lock)
        y = Value(c_double, 1.0/3.0, lock=lock)
        foo = Value(_Foo, 3, 2, lock=lock)
        arr = self.Array('d', range(10), lock=lock)
        string = self.Array('c', 20, lock=lock)
        string.value = latin('hello')

        child = self.Process(target=self._double,
                             args=(x, y, foo, arr, string))
        child.daemon = True
        child.start()
        child.join()

        # Every shared object must show the doubling done by the child.
        self.assertEqual(x.value, 14)
        self.assertAlmostEqual(y.value, 2.0/3.0)
        self.assertEqual(foo.x, 6)
        self.assertAlmostEqual(foo.y, 4.0)
        for idx in range(10):
            self.assertAlmostEqual(arr[idx], idx*2)
        self.assertEqual(string.value, latin('hellohello'))

    def test_synchronize(self):
        self.test_sharedctypes(lock=True)

    def test_copy(self):
        original = _Foo(2, 5.0)
        duplicate = copy(original)
        # Changing the original must not affect the copy.
        original.x = 0
        original.y = 0
        self.assertEqual(duplicate.x, 2)
        self.assertAlmostEqual(duplicate.y, 5.0)
1854
1855#
1856#
1857#
1858
class _TestFinalize(BaseTestCase):
    # Checks when, and in which order, util.Finalize callbacks are run.

    ALLOWED_TYPES = ('processes',)

    @classmethod
    def _test_finalize(cls, conn):
        # Child-side body: each callback reports its tag through ``conn``.
        class Foo(object):
            pass

        def register(obj, tag, **options):
            # Attach a finalizer to ``obj`` that sends ``tag`` to the parent.
            return util.Finalize(obj, conn.send, args=(tag,), **options)

        a = Foo()
        register(a, 'a')
        del a           # triggers callback for a

        b = Foo()
        close_b = register(b, 'b')
        close_b()       # triggers callback for b
        close_b()       # does nothing because callback has already been called
        del b           # does nothing because callback has already been called

        # No exit priority is given for c.
        c = Foo()
        register(c, 'c')

        d10 = Foo()
        register(d10, 'd10', exitpriority=1)

        # Three finalizers sharing the same exit priority.
        d01 = Foo()
        register(d01, 'd01', exitpriority=0)
        d02 = Foo()
        register(d02, 'd02', exitpriority=0)
        d03 = Foo()
        register(d03, 'd03', exitpriority=0)

        register(None, 'e', exitpriority=-10)

        # Lowest priority: the sentinel that ends the parent's read loop.
        register(None, 'STOP', exitpriority=-100)

        # call multiprocessing's cleanup function then exit process without
        # garbage collecting locals
        util._exit_function()
        conn.close()
        os._exit(0)

    def test_finalize(self):
        conn, child_conn = self.Pipe()

        child = self.Process(target=self._test_finalize, args=(child_conn,))
        child.daemon = True
        child.start()
        child.join()

        # Read every tag the child sent, up to the 'STOP' sentinel.
        result = list(iter(conn.recv, 'STOP'))
        self.assertEqual(result, ['a', 'b', 'd10', 'd03', 'd02', 'd01', 'e'])
1911
1912#
1913# Test that from ... import * works for each module
1914#
1915
class _TestImportStar(BaseTestCase):
    # Every name a module lists in __all__ must really exist on the module.

    ALLOWED_TYPES = ('processes',)

    def test_import(self):
        modules = ['multiprocessing']
        for submodule in ('connection', 'heap', 'managers', 'pool',
                          'process', 'synchronize', 'util'):
            modules.append('multiprocessing.' + submodule)

        if HAS_REDUCTION:
            modules.append('multiprocessing.reduction')

        if c_int is not None:
            # This module requires _ctypes
            modules.append('multiprocessing.sharedctypes')

        for name in modules:
            # __import__ returns the top-level package, so fetch the module
            # itself from sys.modules.
            __import__(name)
            mod = sys.modules[name]

            exported = getattr(mod, '__all__', ())
            for attr in exported:
                message = '%r does not have attribute %r' % (mod, attr)
                self.assertTrue(hasattr(mod, attr), message)
1944
1945#
1946# Quick test that logging works -- does not test logging output
1947#
1948
class _TestLogging(BaseTestCase):
    # Quick checks of the multiprocessing logger; log output is not verified.

    ALLOWED_TYPES = ('processes',)

    def test_enable_logging(self):
        logger = multiprocessing.get_logger()
        logger.setLevel(util.SUBWARNING)
        try:
            self.assertTrue(logger is not None)
            logger.debug('this will not be printed')
            logger.info('nor will this')
        finally:
            # Restore the suite-wide level even if a check above fails.
            logger.setLevel(LOG_LEVEL)

    @classmethod
    def _test_level(cls, conn):
        # Child-side body: report the effective level seen in the child.
        logger = multiprocessing.get_logger()
        conn.send(logger.getEffectiveLevel())

    def _check_child_level(self, expected, reader, writer):
        # Start a child that reports its effective level through ``writer``
        # and compare the value read from ``reader`` with ``expected``.
        p = self.Process(target=self._test_level, args=(writer,))
        p.daemon = True
        p.start()
        try:
            self.assertEqual(expected, reader.recv())
        finally:
            # Reap the child so it does not outlive the test.
            p.join()

    def test_level(self):
        LEVEL1 = 32
        LEVEL2 = 37

        logger = multiprocessing.get_logger()
        root_logger = logging.getLogger()
        root_level = root_logger.level

        reader, writer = multiprocessing.Pipe(duplex=False)

        try:
            # A level set on the multiprocessing logger is seen by the child.
            logger.setLevel(LEVEL1)
            self._check_child_level(LEVEL1, reader, writer)

            # With NOTSET the child falls back to the root logger's level.
            logger.setLevel(logging.NOTSET)
            root_logger.setLevel(LEVEL2)
            self._check_child_level(LEVEL2, reader, writer)
        finally:
            # Undo every global change, whether or not the checks passed.
            root_logger.setLevel(root_level)
            logger.setLevel(level=LOG_LEVEL)
            reader.close()
            writer.close()
1991
Jesse Noller814d02d2009-11-21 14:38:23 +00001992
Jesse Noller9a03f2f2009-11-24 14:17:29 +00001993# class _TestLoggingProcessName(BaseTestCase):
1994#
1995# def handle(self, record):
1996# assert record.processName == multiprocessing.current_process().name
1997# self.__handled = True
1998#
1999# def test_logging(self):
2000# handler = logging.Handler()
2001# handler.handle = self.handle
2002# self.__handled = False
2003# # Bypass getLogger() and side-effects
2004# logger = logging.getLoggerClass()(
2005# 'multiprocessing.test.TestLoggingProcessName')
2006# logger.addHandler(handler)
2007# logger.propagate = False
2008#
2009# logger.warn('foo')
2010# assert self.__handled
Jesse Noller814d02d2009-11-21 14:38:23 +00002011
Benjamin Petersondfd79492008-06-13 19:13:39 +00002012#
Jesse Noller9a5b2ad2009-01-19 15:12:22 +00002013# Test to verify handle verification, see issue 3321
2014#
2015
class TestInvalidHandle(unittest.TestCase):
    # Connection objects must reject handles that are not valid.

    @unittest.skipIf(WIN32, "skipped on Windows")
    def test_invalid_handles(self):
        # NOTE(review): 44977608 is presumably just a number that is not an
        # open descriptor in the test process -- confirm.
        bogus = _multiprocessing.Connection(44977608)
        with self.assertRaises(IOError):
            bogus.poll()
        # A negative handle is refused at construction time.
        with self.assertRaises(IOError):
            _multiprocessing.Connection(-1)
Florent Xicluna36b9fbb2010-03-24 19:33:25 +00002023
Jesse Noller9a5b2ad2009-01-19 15:12:22 +00002024#
Benjamin Petersondfd79492008-06-13 19:13:39 +00002025# Functions used to create test cases from the base ones in this module
2026#
2027
def get_attributes(Source, names):
    """Return a dict mapping each name in *names* to ``getattr(Source, name)``.

    Plain Python functions are wrapped in ``staticmethod`` so that the
    resulting dict can be merged into a class namespace without the functions
    being turned into methods.  Every other kind of object is stored as is.

    Raises AttributeError if *Source* lacks one of the requested names.
    """
    # The type of a plain ``def`` function; obtained from this function so
    # that no extra import is needed.
    function_type = type(get_attributes)
    attributes = {}
    for name in names:
        value = getattr(Source, name)
        if isinstance(value, function_type):
            value = staticmethod(value)
        attributes[name] = value
    return attributes
2036
def create_test_cases(Mixin, type):
    """Build concrete TestCase classes from the ``_Test*`` bases.

    Every module global whose name starts with ``_Test`` at call time and
    whose ALLOWED_TYPES contains *type* is combined with unittest.TestCase
    and *Mixin*.  The result maps the generated class name,
    ``'With' + type.capitalize() + <base name without leading underscore>``,
    to the generated class.
    """
    namespace = globals()
    prefix = 'With' + type.capitalize()
    generated = {}

    for base_name in list(namespace):
        if not base_name.startswith('_Test'):
            continue
        base = namespace[base_name]
        if type not in base.ALLOWED_TYPES:
            continue

        class Temp(base, unittest.TestCase, Mixin):
            pass

        # Give the class its public name and make it report the mixin's module.
        case_name = prefix + base_name[1:]
        Temp.__name__ = case_name
        Temp.__module__ = Mixin.__module__
        generated[case_name] = Temp

    return generated
2053
2054#
2055# Create test cases
2056#
2057
class ProcessesMixin(object):
    # Supplies the real multiprocessing primitives to the generated cases.
    TYPE = 'processes'
    Process = multiprocessing.Process
    # Copy the listed attributes of the multiprocessing package into the
    # class namespace (plain functions are wrapped as static methods).
    locals().update(get_attributes(multiprocessing, (
        'Queue',
        'Lock', 'RLock',
        'Semaphore', 'BoundedSemaphore',
        'Condition', 'Event',
        'Value', 'Array', 'RawValue', 'RawArray',
        'current_process', 'active_children',
        'Pipe', 'connection',
        'JoinableQueue'
        )))

# Generate the 'processes' flavour of every applicable _Test* class and
# publish the generated classes as module globals.
testcases_processes = create_test_cases(ProcessesMixin, type='processes')
globals().update(testcases_processes)
2070
2071
class ManagerMixin(object):
    # Supplies manager-backed proxies to the generated cases.
    TYPE = 'manager'
    Process = multiprocessing.Process
    # Allocated without running __init__; test_main() initialises and starts
    # the manager before the suite runs.
    manager = object.__new__(multiprocessing.managers.SyncManager)
    # Copy the listed manager attributes into the class namespace.
    locals().update(get_attributes(manager, (
        'Queue',
        'Lock', 'RLock',
        'Semaphore', 'BoundedSemaphore',
        'Condition', 'Event',
        'Value', 'Array',
        'list', 'dict', 'Namespace',
        'JoinableQueue'
        )))

# Generate the 'manager' flavour of every applicable _Test* class and
# publish the generated classes as module globals.
testcases_manager = create_test_cases(ManagerMixin, type='manager')
globals().update(testcases_manager)
2084
2085
class ThreadsMixin(object):
    # Supplies the thread-based multiprocessing.dummy equivalents.
    TYPE = 'threads'
    Process = multiprocessing.dummy.Process
    # Copy the listed attributes of multiprocessing.dummy into the class
    # namespace (plain functions are wrapped as static methods).
    locals().update(get_attributes(multiprocessing.dummy, (
        'Queue',
        'Lock', 'RLock',
        'Semaphore', 'BoundedSemaphore',
        'Condition', 'Event',
        'Value', 'Array',
        'current_process', 'active_children',
        'Pipe', 'connection',
        'dict', 'list', 'Namespace',
        'JoinableQueue'
        )))

# Generate the 'threads' flavour of every applicable _Test* class and
# publish the generated classes as module globals.
testcases_threads = create_test_cases(ThreadsMixin, type='threads')
globals().update(testcases_threads)
2098
class OtherTest(unittest.TestCase):
    # TODO: add more tests for deliver/answer challenge.

    def test_deliver_challenge_auth_failure(self):
        # The peer answers the challenge with garbage.
        class _FakeConnection(object):
            def recv_bytes(self, size):
                return b'something bogus'

            def send_bytes(self, data):
                pass

        with self.assertRaises(multiprocessing.AuthenticationError):
            multiprocessing.connection.deliver_challenge(
                _FakeConnection(), b'abc')

    def test_answer_challenge_auth_failure(self):
        # The peer sends the challenge marker first, then garbage, then
        # nothing but empty messages.
        class _FakeConnection(object):
            def __init__(self):
                self.count = 0

            def recv_bytes(self, size):
                self.count += 1
                if self.count == 1:
                    return multiprocessing.connection.CHALLENGE
                if self.count == 2:
                    return b'something bogus'
                return b''

            def send_bytes(self, data):
                pass

        with self.assertRaises(multiprocessing.AuthenticationError):
            multiprocessing.connection.answer_challenge(
                _FakeConnection(), b'abc')
2127
Jesse Noller7152f6d2009-04-02 05:17:26 +00002128#
2129# Test Manager.start()/Pool.__init__() initializer feature - see issue 5585
2130#
2131
def initializer(ns):
    # Used as a manager/pool initializer by TestInitializers: bump the
    # ``test`` counter stored on the given namespace by one.
    ns.test = ns.test + 1
2134
class TestInitializers(unittest.TestCase):
    # Manager.start() and Pool() must run the initializer they are given.

    def setUp(self):
        # A shared namespace whose ``test`` counter the initializer bumps.
        self.mgr = multiprocessing.Manager()
        self.ns = self.mgr.Namespace()
        self.ns.test = 0

    def tearDown(self):
        self.mgr.shutdown()

    def test_manager_initializer(self):
        m = multiprocessing.managers.SyncManager()
        # A non-callable initializer is rejected.
        self.assertRaises(TypeError, m.start, 1)
        m.start(initializer, (self.ns,))
        try:
            self.assertEqual(self.ns.test, 1)
        finally:
            # Shut the second manager down even when the check fails, so its
            # server process is not left running.
            m.shutdown()

    def test_pool_initializer(self):
        # A non-callable initializer is rejected.
        self.assertRaises(TypeError, multiprocessing.Pool, initializer=1)
        p = multiprocessing.Pool(1, initializer, (self.ns,))
        p.close()
        p.join()
        self.assertEqual(self.ns.test, 1)
2157
Jesse Noller1b90efb2009-06-30 17:11:52 +00002158#
2159# Issue 5155, 5313, 5331: Test process in processes
2160# Verifies os.close(sys.stdin.fileno) vs. sys.stdin.close() behavior
2161#
2162
def _ThisSubProcess(q):
    # Innermost process body: make one non-blocking read attempt on ``q``.
    # The item, if any, is discarded and an empty queue is not an error.
    try:
        q.get(block=False)
    except Queue.Empty:
        return
2168
def _TestProcess(q):
    # Process body that itself starts a daemon child and waits for it.
    # ``q`` is accepted but not used here; the child gets a fresh queue.
    child_queue = multiprocessing.Queue()
    child = multiprocessing.Process(target=_ThisSubProcess,
                                    args=(child_queue,))
    child.daemon = True
    child.start()
    child.join()
2175
def _afunc(x):
    # Work item mapped over a pool by pool_in_process(): x times itself.
    product = x * x
    return product
2178
def pool_in_process():
    # Process body: create a four-worker pool and map _afunc over a small
    # list.  The results are not needed, only that the call completes.
    pool = multiprocessing.Pool(processes=4)
    try:
        pool.map(_afunc, [1, 2, 3, 4, 5, 6, 7])
    finally:
        # Close and reap the workers so they are not left behind when this
        # process exits.
        pool.close()
        pool.join()
2182
class _file_like(object):
    # Write buffer in front of ``delegate``: write() collects chunks and
    # flush() hands them to delegate.write() as one joined string.

    def __init__(self, delegate):
        self._delegate = delegate
        self._pid = None

    @property
    def cache(self):
        # The buffer belongs to the process that created it: when the pid
        # differs from the remembered one, start over with an empty list.
        current = os.getpid()
        # There are no race conditions since fork keeps only the running thread
        if self._pid != current:
            self._pid, self._cache = current, []
        return self._cache

    def write(self, data):
        self.cache.append(data)

    def flush(self):
        pending = ''.join(self.cache)
        self._delegate.write(pending)
        self._cache = []
2203
class TestStdinBadfiledescriptor(unittest.TestCase):
    # Processes that start further processes or pools must run to completion.

    def test_queue_in_process(self):
        queue = multiprocessing.Queue()
        proc = multiprocessing.Process(target=_TestProcess, args=(queue,))
        proc.start()
        proc.join()

    def test_pool_in_process(self):
        p = multiprocessing.Process(target=pool_in_process)
        p.start()
        p.join()

    def test_flushing(self):
        sio = StringIO()
        flike = _file_like(sio)
        flike.write('foo')
        # NOTE(review): this process is constructed but never started;
        # confirm whether it was meant to run.
        proc = multiprocessing.Process(target=lambda: flike.flush())
        flike.flush()
        # A real unittest assertion: still checked under ``python -O`` and
        # reports both values on failure.
        self.assertEqual(sio.getvalue(), 'foo')
2224
# Stand-alone test cases that are not generated from a _Test* base class.
testcases_other = [
    OtherTest,
    TestInvalidHandle,
    TestInitializers,
    TestStdinBadfiledescriptor,
]
Neal Norwitz0c519b32008-08-25 01:50:24 +00002227
Benjamin Petersondfd79492008-06-13 19:13:39 +00002228#
2229#
2230#
2231
def test_main(run=None):
    """Set up the shared pools and manager, run every test case, clean up.

    run -- callable that receives the assembled unittest.TestSuite; when
    omitted, test.test_support.run_unittest is used.

    Raises unittest.SkipTest on Linux if an RLock cannot be created.
    """
    if sys.platform.startswith("linux"):
        # Probe only: the lock itself is not needed afterwards.
        try:
            multiprocessing.RLock()
        except OSError:
            raise unittest.SkipTest("OSError raises on RLock creation, see issue 3111!")

    if run is None:
        from test.test_support import run_unittest as run

    util.get_temp_dir()     # creates temp directory for use by all processes

    multiprocessing.get_logger().setLevel(LOG_LEVEL)

    # Resources shared by all generated test cases of each flavour.
    ProcessesMixin.pool = multiprocessing.Pool(4)
    ThreadsMixin.pool = multiprocessing.dummy.Pool(4)
    ManagerMixin.manager.__init__()
    ManagerMixin.manager.start()
    ManagerMixin.pool = ManagerMixin.manager.Pool(4)

    try:
        by_name = lambda tc: tc.__name__
        testcases = (
            sorted(testcases_processes.values(), key=by_name) +
            sorted(testcases_threads.values(), key=by_name) +
            sorted(testcases_manager.values(), key=by_name) +
            testcases_other
            )

        loadTestsFromTestCase = unittest.defaultTestLoader.loadTestsFromTestCase
        suite = unittest.TestSuite(loadTestsFromTestCase(tc) for tc in testcases)
        # (ncoghlan): Whether or not sys.exc_clear is executed by the threading
        # module during these tests is at least platform dependent and possibly
        # non-deterministic on any given platform. So we don't mind if the listed
        # warnings aren't actually raised.
        with test_support.check_py3k_warnings(
                (".+__(get|set)slice__ has been removed", DeprecationWarning),
                (r"sys.exc_clear\(\) not supported", DeprecationWarning),
                quiet=True):
            run(suite)
    finally:
        # Tear the shared resources down even if building or running the
        # suite raised, so no worker or manager process is left behind.
        ThreadsMixin.pool.terminate()
        ProcessesMixin.pool.terminate()
        ManagerMixin.pool.terminate()
        ManagerMixin.manager.shutdown()

        del ProcessesMixin.pool, ThreadsMixin.pool, ManagerMixin.pool
Benjamin Petersondfd79492008-06-13 19:13:39 +00002277
def main():
    # Script entry point: run the suite through a verbose text runner.
    runner = unittest.TextTestRunner(verbosity=2)
    test_main(runner.run)


if __name__ == '__main__':
    main()