blob: 201fa98e4abacb349010ae86ebe976c6525a3906 [file] [log] [blame]
Jesse Noller76cf55f2008-07-02 16:56:51 +00001#!/usr/bin/env python
2
Benjamin Petersondfd79492008-06-13 19:13:39 +00003#
4# Unit tests for the multiprocessing package
5#
6
7import unittest
Benjamin Petersondfd79492008-06-13 19:13:39 +00008import Queue
9import time
10import sys
11import os
12import gc
13import signal
14import array
Benjamin Petersondfd79492008-06-13 19:13:39 +000015import socket
16import random
17import logging
Antoine Pitroua1a8da82011-08-23 19:54:20 +020018import errno
Mark Dickinsonc4920e82009-11-20 19:30:22 +000019from test import test_support
Jesse Noller1b90efb2009-06-30 17:11:52 +000020from StringIO import StringIO
R. David Murray3db8a342009-03-30 23:05:48 +000021_multiprocessing = test_support.import_module('_multiprocessing')
Ezio Melottic2077b02011-03-16 12:34:31 +020022# import threading after _multiprocessing to raise a more relevant error
Victor Stinner613b4cf2010-04-27 21:56:26 +000023# message: "No module named _multiprocessing". _multiprocessing is not compiled
24# without thread support.
25import threading
R. David Murray3db8a342009-03-30 23:05:48 +000026
Jesse Noller37040cd2008-09-30 00:15:45 +000027# Work around broken sem_open implementations
R. David Murray3db8a342009-03-30 23:05:48 +000028test_support.import_module('multiprocessing.synchronize')
Jesse Noller37040cd2008-09-30 00:15:45 +000029
Benjamin Petersondfd79492008-06-13 19:13:39 +000030import multiprocessing.dummy
31import multiprocessing.connection
32import multiprocessing.managers
33import multiprocessing.heap
Benjamin Petersondfd79492008-06-13 19:13:39 +000034import multiprocessing.pool
Benjamin Petersondfd79492008-06-13 19:13:39 +000035
Charles-François Natalif8413b22011-09-21 18:44:49 +020036from multiprocessing import util
37
38try:
39 from multiprocessing import reduction
40 HAS_REDUCTION = True
41except ImportError:
42 HAS_REDUCTION = False
Benjamin Petersondfd79492008-06-13 19:13:39 +000043
Brian Curtina06e9b82010-10-07 02:27:41 +000044try:
45 from multiprocessing.sharedctypes import Value, copy
46 HAS_SHAREDCTYPES = True
47except ImportError:
48 HAS_SHAREDCTYPES = False
49
Antoine Pitroua1a8da82011-08-23 19:54:20 +020050try:
51 import msvcrt
52except ImportError:
53 msvcrt = None
54
Benjamin Petersondfd79492008-06-13 19:13:39 +000055#
56#
57#
58
# Constructor used to build the single-character values for the 'c' typecode.
latin = str

#
# Constants
#

# Logging level used by the tests; switch to logging.DEBUG for more output.
LOG_LEVEL = util.SUBWARNING
#LOG_LEVEL = logging.DEBUG

# Short pause (in seconds) used by tests that must let other
# processes/threads make progress.
DELTA = 0.1

# Making CHECK_TIMINGS true makes tests take a lot longer and can sometimes
# cause some non-serious failures because some calls block a bit longer than
# expected.
CHECK_TIMINGS = False

if CHECK_TIMINGS:
    TIMEOUT1 = 0.82
    TIMEOUT2 = 0.35
    TIMEOUT3 = 1.4
else:
    TIMEOUT1 = TIMEOUT2 = TIMEOUT3 = 0.1

# False when the C extension flags its semaphore get_value() as broken.
_BROKEN_GETVALUE = getattr(_multiprocessing, 'HAVE_BROKEN_SEM_GETVALUE', False)
HAVE_GETVALUE = not _BROKEN_GETVALUE

WIN32 = sys.platform == "win32"
82
# Maximum number of open files reported by sysconf(); fall back to a fixed
# default when the value cannot be queried.
try:
    MAXFD = os.sysconf("SC_OPEN_MAX")
except (AttributeError, ValueError, OSError):
    # AttributeError: os.sysconf does not exist on this platform.
    # ValueError: the configuration name is not known.
    # OSError: the name is known but the host system rejected the query.
    # A bare ``except:`` would also swallow KeyboardInterrupt/SystemExit.
    MAXFD = 256
87
Benjamin Petersondfd79492008-06-13 19:13:39 +000088#
Florent Xicluna36b9fbb2010-03-24 19:33:25 +000089# Some tests require ctypes
90#
91
92try:
Nick Coghlan13623662010-04-10 14:24:36 +000093 from ctypes import Structure, c_int, c_double
Florent Xicluna36b9fbb2010-03-24 19:33:25 +000094except ImportError:
95 Structure = object
96 c_int = c_double = None
97
98#
Benjamin Petersondfd79492008-06-13 19:13:39 +000099# Creates a wrapper for a function which records the time it takes to finish
100#
101
class TimingWrapper(object):
    """Callable proxy that remembers how long its most recent call took.

    ``elapsed`` is None until the first call; afterwards it holds the
    duration in seconds of the last call, as measured with time.time().
    """

    def __init__(self, func):
        self.elapsed = None
        self.func = func

    def __call__(self, *args, **kwds):
        """Forward the call to the wrapped function and record its duration."""
        started = time.time()
        try:
            result = self.func(*args, **kwds)
        finally:
            # Recorded even when the wrapped call raises.
            self.elapsed = time.time() - started
        return result
114
115#
116# Base class for test cases
117#
118
class BaseTestCase(object):
    """Mixin holding the helpers shared by the test case classes below.

    It relies on assertEqual()/assertAlmostEqual() being supplied by the
    class it is combined with.
    """

    # Flavours for which a test class should be run.
    ALLOWED_TYPES = ('processes', 'manager', 'threads')

    def assertTimingAlmostEqual(self, a, b):
        """Compare two durations to one decimal place.

        The comparison is made only when CHECK_TIMINGS is true.
        """
        if not CHECK_TIMINGS:
            return
        self.assertAlmostEqual(a, b, 1)

    def assertReturnsIfImplemented(self, value, func, *args):
        """Assert that func(*args) equals *value*.

        The check is dropped when the call raises NotImplementedError.
        """
        try:
            outcome = func(*args)
        except NotImplementedError:
            return None
        return self.assertEqual(value, outcome)

    # For the sanity of Windows users, rather than crashing or freezing in
    # multiple ways.
    def __reduce__(self, *args):
        raise NotImplementedError("shouldn't try to pickle a test case")

    __reduce_ex__ = __reduce__
141
Benjamin Petersondfd79492008-06-13 19:13:39 +0000142#
143# Return the value of a semaphore
144#
145
def get_value(self):
    """Return the current counter value of the semaphore-like object *self*.

    The object's own get_value() method is tried first, then the private
    attributes ``_Semaphore__value`` and ``_value``.  NotImplementedError is
    raised when none of them is available.
    """
    try:
        return self.get_value()
    except AttributeError:
        pass
    for attr_name in ('_Semaphore__value', '_value'):
        try:
            return getattr(self, attr_name)
        except AttributeError:
            continue
    raise NotImplementedError
157
158#
159# Testcases
160#
161
class _TestProcess(BaseTestCase):
    """Exercise the basic Process API (or its thread-backed stand-in).

    The concrete test class is expected to provide ``TYPE``, ``Process``,
    ``current_process``, ``active_children``, ``Queue`` and ``Pipe``.
    """

    ALLOWED_TYPES = ('processes', 'threads')

    def test_current(self):
        """Check the attributes of the object returned by current_process()."""
        if self.TYPE == 'threads':
            # Report a skip rather than passing without checking anything.
            self.skipTest('test not appropriate for %s' % self.TYPE)

        current = self.current_process()
        authkey = current.authkey

        self.assertTrue(current.is_alive())
        self.assertTrue(not current.daemon)
        self.assertIsInstance(authkey, bytes)
        self.assertTrue(len(authkey) > 0)
        self.assertEqual(current.ident, os.getpid())
        self.assertEqual(current.exitcode, None)

    @classmethod
    def _test(cls, q, *args, **kwds):
        """Child body for test_process: report what the child sees via *q*."""
        current = cls.current_process()
        q.put(args)
        q.put(kwds)
        q.put(current.name)
        if cls.TYPE != 'threads':
            q.put(bytes(current.authkey))
            q.put(current.pid)

    def test_process(self):
        """Follow one worker through creation, start and join."""
        q = self.Queue(1)
        args = (q, 1, 2)
        kwargs = {'hello': 23, 'bye': 2.54}
        name = 'SomeProcess'
        p = self.Process(
            target=self._test, args=args, kwargs=kwargs, name=name
            )
        p.daemon = True
        current = self.current_process()

        # Before start(): not alive, not listed, no exit code.
        if self.TYPE != 'threads':
            self.assertEqual(p.authkey, current.authkey)
        self.assertEqual(p.is_alive(), False)
        self.assertEqual(p.daemon, True)
        self.assertNotIn(p, self.active_children())
        self.assertTrue(type(self.active_children()) is list)
        self.assertEqual(p.exitcode, None)

        p.start()

        self.assertEqual(p.exitcode, None)
        self.assertEqual(p.is_alive(), True)
        self.assertIn(p, self.active_children())

        # The child echoes back its arguments and identity, in this order.
        self.assertEqual(q.get(), args[1:])
        self.assertEqual(q.get(), kwargs)
        self.assertEqual(q.get(), p.name)
        if self.TYPE != 'threads':
            self.assertEqual(q.get(), current.authkey)
            self.assertEqual(q.get(), p.pid)

        p.join()

        self.assertEqual(p.exitcode, 0)
        self.assertEqual(p.is_alive(), False)
        self.assertNotIn(p, self.active_children())

    @classmethod
    def _test_terminate(cls):
        """Child body for test_terminate: sleep until terminated."""
        time.sleep(1000)

    def test_terminate(self):
        """Check that terminate() stops a sleeping child."""
        if self.TYPE == 'threads':
            # Report a skip rather than passing without checking anything.
            self.skipTest('test not appropriate for %s' % self.TYPE)

        p = self.Process(target=self._test_terminate)
        p.daemon = True
        p.start()

        self.assertEqual(p.is_alive(), True)
        self.assertIn(p, self.active_children())
        self.assertEqual(p.exitcode, None)

        p.terminate()

        join = TimingWrapper(p.join)
        self.assertEqual(join(), None)
        self.assertTimingAlmostEqual(join.elapsed, 0.0)

        self.assertEqual(p.is_alive(), False)
        self.assertNotIn(p, self.active_children())

        p.join()

        # XXX sometimes get p.exitcode == 0 on Windows ...
        #self.assertEqual(p.exitcode, -signal.SIGTERM)

    def test_cpu_count(self):
        """cpu_count() returns an int >= 1 or raises NotImplementedError."""
        try:
            cpus = multiprocessing.cpu_count()
        except NotImplementedError:
            cpus = 1
        self.assertTrue(type(cpus) is int)
        self.assertTrue(cpus >= 1)

    def test_active_children(self):
        """A worker is listed by active_children() only while it is running."""
        self.assertEqual(type(self.active_children()), list)

        p = self.Process(target=time.sleep, args=(DELTA,))
        self.assertNotIn(p, self.active_children())

        p.daemon = True
        p.start()
        self.assertIn(p, self.active_children())

        p.join()
        self.assertNotIn(p, self.active_children())

    @classmethod
    def _test_recursion(cls, wconn, id):
        """Send *id* on *wconn*, then run two nested workers one at a time.

        Nesting stops once *id* has two elements.
        """
        # NOTE(review): this import is unused in the function; presumably it
        # is kept to check the module can be imported here -- confirm.
        from multiprocessing import forking
        wconn.send(id)
        if len(id) < 2:
            for i in range(2):
                p = cls.Process(
                    target=cls._test_recursion, args=(wconn, id+[i])
                    )
                p.start()
                p.join()

    def test_recursion(self):
        """Workers started from workers report in depth-first order."""
        rconn, wconn = self.Pipe(duplex=False)
        self._test_recursion(wconn, [])

        time.sleep(DELTA)
        result = []
        while rconn.poll():
            result.append(rconn.recv())

        expected = [
            [],
              [0],
                [0, 0],
                [0, 1],
              [1],
                [1, 0],
                [1, 1]
            ]
        self.assertEqual(result, expected)
311
312#
313#
314#
315
class _UpperCaser(multiprocessing.Process):
    """Process that upper-cases the strings sent to it over a pipe.

    ``submit()`` and ``stop()`` use ``parent_conn``; ``run()`` serves
    requests on ``child_conn`` until it receives None.
    """

    def __init__(self):
        multiprocessing.Process.__init__(self)
        self.child_conn, self.parent_conn = multiprocessing.Pipe()

    def run(self):
        """Answer each received string with its upper-cased form."""
        # run() does not use the other end of the pipe.
        self.parent_conn.close()
        conn = self.child_conn
        while True:
            text = conn.recv()
            if text is None:
                break
            conn.send(text.upper())
        conn.close()

    def submit(self, s):
        """Send the str *s* for conversion and return the reply."""
        assert type(s) is str
        conn = self.parent_conn
        conn.send(s)
        return conn.recv()

    def stop(self):
        """Send the None sentinel and close both pipe ends on this side."""
        self.parent_conn.send(None)
        for conn in (self.parent_conn, self.child_conn):
            conn.close()
337
class _TestSubclassingProcess(BaseTestCase):
    """Check that a Process subclass overriding run() works end to end."""

    ALLOWED_TYPES = ('processes',)

    def test_subclassing(self):
        """Round-trip two strings through an _UpperCaser child."""
        worker = _UpperCaser()
        worker.daemon = True
        worker.start()
        for text, shouted in (('hello', 'HELLO'), ('world', 'WORLD')):
            self.assertEqual(worker.submit(text), shouted)
        worker.stop()
        worker.join()
350
351#
352#
353#
354
def queue_empty(q):
    """Return whether queue *q* is empty.

    Uses q.empty() when the queue has that method, otherwise compares
    q.qsize() with zero.
    """
    if not hasattr(q, 'empty'):
        return q.qsize() == 0
    return q.empty()
360
def queue_full(q, maxsize):
    """Return whether queue *q* is full.

    Uses q.full() when the queue has that method, otherwise compares
    q.qsize() with *maxsize*.
    """
    if not hasattr(q, 'full'):
        return q.qsize() == maxsize
    return q.full()
366
367
class _TestQueue(BaseTestCase):
    """Tests for Queue and JoinableQueue objects.

    The concrete test class is expected to provide ``Queue``,
    ``JoinableQueue``, ``Event`` and ``Process``.
    """

    @classmethod
    def _test_put(cls, queue, child_can_start, parent_can_continue):
        """Child body for test_put: drain six items once allowed to start."""
        child_can_start.wait()
        for i in range(6):
            queue.get()
        parent_can_continue.set()

    def test_put(self):
        """Check put()/put_nowait() on a bounded queue, including timeouts."""
        MAXSIZE = 6
        queue = self.Queue(maxsize=MAXSIZE)
        child_can_start = self.Event()
        parent_can_continue = self.Event()

        proc = self.Process(
            target=self._test_put,
            args=(queue, child_can_start, parent_can_continue)
            )
        proc.daemon = True
        proc.start()

        self.assertEqual(queue_empty(queue), True)
        self.assertEqual(queue_full(queue, MAXSIZE), False)

        queue.put(1)
        queue.put(2, True)
        queue.put(3, True, None)
        queue.put(4, False)
        queue.put(5, False, None)
        queue.put_nowait(6)

        # the values may be in buffer but not yet in pipe so sleep a bit
        time.sleep(DELTA)

        self.assertEqual(queue_empty(queue), False)
        self.assertEqual(queue_full(queue, MAXSIZE), True)

        put = TimingWrapper(queue.put)
        put_nowait = TimingWrapper(queue.put_nowait)

        # Every further put must fail with Queue.Full.
        self.assertRaises(Queue.Full, put, 7, False)
        self.assertTimingAlmostEqual(put.elapsed, 0)

        self.assertRaises(Queue.Full, put, 7, False, None)
        self.assertTimingAlmostEqual(put.elapsed, 0)

        self.assertRaises(Queue.Full, put_nowait, 7)
        self.assertTimingAlmostEqual(put_nowait.elapsed, 0)

        self.assertRaises(Queue.Full, put, 7, True, TIMEOUT1)
        self.assertTimingAlmostEqual(put.elapsed, TIMEOUT1)

        self.assertRaises(Queue.Full, put, 7, False, TIMEOUT2)
        self.assertTimingAlmostEqual(put.elapsed, 0)

        self.assertRaises(Queue.Full, put, 7, True, timeout=TIMEOUT3)
        self.assertTimingAlmostEqual(put.elapsed, TIMEOUT3)

        child_can_start.set()
        parent_can_continue.wait()

        self.assertEqual(queue_empty(queue), True)
        self.assertEqual(queue_full(queue, MAXSIZE), False)

        proc.join()

    @classmethod
    def _test_get(cls, queue, child_can_start, parent_can_continue):
        """Child body for test_get: enqueue 2..5 once allowed to start."""
        child_can_start.wait()
        #queue.put(1)
        queue.put(2)
        queue.put(3)
        queue.put(4)
        queue.put(5)
        parent_can_continue.set()

    def test_get(self):
        """Check get()/get_nowait(), including timeouts on an empty queue."""
        queue = self.Queue()
        child_can_start = self.Event()
        parent_can_continue = self.Event()

        proc = self.Process(
            target=self._test_get,
            args=(queue, child_can_start, parent_can_continue)
            )
        proc.daemon = True
        proc.start()

        self.assertEqual(queue_empty(queue), True)

        child_can_start.set()
        parent_can_continue.wait()

        time.sleep(DELTA)
        self.assertEqual(queue_empty(queue), False)

        # Hangs unexpectedly, remove for now
        #self.assertEqual(queue.get(), 1)
        self.assertEqual(queue.get(True, None), 2)
        self.assertEqual(queue.get(True), 3)
        self.assertEqual(queue.get(timeout=1), 4)
        self.assertEqual(queue.get_nowait(), 5)

        self.assertEqual(queue_empty(queue), True)

        get = TimingWrapper(queue.get)
        get_nowait = TimingWrapper(queue.get_nowait)

        # Every further get must fail with Queue.Empty.
        self.assertRaises(Queue.Empty, get, False)
        self.assertTimingAlmostEqual(get.elapsed, 0)

        self.assertRaises(Queue.Empty, get, False, None)
        self.assertTimingAlmostEqual(get.elapsed, 0)

        self.assertRaises(Queue.Empty, get_nowait)
        self.assertTimingAlmostEqual(get_nowait.elapsed, 0)

        self.assertRaises(Queue.Empty, get, True, TIMEOUT1)
        self.assertTimingAlmostEqual(get.elapsed, TIMEOUT1)

        self.assertRaises(Queue.Empty, get, False, TIMEOUT2)
        self.assertTimingAlmostEqual(get.elapsed, 0)

        self.assertRaises(Queue.Empty, get, timeout=TIMEOUT3)
        self.assertTimingAlmostEqual(get.elapsed, TIMEOUT3)

        proc.join()

    @classmethod
    def _test_fork(cls, queue):
        """Child body for test_fork: enqueue the integers 10..19."""
        for i in range(10, 20):
            queue.put(i)
        # note that at this point the items may only be buffered, so the
        # process cannot shutdown until the feeder thread has finished
        # pushing items onto the pipe.

    def test_fork(self):
        # Old versions of Queue would fail to create a new feeder
        # thread for a forked process if the original process had its
        # own feeder thread.  This test checks that this no longer
        # happens.

        queue = self.Queue()

        # put items on queue so that main process starts a feeder thread
        for i in range(10):
            queue.put(i)

        # wait to make sure thread starts before we fork a new process
        time.sleep(DELTA)

        # fork process
        p = self.Process(target=self._test_fork, args=(queue,))
        p.daemon = True
        p.start()

        # check that all expected items are in the queue
        for i in range(20):
            self.assertEqual(queue.get(), i)
        self.assertRaises(Queue.Empty, queue.get, False)

        p.join()

    def test_qsize(self):
        """qsize() tracks the number of queued items, where implemented."""
        q = self.Queue()
        try:
            self.assertEqual(q.qsize(), 0)
        except NotImplementedError:
            # Report a skip rather than passing without checking anything.
            self.skipTest('qsize method not implemented')
        q.put(1)
        self.assertEqual(q.qsize(), 1)
        q.put(5)
        self.assertEqual(q.qsize(), 2)
        q.get()
        self.assertEqual(q.qsize(), 1)
        q.get()
        self.assertEqual(q.qsize(), 0)

    @classmethod
    def _test_task_done(cls, q):
        """Worker body for test_task_done.

        Each item is acknowledged after a DELTA pause; the loop ends when
        None is received.
        """
        for obj in iter(q.get, None):
            time.sleep(DELTA)
            q.task_done()

    def test_task_done(self):
        """JoinableQueue.join() returns once every item is acknowledged."""
        queue = self.JoinableQueue()

        if sys.version_info < (2, 5) and not hasattr(queue, 'task_done'):
            self.skipTest("requires 'queue.task_done()' method")

        workers = [self.Process(target=self._test_task_done, args=(queue,))
                   for i in xrange(4)]

        for p in workers:
            p.daemon = True
            p.start()

        for i in xrange(10):
            queue.put(i)

        queue.join()

        # One None sentinel per worker makes each of them exit its loop.
        for p in workers:
            queue.put(None)

        for p in workers:
            p.join()
577
578#
579#
580#
581
class _TestLock(BaseTestCase):
    """Tests for Lock and RLock objects."""

    def test_lock(self):
        """A Lock can be taken once and released once."""
        plain = self.Lock()
        self.assertEqual(plain.acquire(), True)
        # A second, non-blocking acquire must fail.
        self.assertEqual(plain.acquire(False), False)
        self.assertEqual(plain.release(), None)
        # Releasing an unlocked lock is an error.
        self.assertRaises((ValueError, threading.ThreadError), plain.release)

    def test_rlock(self):
        """An RLock can be taken repeatedly and must be released as often."""
        reentrant = self.RLock()
        for _ in range(3):
            self.assertEqual(reentrant.acquire(), True)
        for _ in range(3):
            self.assertEqual(reentrant.release(), None)
        # One release too many is an error.
        self.assertRaises((AssertionError, RuntimeError), reentrant.release)

    def test_lock_context(self):
        """A Lock can be used as a context manager."""
        with self.Lock():
            pass
604
Benjamin Petersondfd79492008-06-13 19:13:39 +0000605
class _TestSemaphore(BaseTestCase):
    """Tests for Semaphore and BoundedSemaphore objects."""

    def _test_semaphore(self, sem):
        """Run *sem* (initial value 2) down to zero and back up to 2.

        The counter value is checked after each step where the object
        allows reading it.
        """
        self.assertReturnsIfImplemented(2, get_value, sem)
        self.assertEqual(sem.acquire(), True)
        self.assertReturnsIfImplemented(1, get_value, sem)
        self.assertEqual(sem.acquire(), True)
        self.assertReturnsIfImplemented(0, get_value, sem)
        # At zero a non-blocking acquire fails and leaves the value alone.
        self.assertEqual(sem.acquire(False), False)
        self.assertReturnsIfImplemented(0, get_value, sem)
        self.assertEqual(sem.release(), None)
        self.assertReturnsIfImplemented(1, get_value, sem)
        self.assertEqual(sem.release(), None)
        self.assertReturnsIfImplemented(2, get_value, sem)

    def test_semaphore(self):
        """A plain Semaphore may be released beyond its initial value."""
        sem = self.Semaphore(2)
        self._test_semaphore(sem)
        self.assertEqual(sem.release(), None)
        self.assertReturnsIfImplemented(3, get_value, sem)
        self.assertEqual(sem.release(), None)
        self.assertReturnsIfImplemented(4, get_value, sem)

    def test_bounded_semaphore(self):
        sem = self.BoundedSemaphore(2)
        self._test_semaphore(sem)
        # Currently fails on OS/X
        #if HAVE_GETVALUE:
        #    self.assertRaises(ValueError, sem.release)
        #    self.assertReturnsIfImplemented(2, get_value, sem)

    def test_timeout(self):
        """acquire() on an empty semaphore fails for each block/timeout mix."""
        if self.TYPE != 'processes':
            # Report a skip rather than passing without checking anything.
            self.skipTest('test not appropriate for %s' % self.TYPE)

        sem = self.Semaphore(0)
        acquire = TimingWrapper(sem.acquire)

        self.assertEqual(acquire(False), False)
        self.assertTimingAlmostEqual(acquire.elapsed, 0.0)

        self.assertEqual(acquire(False, None), False)
        self.assertTimingAlmostEqual(acquire.elapsed, 0.0)

        self.assertEqual(acquire(False, TIMEOUT1), False)
        self.assertTimingAlmostEqual(acquire.elapsed, 0)

        self.assertEqual(acquire(True, TIMEOUT2), False)
        self.assertTimingAlmostEqual(acquire.elapsed, TIMEOUT2)

        self.assertEqual(acquire(timeout=TIMEOUT3), False)
        self.assertTimingAlmostEqual(acquire.elapsed, TIMEOUT3)
658
659
class _TestCondition(BaseTestCase):
    """Tests for Condition objects shared by processes and threads."""

    @classmethod
    def f(cls, cond, sleeping, woken, timeout=None):
        """Worker body: wait on *cond*, signalling before and after.

        ``sleeping`` is released just before the wait and ``woken`` right
        after it returns.
        """
        cond.acquire()
        sleeping.release()
        cond.wait(timeout)
        woken.release()
        cond.release()

    def check_invariant(self, cond):
        """Check the internal counters of *cond* ('processes' flavour only)."""
        # this is only supposed to succeed when there are no sleepers
        if self.TYPE == 'processes':
            try:
                sleepers = (cond._sleeping_count.get_value() -
                            cond._woken_count.get_value())
                self.assertEqual(sleepers, 0)
                self.assertEqual(cond._wait_semaphore.get_value(), 0)
            except NotImplementedError:
                pass

    def test_notify(self):
        """notify() wakes exactly one waiter per call."""
        cond = self.Condition()
        sleeping = self.Semaphore(0)
        woken = self.Semaphore(0)

        # Distinct names keep both workers reachable for the joins below.
        proc = self.Process(target=self.f, args=(cond, sleeping, woken))
        proc.daemon = True
        proc.start()

        thread = threading.Thread(target=self.f, args=(cond, sleeping, woken))
        thread.daemon = True
        thread.start()

        # wait for both children to start sleeping
        sleeping.acquire()
        sleeping.acquire()

        # check no process/thread has woken up
        time.sleep(DELTA)
        self.assertReturnsIfImplemented(0, get_value, woken)

        # wake up one process/thread
        cond.acquire()
        cond.notify()
        cond.release()

        # check one process/thread has woken up
        time.sleep(DELTA)
        self.assertReturnsIfImplemented(1, get_value, woken)

        # wake up another
        cond.acquire()
        cond.notify()
        cond.release()

        # check other has woken up
        time.sleep(DELTA)
        self.assertReturnsIfImplemented(2, get_value, woken)

        # check state is not mucked up
        self.check_invariant(cond)

        # Both workers have been notified, so both can be joined.
        thread.join()
        proc.join()

    def test_notify_all(self):
        """Waiters with a timeout wake by themselves; notify_all() wakes the rest."""
        cond = self.Condition()
        sleeping = self.Semaphore(0)
        woken = self.Semaphore(0)
        workers = []

        # start some threads/processes which will timeout
        for i in range(3):
            p = self.Process(target=self.f,
                             args=(cond, sleeping, woken, TIMEOUT1))
            p.daemon = True
            p.start()
            workers.append(p)

            t = threading.Thread(target=self.f,
                                 args=(cond, sleeping, woken, TIMEOUT1))
            t.daemon = True
            t.start()
            workers.append(t)

        # wait for them all to sleep
        for i in xrange(6):
            sleeping.acquire()

        # check they have all timed out
        for i in xrange(6):
            woken.acquire()
        self.assertReturnsIfImplemented(0, get_value, woken)

        # check state is not mucked up
        self.check_invariant(cond)

        # start some more threads/processes
        for i in range(3):
            p = self.Process(target=self.f, args=(cond, sleeping, woken))
            p.daemon = True
            p.start()
            workers.append(p)

            t = threading.Thread(target=self.f, args=(cond, sleeping, woken))
            t.daemon = True
            t.start()
            workers.append(t)

        # wait for them to all sleep
        for i in xrange(6):
            sleeping.acquire()

        # check no process/thread has woken up
        time.sleep(DELTA)
        self.assertReturnsIfImplemented(0, get_value, woken)

        # wake them all up
        cond.acquire()
        cond.notify_all()
        cond.release()

        # check they have all woken
        time.sleep(DELTA)
        self.assertReturnsIfImplemented(6, get_value, woken)

        # check state is not mucked up
        self.check_invariant(cond)

        # Every worker has timed out or been notified by now, so join them
        # all.
        for worker in workers:
            worker.join()

    def test_timeout(self):
        """wait() with a timeout and no notification returns None."""
        cond = self.Condition()
        wait = TimingWrapper(cond.wait)
        cond.acquire()
        res = wait(TIMEOUT1)
        cond.release()
        self.assertEqual(res, None)
        self.assertTimingAlmostEqual(wait.elapsed, TIMEOUT1)
791
792
class _TestEvent(BaseTestCase):
    """Tests for Event objects: is_set(), wait(), set() and clear()."""

    @classmethod
    def _test_event(cls, event):
        """Child body for test_event: set *event* after TIMEOUT2 seconds."""
        time.sleep(TIMEOUT2)
        event.set()

    def test_event(self):
        """Check wait() results before and after the event is set."""
        event = self.Event()
        wait = TimingWrapper(event.wait)

        # A new event starts out cleared.
        self.assertEqual(event.is_set(), False)

        # wait() is expected to report the flag's value, so timing out on a
        # cleared event gives False.
        self.assertEqual(wait(0.0), False)
        self.assertTimingAlmostEqual(wait.elapsed, 0.0)
        self.assertEqual(wait(TIMEOUT1), False)
        self.assertTimingAlmostEqual(wait.elapsed, TIMEOUT1)

        event.set()

        # Once set, wait() is expected to return True without blocking.
        self.assertEqual(event.is_set(), True)
        self.assertEqual(wait(), True)
        self.assertTimingAlmostEqual(wait.elapsed, 0.0)
        self.assertEqual(wait(TIMEOUT1), True)
        self.assertTimingAlmostEqual(wait.elapsed, 0.0)

        event.clear()

        p = self.Process(target=self._test_event, args=(event,))
        p.daemon = True
        p.start()
        self.assertEqual(wait(), True)
        # The child has set the event, so it can be joined.
        p.join()
Benjamin Petersondfd79492008-06-13 19:13:39 +0000833
834#
835#
836#
837
class _TestValue(BaseTestCase):
    """Tests for shared Value / RawValue objects."""

    ALLOWED_TYPES = ('processes',)

    # (typecode, initial value, value written by the child process)
    codes_values = [
        ('i', 4343, 24234),
        ('d', 3.625, -4.25),
        ('h', -232, 234),
        ('c', latin('x'), latin('y'))
        ]

    def setUp(self):
        """Skip every test here when sharedctypes could not be imported."""
        if HAS_SHAREDCTYPES:
            return
        self.skipTest("requires multiprocessing.sharedctypes")

    @classmethod
    def _test(cls, values):
        """Child body for test_value: store the third column into each value."""
        for shared, spec in zip(values, cls.codes_values):
            shared.value = spec[2]

    def test_value(self, raw=False):
        """Values set by a child process are visible in the parent."""
        make = self.RawValue if raw else self.Value
        values = [make(code, initial)
                  for code, initial, _ in self.codes_values]

        for shared, spec in zip(values, self.codes_values):
            self.assertEqual(shared.value, spec[1])

        child = self.Process(target=self._test, args=(values,))
        child.daemon = True
        child.start()
        child.join()

        for shared, spec in zip(values, self.codes_values):
            self.assertEqual(shared.value, spec[2])

    def test_rawvalue(self):
        """Same as test_value, using RawValue objects."""
        self.test_value(raw=True)

    def test_getobj_getlock(self):
        """get_lock()/get_obj() exist only on values created with a lock."""
        # Default lock.
        default_locked = self.Value('i', 5)
        default_locked.get_lock()
        default_locked.get_obj()

        # lock=None must provide both accessors as well.
        none_locked = self.Value('i', 5, lock=None)
        none_locked.get_lock()
        none_locked.get_obj()

        # A lock passed in explicitly is the one handed back.
        lock = self.Lock()
        explicit = self.Value('i', 5, lock=lock)
        lock3 = explicit.get_lock()
        explicit.get_obj()
        self.assertEqual(lock, lock3)

        # lock=False gives an object without the accessors.
        unlocked = self.Value('i', 5, lock=False)
        self.assertFalse(hasattr(unlocked, 'get_lock'))
        self.assertFalse(hasattr(unlocked, 'get_obj'))

        # Anything that is not a usable lock is rejected.
        self.assertRaises(AttributeError, self.Value, 'i', 5, lock='navalue')

        raw = self.RawValue('i', 5)
        self.assertFalse(hasattr(raw, 'get_lock'))
        self.assertFalse(hasattr(raw, 'get_obj'))
905
Benjamin Petersondfd79492008-06-13 19:13:39 +0000906
class _TestArray(BaseTestCase):
    """Tests for the shared ``Array`` and ``RawArray`` objects."""

    ALLOWED_TYPES = ('processes',)

    @classmethod
    def f(cls, seq):
        """Replace each item of *seq*, in place, by the running total."""
        for i in range(1, len(seq)):
            seq[i] += seq[i-1]

    @unittest.skipIf(c_int is None, "requires _ctypes")
    def test_array(self, raw=False):
        # Exercise len(), indexing, slicing and mutation from a child
        # process.  With raw=True a RawArray is used instead of an Array.
        seq = [680, 626, 934, 821, 150, 233, 548, 982, 714, 831]
        if raw:
            arr = self.RawArray('i', seq)
        else:
            arr = self.Array('i', seq)

        self.assertEqual(len(arr), len(seq))
        self.assertEqual(arr[3], seq[3])
        self.assertEqual(list(arr[2:7]), list(seq[2:7]))

        # Slice assignment from an array.array, applied to both objects.
        arr[4:8] = seq[4:8] = array.array('i', [1, 2, 3, 4])

        self.assertEqual(list(arr[:]), seq)

        # Run the same transformation locally on the list and in a child
        # process on the shared array; both must end up with equal contents.
        self.f(seq)

        p = self.Process(target=self.f, args=(arr,))
        p.daemon = True
        p.start()
        p.join()

        self.assertEqual(list(arr[:]), seq)

    @unittest.skipIf(c_int is None, "requires _ctypes")
    def test_array_from_size(self):
        size = 10
        # Test for zeroing (see issue #11675).
        # The repetition below strengthens the test by increasing the chances
        # of previously allocated non-zero memory being used for the new array
        # on the 2nd and 3rd loops.
        for _ in range(3):
            arr = self.Array('i', size)
            self.assertEqual(len(arr), size)
            self.assertEqual(list(arr), [0] * size)
            # Fill with non-zero data, derived from ``size`` so the test
            # stays valid if the size is ever changed.
            arr[:] = range(size)
            self.assertEqual(list(arr), list(range(size)))
            del arr

    @unittest.skipIf(c_int is None, "requires _ctypes")
    def test_rawarray(self):
        self.test_array(raw=True)

    @unittest.skipIf(c_int is None, "requires _ctypes")
    def test_array_accepts_long(self):
        # A ``long`` size must be accepted just like an ``int`` size.
        arr = self.Array('i', long(10))
        self.assertEqual(len(arr), 10)
        raw_arr = self.RawArray('i', long(10))
        self.assertEqual(len(raw_arr), 10)

    @unittest.skipIf(c_int is None, "requires _ctypes")
    def test_getobj_getlock_obj(self):
        # Default lock: both accessors must be callable.
        arr1 = self.Array('i', range(10))
        arr1.get_lock()
        arr1.get_obj()

        # lock=None behaves like the default.
        arr2 = self.Array('i', range(10), lock=None)
        arr2.get_lock()
        arr2.get_obj()

        # An explicitly supplied lock is the one handed back.
        lock = self.Lock()
        arr3 = self.Array('i', range(10), lock=lock)
        lock3 = arr3.get_lock()
        arr3.get_obj()
        self.assertEqual(lock, lock3)

        # lock=False yields an object without the accessors.
        arr4 = self.Array('i', range(10), lock=False)
        self.assertFalse(hasattr(arr4, 'get_lock'))
        self.assertFalse(hasattr(arr4, 'get_obj'))
        self.assertRaises(AttributeError,
                          self.Array, 'i', range(10), lock='notalock')

        # RawArray never has the accessors.
        arr5 = self.RawArray('i', range(10))
        self.assertFalse(hasattr(arr5, 'get_lock'))
        self.assertFalse(hasattr(arr5, 'get_obj'))
Benjamin Petersondfd79492008-06-13 19:13:39 +0000992
993#
994#
995#
996
class _TestContainers(BaseTestCase):
    """Tests for the list, dict and Namespace proxies of a manager."""

    ALLOWED_TYPES = ('manager',)

    def test_list(self):
        digits = self.list(range(10))
        self.assertEqual(digits[:], range(10))

        grown = self.list()
        self.assertEqual(grown[:], [])

        grown.extend(range(5))
        self.assertEqual(grown[:], range(5))

        self.assertEqual(grown[2], 2)
        self.assertEqual(grown[2:10], [2,3,4])

        grown *= 2
        doubled = [0, 1, 2, 3, 4, 0, 1, 2, 3, 4]
        self.assertEqual(grown[:], doubled)

        self.assertEqual(grown + [5, 6], doubled + [5, 6])

        self.assertEqual(digits[:], range(10))

        # A shared list built from other shared lists.
        nested = self.list([digits, grown])
        self.assertEqual(
            nested[:],
            [[0, 1, 2, 3, 4, 5, 6, 7, 8, 9], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4]]
            )

        # Appending to the inner list after the outer one was created.
        wrapper = self.list([digits])
        digits.append('hello')
        self.assertEqual(wrapper[:],
                         [[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 'hello']])

    def test_dict(self):
        shared = self.dict()
        codes = range(65, 70)
        for code in codes:
            shared[code] = chr(code)
        expected_items = [(code, chr(code)) for code in codes]
        self.assertEqual(shared.copy(), dict(expected_items))
        self.assertEqual(sorted(shared.keys()), codes)
        self.assertEqual(sorted(shared.values()),
                         [chr(code) for code in codes])
        self.assertEqual(sorted(shared.items()), expected_items)

    def test_namespace(self):
        ns = self.Namespace()
        ns.name = 'Bob'
        ns.job = 'Builder'
        ns._hidden = 'hidden'
        self.assertEqual((ns.name, ns.job), ('Bob', 'Builder'))
        del ns.job
        # The underscore-prefixed attribute is expected to be absent here.
        self.assertEqual(str(ns), "Namespace(name='Bob')")
        self.assertTrue(hasattr(ns, 'name'))
        self.assertTrue(not hasattr(ns, 'job'))
1052
1053#
1054#
1055#
1056
def sqr(x, wait=0.0):
    """Sleep for *wait* seconds, then return the square of *x*."""
    time.sleep(wait)
    squared = x * x
    return squared

class _TestPool(BaseTestCase):
    """Tests for the worker pool available as ``self.pool``."""

    def test_apply(self):
        papply = self.pool.apply
        self.assertEqual(papply(sqr, (5,)), sqr(5))
        self.assertEqual(papply(sqr, (), {'x':3}), sqr(x=3))

    def test_map(self):
        pmap = self.pool.map
        self.assertEqual(pmap(sqr, range(10)), map(sqr, range(10)))
        self.assertEqual(pmap(sqr, range(100), chunksize=20),
                         map(sqr, range(100)))

    def test_map_chunksize(self):
        # map_async() over an empty list with an explicit chunksize must
        # complete rather than time out.
        try:
            self.pool.map_async(sqr, [], chunksize=1).get(timeout=TIMEOUT1)
        except multiprocessing.TimeoutError:
            self.fail("pool.map_async with chunksize stalled on null list")

    def test_async(self):
        # The task sleeps TIMEOUT1 seconds, so get() should take about
        # that long.
        res = self.pool.apply_async(sqr, (7, TIMEOUT1,))
        get = TimingWrapper(res.get)
        self.assertEqual(get(), 49)
        self.assertTimingAlmostEqual(get.elapsed, TIMEOUT1)

    def test_async_timeout(self):
        # The task outlasts the timeout passed to get().
        res = self.pool.apply_async(sqr, (6, TIMEOUT2 + 0.2))
        get = TimingWrapper(res.get)
        self.assertRaises(multiprocessing.TimeoutError, get, timeout=TIMEOUT2)
        self.assertTimingAlmostEqual(get.elapsed, TIMEOUT2)

    def test_imap(self):
        it = self.pool.imap(sqr, range(10))
        self.assertEqual(list(it), map(sqr, range(10)))

        it = self.pool.imap(sqr, range(10))
        for i in range(10):
            self.assertEqual(it.next(), i*i)
        self.assertRaises(StopIteration, it.next)

        it = self.pool.imap(sqr, range(1000), chunksize=100)
        for i in range(1000):
            self.assertEqual(it.next(), i*i)
        self.assertRaises(StopIteration, it.next)

    def test_imap_unordered(self):
        # Results may arrive in any order, so compare after sorting.
        it = self.pool.imap_unordered(sqr, range(1000))
        self.assertEqual(sorted(it), map(sqr, range(1000)))

        it = self.pool.imap_unordered(sqr, range(1000), chunksize=53)
        self.assertEqual(sorted(it), map(sqr, range(1000)))

    def test_make_pool(self):
        self.assertRaises(ValueError, multiprocessing.Pool, -1)
        self.assertRaises(ValueError, multiprocessing.Pool, 0)

        p = multiprocessing.Pool(3)
        self.assertEqual(3, len(p._pool))
        p.close()
        p.join()

    def test_terminate(self):
        if self.TYPE == 'manager':
            # On Unix a forked process increfs each shared object to
            # which its parent process held a reference.  If the
            # forked process gets terminated then there is likely to
            # be a reference leak.  So to prevent
            # _TestZZZNumberOfObjects from failing we skip this test
            # when using a manager.  Report it as skipped instead of
            # letting it count as a pass.
            self.skipTest("terminating workers may leak manager references")

        # Queue far more work than can finish, then terminate at once.
        self.pool.map_async(
            time.sleep, [0.1 for i in range(10000)], chunksize=1
            )
        self.pool.terminate()
        join = TimingWrapper(self.pool.join)
        join()
        # join() must not wait for the queued tasks; assertLess reports
        # the measured time on failure.
        self.assertLess(join.elapsed, 0.2)
Jesse Noller654ade32010-01-27 03:05:57 +00001138
class _TestPoolWorkerLifetime(BaseTestCase):
    """Check that ``maxtasksperchild`` makes the pool replace workers."""

    ALLOWED_TYPES = ('processes', )

    def test_pool_worker_lifetime(self):
        pool = multiprocessing.Pool(3, maxtasksperchild=10)
        self.assertEqual(3, len(pool._pool))
        origworkerpids = [worker.pid for worker in pool._pool]
        # Run many tasks so each worker gets replaced (hopefully)
        pending = [pool.apply_async(sqr, (i, )) for i in range(100)]
        # Fetch the results and verify we got the right answers,
        # also ensuring all the tasks have completed.
        for arg, res in enumerate(pending):
            self.assertEqual(res.get(), sqr(arg))
        # Refill the pool
        pool._repopulate_pool()
        # Wait until all workers are alive
        # (50 * DELTA = 5 seconds max startup process time)
        for _ in range(50):
            if all(worker.is_alive() for worker in pool._pool):
                break
            time.sleep(DELTA)
        finalworkerpids = [worker.pid for worker in pool._pool]
        # All pids should be assigned.  See issue #7805.
        self.assertNotIn(None, origworkerpids)
        self.assertNotIn(None, finalworkerpids)
        # Finally, check that the worker pids have changed
        self.assertNotEqual(sorted(origworkerpids), sorted(finalworkerpids))
        pool.close()
        pool.join()
1170
Benjamin Petersondfd79492008-06-13 19:13:39 +00001171#
1172# Test that manager has expected number of shared objects left
1173#
1174
class _TestZZZNumberOfObjects(BaseTestCase):
    # Because test cases are sorted alphabetically, this one will get
    # run after all the other tests for the manager.  It tests that
    # there have been no "reference leaks" for the manager's shared
    # objects.  Note the comment in _TestPool.test_terminate().
    ALLOWED_TYPES = ('manager',)

    def test_number_of_objects(self):
        EXPECTED_NUMBER = 1                # the pool object is still alive
        multiprocessing.active_children()  # discard dead process objs
        gc.collect()                       # do garbage collection
        refs = self.manager._number_of_objects()
        # Snapshot the debug info right next to the count it explains.
        debug_info = self.manager._debug_info()
        if refs != EXPECTED_NUMBER:
            # Print the snapshot once; it used to be emitted twice.
            print(debug_info)

        self.assertEqual(refs, EXPECTED_NUMBER)
1193
1194#
1195# Test of creating a customized manager class
1196#
1197
1198from multiprocessing.managers import BaseManager, BaseProxy, RemoteError
1199
class FooBar(object):
    """Small class registered with MyManager under two typeids."""

    def f(self):
        # Public method that returns a fixed marker string.
        return 'f()'

    def g(self):
        # Public method that always fails.
        raise ValueError()

    def _h(self):
        # Underscore-prefixed method that returns a fixed marker string.
        return '_h()'
1207
def baz():
    """Generate the squares of 0 through 9, in order."""
    for n in range(10):
        yield n * n
1211
class IteratorProxy(BaseProxy):
    """Proxy that forwards the iterator protocol to its referent."""

    # Both spellings of the "advance" method are exposed and forwarded.
    _exposed_ = ('next', '__next__')

    def __iter__(self):
        # The proxy is its own iterator.
        return self

    def next(self):
        return self._callmethod('next')

    def __next__(self):
        return self._callmethod('__next__')
1220
class MyManager(BaseManager):
    """Customized manager; its shared types are registered below."""

# 'Foo' takes the default exposure, 'Bar' names its exposed methods
# explicitly (including the underscore-prefixed one), and 'baz' hands
# back its generator through IteratorProxy.
MyManager.register('Foo', callable=FooBar)
MyManager.register('Bar', callable=FooBar, exposed=('f', '_h'))
MyManager.register('baz', callable=baz, proxytype=IteratorProxy)
1227
1228
class _TestMyManager(BaseTestCase):
    """Tests for the customized manager class ``MyManager``."""

    ALLOWED_TYPES = ('manager',)

    def test_mymanager(self):
        manager = MyManager()
        manager.start()

        foo = manager.Foo()
        bar = manager.Bar()
        squares = manager.baz()

        # Which of the three FooBar methods does each proxy offer?
        candidates = ('f', 'g', '_h')
        foo_methods = [name for name in candidates if hasattr(foo, name)]
        bar_methods = [name for name in candidates if hasattr(bar, name)]

        self.assertEqual(foo_methods, ['f', 'g'])
        self.assertEqual(bar_methods, ['f', '_h'])

        # 'Foo' proxy: f and g are callable, _h is refused by the server.
        self.assertEqual(foo.f(), 'f()')
        self.assertRaises(ValueError, foo.g)
        self.assertEqual(foo._callmethod('f'), 'f()')
        self.assertRaises(RemoteError, foo._callmethod, '_h')

        # 'Bar' proxy: f and _h work both directly and via _callmethod.
        self.assertEqual(bar.f(), 'f()')
        self.assertEqual(bar._h(), '_h()')
        self.assertEqual(bar._callmethod('f'), 'f()')
        self.assertEqual(bar._callmethod('_h'), '_h()')

        self.assertEqual(list(squares), [i*i for i in range(10)])

        manager.shutdown()
1260
1261#
1262# Test of connecting to a remote server and using xmlrpclib for serialization
1263#
1264
1265_queue = Queue.Queue()
1266def get_queue():
1267 return _queue
1268
class QueueManager(BaseManager):
    '''manager class used by server process'''

# Registered with a callable, so this manager can create the queue.
QueueManager.register('get_queue', callable=get_queue)
1272
class QueueManager2(BaseManager):
    '''manager class which specifies the same interface as QueueManager'''

# Registered without a callable: only the typeid is declared here.
QueueManager2.register('get_queue')


# Serializer name passed to the managers in the tests below.
SERIALIZER = 'xmlrpclib'
1279
class _TestRemoteManager(BaseTestCase):
    """Connect to a running manager using the xmlrpclib serializer."""

    ALLOWED_TYPES = ('manager',)

    @classmethod
    def _putter(cls, address, authkey):
        """Connect to the manager at *address* and put one tuple on its queue."""
        client = QueueManager2(
            address=address, authkey=authkey, serializer=SERIALIZER
            )
        client.connect()
        remote_queue = client.get_queue()
        remote_queue.put(('hello world', None, True, 2.25))

    def test_remote(self):
        authkey = os.urandom(32)

        # Port 0: the server address is read back from manager.address.
        manager = QueueManager(
            address=('localhost', 0), authkey=authkey, serializer=SERIALIZER
            )
        manager.start()

        p = self.Process(target=self._putter, args=(manager.address, authkey))
        p.daemon = True
        p.start()

        manager2 = QueueManager2(
            address=manager.address, authkey=authkey, serializer=SERIALIZER
            )
        manager2.connect()
        queue = manager2.get_queue()

        # Note that xmlrpclib will deserialize object as a list not a tuple
        received = queue.get()
        self.assertEqual(received, ['hello world', None, True, 2.25])

        # Because we are using xmlrpclib for serialization instead of
        # pickle this will cause a serialization error.
        self.assertRaises(Exception, queue.put, time.sleep)

        # Make queue finalizer run before the server is stopped
        del queue
        manager.shutdown()
1321
class _TestManagerRestart(BaseTestCase):
    """Check that a manager can be restarted on the address it just used."""

    @classmethod
    def _putter(cls, address, authkey):
        """Connect to the manager at *address* and put one string on its queue."""
        manager = QueueManager(
            address=address, authkey=authkey, serializer=SERIALIZER)
        manager.connect()
        queue = manager.get_queue()
        queue.put('hello world')

    def test_rapid_restart(self):
        authkey = os.urandom(32)
        manager = QueueManager(
            address=('localhost', 0), authkey=authkey, serializer=SERIALIZER)
        srvr = manager.get_server()
        addr = srvr.address
        # Close the connection.Listener socket which gets opened as a part
        # of manager.get_server(). It's not needed for the test.
        srvr.listener.close()
        manager.start()

        p = self.Process(target=self._putter, args=(manager.address, authkey))
        p.daemon = True
        p.start()
        # Wait for the child to finish, so that it is not left running
        # while the manager is shut down and restarted.
        p.join()
        queue = manager.get_queue()
        self.assertEqual(queue.get(), 'hello world')
        del queue
        manager.shutdown()

        # Start a second manager on the address saved above.
        manager = QueueManager(
            address=addr, authkey=authkey, serializer=SERIALIZER)
        manager.start()
        manager.shutdown()
Jesse Noller459a6482009-03-30 15:50:42 +00001354
Benjamin Petersondfd79492008-06-13 19:13:39 +00001355#
1356#
1357#
1358
# The empty byte string tells the _echo() child to stop.
SENTINEL = latin('')

class _TestConnection(BaseTestCase):
    """Tests for the connection objects returned by ``Pipe()``."""

    ALLOWED_TYPES = ('processes', 'threads')

    @classmethod
    def _echo(cls, conn):
        """Echo every byte message back on *conn* until SENTINEL arrives."""
        for msg in iter(conn.recv_bytes, SENTINEL):
            conn.send_bytes(msg)
        conn.close()

    def test_connection(self):
        conn, child_conn = self.Pipe()

        p = self.Process(target=self._echo, args=(child_conn,))
        p.daemon = True
        p.start()

        seq = [1, 2.25, None]
        msg = latin('hello world')
        longmsg = msg * 10
        arr = array.array('i', range(4))

        if self.TYPE == 'processes':
            self.assertEqual(type(conn.fileno()), int)

        self.assertEqual(conn.send(seq), None)
        self.assertEqual(conn.recv(), seq)

        self.assertEqual(conn.send_bytes(msg), None)
        self.assertEqual(conn.recv_bytes(), msg)

        if self.TYPE == 'processes':
            # Receive into the start of a zeroed buffer.
            buffer = array.array('i', [0]*10)
            expected = list(arr) + [0] * (10 - len(arr))
            self.assertEqual(conn.send_bytes(arr), None)
            self.assertEqual(conn.recv_bytes_into(buffer),
                             len(arr) * buffer.itemsize)
            self.assertEqual(list(buffer), expected)

            # Receive at an offset of three items.
            buffer = array.array('i', [0]*10)
            expected = [0] * 3 + list(arr) + [0] * (10 - 3 - len(arr))
            self.assertEqual(conn.send_bytes(arr), None)
            self.assertEqual(conn.recv_bytes_into(buffer, 3 * buffer.itemsize),
                             len(arr) * buffer.itemsize)
            self.assertEqual(list(buffer), expected)

            # A message longer than the buffer must raise BufferTooShort
            # carrying the complete message.
            buffer = bytearray(latin(' ' * 40))
            self.assertEqual(conn.send_bytes(longmsg), None)
            try:
                res = conn.recv_bytes_into(buffer)
            except multiprocessing.BufferTooShort as e:
                self.assertEqual(e.args, (longmsg,))
            else:
                self.fail('expected BufferTooShort, got %s' % res)

        poll = TimingWrapper(conn.poll)

        # Nothing is pending: poll() returns False at once ...
        self.assertEqual(poll(), False)
        self.assertTimingAlmostEqual(poll.elapsed, 0)

        # ... and only after the full timeout when one is given.
        self.assertEqual(poll(TIMEOUT1), False)
        self.assertTimingAlmostEqual(poll.elapsed, TIMEOUT1)

        conn.send(None)

        # Once the echo has come back, poll() returns True immediately.
        self.assertEqual(poll(TIMEOUT1), True)
        self.assertTimingAlmostEqual(poll.elapsed, 0)

        self.assertEqual(conn.recv(), None)

        really_big_msg = latin('X') * (1024 * 1024 * 16)   # 16 MiB
        conn.send_bytes(really_big_msg)
        self.assertEqual(conn.recv_bytes(), really_big_msg)

        conn.send_bytes(SENTINEL)                          # tell child to quit
        child_conn.close()

        if self.TYPE == 'processes':
            self.assertEqual(conn.readable, True)
            self.assertEqual(conn.writable, True)
            self.assertRaises(EOFError, conn.recv)
            self.assertRaises(EOFError, conn.recv_bytes)

        p.join()

    def test_duplex_false(self):
        reader, writer = self.Pipe(duplex=False)
        self.assertEqual(writer.send(1), None)
        self.assertEqual(reader.recv(), 1)
        if self.TYPE == 'processes':
            self.assertEqual(reader.readable, True)
            self.assertEqual(reader.writable, False)
            self.assertEqual(writer.readable, False)
            self.assertEqual(writer.writable, True)
            self.assertRaises(IOError, reader.send, 2)
            self.assertRaises(IOError, writer.recv)
            self.assertRaises(IOError, writer.poll)

    def test_spawn_close(self):
        # We test that a pipe connection can be closed by parent
        # process immediately after child is spawned.  On Windows this
        # would have sometimes failed on old versions because
        # child_conn would be closed before the child got a chance to
        # duplicate it.
        conn, child_conn = self.Pipe()

        p = self.Process(target=self._echo, args=(child_conn,))
        p.daemon = True
        p.start()
        child_conn.close()    # this might complete before child initializes

        msg = latin('hello')
        conn.send_bytes(msg)
        self.assertEqual(conn.recv_bytes(), msg)

        conn.send_bytes(SENTINEL)
        conn.close()
        p.join()

    def test_sendbytes(self):
        if self.TYPE != 'processes':
            # Report a skip rather than a silent pass, like the fd
            # transfer tests below.
            self.skipTest("only makes sense with processes")

        msg = latin('abcdefghijklmnopqrstuvwxyz')
        a, b = self.Pipe()

        a.send_bytes(msg)
        self.assertEqual(b.recv_bytes(), msg)

        # Offset only.
        a.send_bytes(msg, 5)
        self.assertEqual(b.recv_bytes(), msg[5:])

        # Offset and size.
        a.send_bytes(msg, 7, 8)
        self.assertEqual(b.recv_bytes(), msg[7:7+8])

        # An offset equal to the length sends an empty message.
        a.send_bytes(msg, 26)
        self.assertEqual(b.recv_bytes(), latin(''))

        a.send_bytes(msg, 26, 0)
        self.assertEqual(b.recv_bytes(), latin(''))

        # Out-of-range or negative offset/size combinations are rejected.
        self.assertRaises(ValueError, a.send_bytes, msg, 27)

        self.assertRaises(ValueError, a.send_bytes, msg, 22, 5)

        self.assertRaises(ValueError, a.send_bytes, msg, 26, 1)

        self.assertRaises(ValueError, a.send_bytes, msg, -1)

        self.assertRaises(ValueError, a.send_bytes, msg, 4, -1)

    @classmethod
    def _is_fd_assigned(cls, fd):
        """Return True if *fd* is an open descriptor, False on EBADF.

        Any other OSError raised by os.fstat() is propagated.
        """
        try:
            os.fstat(fd)
        except OSError as e:
            if e.errno == errno.EBADF:
                return False
            raise
        else:
            return True

    @classmethod
    def _writefd(cls, conn, data, create_dummy_fds=False):
        """Receive a handle over *conn*, write *data* to it and close it.

        With *create_dummy_fds* true, every unassigned descriptor below
        256 is first filled with a duplicate of the connection's fd.
        """
        if create_dummy_fds:
            for i in range(0, 256):
                if not cls._is_fd_assigned(i):
                    os.dup2(conn.fileno(), i)
        fd = reduction.recv_handle(conn)
        if msvcrt:
            fd = msvcrt.open_osfhandle(fd, os.O_WRONLY)
        os.write(fd, data)
        os.close(fd)

    @unittest.skipUnless(HAS_REDUCTION, "test needs multiprocessing.reduction")
    def test_fd_transfer(self):
        if self.TYPE != 'processes':
            self.skipTest("only makes sense with processes")
        conn, child_conn = self.Pipe(duplex=True)

        p = self.Process(target=self._writefd, args=(child_conn, b"foo"))
        p.daemon = True
        p.start()
        # Do not leave the scratch file behind once the test is over.
        self.addCleanup(test_support.unlink, test_support.TESTFN)
        with open(test_support.TESTFN, "wb") as f:
            fd = f.fileno()
            if msvcrt:
                fd = msvcrt.get_osfhandle(fd)
            reduction.send_handle(conn, fd, p.pid)
        p.join()
        with open(test_support.TESTFN, "rb") as f:
            self.assertEqual(f.read(), b"foo")

    @unittest.skipUnless(HAS_REDUCTION, "test needs multiprocessing.reduction")
    @unittest.skipIf(sys.platform == "win32",
                     "test semantics don't make sense on Windows")
    @unittest.skipIf(MAXFD <= 256,
                     "largest assignable fd number is too small")
    @unittest.skipUnless(hasattr(os, "dup2"),
                         "test needs os.dup2()")
    def test_large_fd_transfer(self):
        # With fd > 256 (issue #11657)
        if self.TYPE != 'processes':
            self.skipTest("only makes sense with processes")
        conn, child_conn = self.Pipe(duplex=True)

        p = self.Process(target=self._writefd, args=(child_conn, b"bar", True))
        p.daemon = True
        p.start()
        # Do not leave the scratch file behind once the test is over.
        self.addCleanup(test_support.unlink, test_support.TESTFN)
        with open(test_support.TESTFN, "wb") as f:
            fd = f.fileno()
            # Pick the first unassigned descriptor number >= 256.
            for newfd in range(256, MAXFD):
                if not self._is_fd_assigned(newfd):
                    break
            else:
                self.fail("could not find an unassigned large file descriptor")
            os.dup2(fd, newfd)
            try:
                reduction.send_handle(conn, newfd, p.pid)
            finally:
                os.close(newfd)
        p.join()
        with open(test_support.TESTFN, "rb") as f:
            self.assertEqual(f.read(), b"bar")

    @classmethod
    def _send_data_without_fd(cls, conn):
        """Write one NUL byte straight to the fd underlying *conn*."""
        os.write(conn.fileno(), b"\0")

    @unittest.skipUnless(HAS_REDUCTION, "test needs multiprocessing.reduction")
    @unittest.skipIf(sys.platform == "win32", "doesn't make sense on Windows")
    def test_missing_fd_transfer(self):
        # Check that exception is raised when received data is not
        # accompanied by a file descriptor in ancillary data.
        if self.TYPE != 'processes':
            self.skipTest("only makes sense with processes")
        conn, child_conn = self.Pipe(duplex=True)

        p = self.Process(target=self._send_data_without_fd, args=(child_conn,))
        p.daemon = True
        p.start()
        self.assertRaises(RuntimeError, reduction.recv_handle, conn)
        p.join()
Antoine Pitroua1a8da82011-08-23 19:54:20 +02001603
class _TestListenerClient(BaseTestCase):
    """Round-trip one message through a Listener/Client pair."""

    ALLOWED_TYPES = ('processes', 'threads')

    @classmethod
    def _test(cls, address):
        """Connect to *address*, send 'hello' and close the connection."""
        client = cls.connection.Client(address)
        client.send('hello')
        client.close()

    def test_listener_client(self):
        # Repeat the exchange for every family listed by the connection
        # module.
        for family in self.connection.families:
            listener = self.connection.Listener(family=family)
            p = self.Process(target=self._test, args=(listener.address,))
            p.daemon = True
            p.start()
            accepted = listener.accept()
            self.assertEqual(accepted.recv(), 'hello')
            p.join()
            listener.close()
Benjamin Petersondfd79492008-06-13 19:13:39 +00001624#
1625# Test of sending connection and socket objects between processes
1626#
Benjamin Petersonda3a1b12008-06-16 20:52:48 +00001627"""
Benjamin Petersondfd79492008-06-13 19:13:39 +00001628class _TestPicklingConnections(BaseTestCase):
1629
1630 ALLOWED_TYPES = ('processes',)
1631
1632 def _listener(self, conn, families):
1633 for fam in families:
1634 l = self.connection.Listener(family=fam)
1635 conn.send(l.address)
1636 new_conn = l.accept()
1637 conn.send(new_conn)
1638
1639 if self.TYPE == 'processes':
1640 l = socket.socket()
1641 l.bind(('localhost', 0))
1642 conn.send(l.getsockname())
1643 l.listen(1)
1644 new_conn, addr = l.accept()
1645 conn.send(new_conn)
1646
1647 conn.recv()
1648
1649 def _remote(self, conn):
1650 for (address, msg) in iter(conn.recv, None):
1651 client = self.connection.Client(address)
1652 client.send(msg.upper())
1653 client.close()
1654
1655 if self.TYPE == 'processes':
1656 address, msg = conn.recv()
1657 client = socket.socket()
1658 client.connect(address)
1659 client.sendall(msg.upper())
1660 client.close()
1661
1662 conn.close()
1663
1664 def test_pickling(self):
1665 try:
1666 multiprocessing.allow_connection_pickling()
1667 except ImportError:
1668 return
1669
1670 families = self.connection.families
1671
1672 lconn, lconn0 = self.Pipe()
1673 lp = self.Process(target=self._listener, args=(lconn0, families))
Jesus Cea6f6016b2011-09-09 20:26:57 +02001674 lp.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +00001675 lp.start()
1676 lconn0.close()
1677
1678 rconn, rconn0 = self.Pipe()
1679 rp = self.Process(target=self._remote, args=(rconn0,))
Jesus Cea6f6016b2011-09-09 20:26:57 +02001680 rp.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +00001681 rp.start()
1682 rconn0.close()
1683
1684 for fam in families:
1685 msg = ('This connection uses family %s' % fam).encode('ascii')
1686 address = lconn.recv()
1687 rconn.send((address, msg))
1688 new_conn = lconn.recv()
1689 self.assertEqual(new_conn.recv(), msg.upper())
1690
1691 rconn.send(None)
1692
1693 if self.TYPE == 'processes':
1694 msg = latin('This connection uses a normal socket')
1695 address = lconn.recv()
1696 rconn.send((address, msg))
1697 if hasattr(socket, 'fromfd'):
1698 new_conn = lconn.recv()
1699 self.assertEqual(new_conn.recv(100), msg.upper())
1700 else:
1701 # XXX On Windows with Py2.6 need to backport fromfd()
1702 discard = lconn.recv_bytes()
1703
1704 lconn.send(None)
1705
1706 rconn.close()
1707 lconn.close()
1708
1709 lp.join()
1710 rp.join()
Benjamin Petersonda3a1b12008-06-16 20:52:48 +00001711"""
Benjamin Petersondfd79492008-06-13 19:13:39 +00001712#
1713#
1714#
1715
class _TestHeap(BaseTestCase):
    """Tests for the block allocator in ``multiprocessing.heap``."""

    ALLOWED_TYPES = ('processes',)

    def test_heap(self):
        iterations = 5000
        maxblocks = 50
        blocks = []

        # create and destroy lots of blocks of different sizes
        for _ in xrange(iterations):
            size = int(random.lognormvariate(0, 1) * 1000)
            blocks.append(multiprocessing.heap.BufferWrapper(size))
            if len(blocks) > maxblocks:
                del blocks[random.randrange(maxblocks)]

        # get the heap object
        heap = multiprocessing.heap.BufferWrapper._heap

        # verify the state of the heap
        segments = []
        occupied = 0
        heap._lock.acquire()
        self.addCleanup(heap._lock.release)
        for free_list in heap._len_to_seq.values():
            for arena, start, stop in free_list:
                segments.append((heap._arenas.index(arena), start, stop,
                                 stop-start, 'free'))
        for arena, start, stop in heap._allocated_blocks:
            segments.append((heap._arenas.index(arena), start, stop,
                             stop-start, 'occupied'))
            occupied += (stop-start)

        segments.sort()

        # Each segment must either end where the next one begins, or the
        # next one must start at offset 0 of a different arena.
        for current, following in zip(segments, segments[1:]):
            (arena, start, stop) = current[:3]
            (narena, nstart, nstop) = following[:3]
            self.assertTrue((arena != narena and nstart == 0) or
                            (stop == nstart))

    def test_free_from_gc(self):
        # Check that freeing of blocks by the garbage collector doesn't deadlock
        # (issue #12352).
        # Make sure the GC is enabled, and set lower collection thresholds to
        # make collections more frequent (and increase the probability of
        # deadlock).
        if not gc.isenabled():
            gc.enable()
            self.addCleanup(gc.disable)
        saved_thresholds = gc.get_threshold()
        self.addCleanup(gc.set_threshold, *saved_thresholds)
        gc.set_threshold(10)

        # perform numerous block allocations, with cyclic references to make
        # sure objects are collected asynchronously by the gc
        for _ in range(5000):
            first = multiprocessing.heap.BufferWrapper(1)
            second = multiprocessing.heap.BufferWrapper(1)
            # circular references
            first.buddy = second
            second.buddy = first
1780
Benjamin Petersondfd79492008-06-13 19:13:39 +00001781#
1782#
1783#
1784
Benjamin Petersondfd79492008-06-13 19:13:39 +00001785class _Foo(Structure):
1786 _fields_ = [
1787 ('x', c_int),
1788 ('y', c_double)
1789 ]
1790
class _TestSharedCTypes(BaseTestCase):
    """Tests for shared ctypes values, structures and arrays."""

    ALLOWED_TYPES = ('processes',)

    def setUp(self):
        if not HAS_SHAREDCTYPES:
            self.skipTest("requires multiprocessing.sharedctypes")

    @classmethod
    def _double(cls, x, y, foo, arr, string):
        """Double every shared object passed in, in place."""
        x.value *= 2
        y.value *= 2
        foo.x *= 2
        foo.y *= 2
        string.value *= 2
        for index in range(len(arr)):
            arr[index] *= 2

    def test_sharedctypes(self, lock=False):
        # Create one shared object of each kind, let a child process
        # double them all, then check the results in the parent.
        x = Value('i', 7, lock=lock)
        y = Value(c_double, 1.0/3.0, lock=lock)
        foo = Value(_Foo, 3, 2, lock=lock)
        arr = self.Array('d', range(10), lock=lock)
        string = self.Array('c', 20, lock=lock)
        string.value = latin('hello')

        worker = self.Process(target=self._double,
                              args=(x, y, foo, arr, string))
        worker.daemon = True
        worker.start()
        worker.join()

        self.assertEqual(x.value, 14)
        self.assertAlmostEqual(y.value, 2.0/3.0)
        self.assertEqual(foo.x, 6)
        self.assertAlmostEqual(foo.y, 4.0)
        for index in range(10):
            self.assertAlmostEqual(arr[index], index*2)
        self.assertEqual(string.value, latin('hellohello'))

    def test_synchronize(self):
        # Same scenario, run with lock=True.
        self.test_sharedctypes(lock=True)

    def test_copy(self):
        # Changing the original after copy() must not affect the copy.
        original = _Foo(2, 5.0)
        duplicate = copy(original)
        original.x = 0
        original.y = 0
        self.assertEqual(duplicate.x, 2)
        self.assertAlmostEqual(duplicate.y, 5.0)
1840
1841#
1842#
1843#
1844
class _TestFinalize(BaseTestCase):
    # Registers util.Finalize callbacks in a child process and checks, from
    # the parent, the order in which their messages arrive.

    ALLOWED_TYPES = ('processes',)

    @classmethod
    def _test_finalize(cls, conn):
        class Foo(object):
            pass

        dropped = Foo()
        util.Finalize(dropped, conn.send, args=('a',))
        del dropped             # triggers callback for a

        closed = Foo()
        close_closed = util.Finalize(closed, conn.send, args=('b',))
        close_closed()          # triggers callback for b
        close_closed()          # does nothing because callback has already been called
        del closed              # does nothing because callback has already been called

        # Registered without an exitpriority; the parent's expected list
        # contains no 'c'.
        unprioritised = Foo()
        util.Finalize(unprioritised, conn.send, args=('c',))

        priority_one = Foo()
        util.Finalize(priority_one, conn.send, args=('d10',), exitpriority=1)

        # Three finalizers sharing exitpriority 0, registered in this order.
        # The list keeps all three objects alive until the process exits.
        priority_zero = []
        for tag in ('d01', 'd02', 'd03'):
            holder = Foo()
            util.Finalize(holder, conn.send, args=(tag,), exitpriority=0)
            priority_zero.append(holder)

        util.Finalize(None, conn.send, args=('e',), exitpriority=-10)

        util.Finalize(None, conn.send, args=('STOP',), exitpriority=-100)

        # call multiprocessing's cleanup function then exit process without
        # garbage collecting locals
        util._exit_function()
        conn.close()
        os._exit(0)

    def test_finalize(self):
        conn, child_conn = self.Pipe()

        child = self.Process(target=self._test_finalize, args=(child_conn,))
        child.daemon = True
        child.start()
        child.join()

        # Read messages until the 'STOP' sentinel sent by the last finalizer.
        received = list(iter(conn.recv, 'STOP'))
        self.assertEqual(received, ['a', 'b', 'd10', 'd03', 'd02', 'd01', 'e'])
1897
1898#
1899# Test that from ... import * works for each module
1900#
1901
class _TestImportStar(BaseTestCase):
    # Every name a module lists in __all__ must really exist on that module.

    ALLOWED_TYPES = ('processes',)

    def test_import(self):
        always = [
            'multiprocessing', 'multiprocessing.connection',
            'multiprocessing.heap', 'multiprocessing.managers',
            'multiprocessing.pool', 'multiprocessing.process',
            'multiprocessing.synchronize', 'multiprocessing.util'
            ]

        optional = []
        if HAS_REDUCTION:
            optional.append('multiprocessing.reduction')
        if c_int is not None:
            # This module requires _ctypes
            optional.append('multiprocessing.sharedctypes')

        for module_name in always + optional:
            __import__(module_name)
            module = sys.modules[module_name]

            exported = getattr(module, '__all__', ())
            for attr_name in exported:
                present = hasattr(module, attr_name)
                self.assertTrue(
                    present,
                    '%r does not have attribute %r' % (module, attr_name)
                    )
1930
1931#
1932# Quick test that logging works -- does not test logging output
1933#
1934
class _TestLogging(BaseTestCase):
    # Quick checks that the multiprocessing logger can be configured and that
    # a child process reports the expected effective level.  Logging output
    # itself is not inspected.

    ALLOWED_TYPES = ('processes',)

    def test_enable_logging(self):
        logger = multiprocessing.get_logger()
        # Check the logger before using it, not after.
        self.assertTrue(logger is not None)
        logger.setLevel(util.SUBWARNING)
        try:
            logger.debug('this will not be printed')
            logger.info('nor will this')
        finally:
            # Restore the suite-wide level even if a logging call raised.
            logger.setLevel(LOG_LEVEL)

    @classmethod
    def _test_level(cls, conn):
        # Child-process target: report the logger's effective level.
        logger = multiprocessing.get_logger()
        conn.send(logger.getEffectiveLevel())

    def _effective_level_in_child(self, reader, writer):
        # Run _test_level in a child that writes to *writer*, return the value
        # read from *reader*, and always join the child so that no process is
        # left behind when the test ends.
        child = self.Process(target=self._test_level, args=(writer,))
        child.daemon = True
        child.start()
        try:
            return reader.recv()
        finally:
            child.join()

    def test_level(self):
        LEVEL1 = 32
        LEVEL2 = 37

        logger = multiprocessing.get_logger()
        root_logger = logging.getLogger()
        root_level = root_logger.level

        reader, writer = multiprocessing.Pipe(duplex=False)
        try:
            # A level set directly on the multiprocessing logger is reported.
            logger.setLevel(LEVEL1)
            self.assertEqual(
                LEVEL1, self._effective_level_in_child(reader, writer))

            # With NOTSET on the multiprocessing logger the child reports the
            # level that was set on the root logger.
            logger.setLevel(logging.NOTSET)
            root_logger.setLevel(LEVEL2)
            self.assertEqual(
                LEVEL2, self._effective_level_in_child(reader, writer))
        finally:
            # Undo the global logging changes and release the pipe even when
            # an assertion above failed.
            root_logger.setLevel(root_level)
            logger.setLevel(level=LOG_LEVEL)
            reader.close()
            writer.close()
1977
Jesse Noller814d02d2009-11-21 14:38:23 +00001978
Jesse Noller9a03f2f2009-11-24 14:17:29 +00001979# class _TestLoggingProcessName(BaseTestCase):
1980#
1981# def handle(self, record):
1982# assert record.processName == multiprocessing.current_process().name
1983# self.__handled = True
1984#
1985# def test_logging(self):
1986# handler = logging.Handler()
1987# handler.handle = self.handle
1988# self.__handled = False
1989# # Bypass getLogger() and side-effects
1990# logger = logging.getLoggerClass()(
1991# 'multiprocessing.test.TestLoggingProcessName')
1992# logger.addHandler(handler)
1993# logger.propagate = False
1994#
1995# logger.warn('foo')
1996# assert self.__handled
Jesse Noller814d02d2009-11-21 14:38:23 +00001997
Benjamin Petersondfd79492008-06-13 19:13:39 +00001998#
Jesse Noller9a5b2ad2009-01-19 15:12:22 +00001999# Test to verify handle verification, see issue 3321
2000#
2001
class TestInvalidHandle(unittest.TestCase):
    # Connection objects built on bogus handles must fail with IOError.

    @unittest.skipIf(WIN32, "skipped on Windows")
    def test_invalid_handles(self):
        bogus_handle = 44977608
        connection = _multiprocessing.Connection(bogus_handle)
        # Polling the connection built on the bogus handle must fail ...
        self.assertRaises(IOError, connection.poll)
        # ... and a negative handle is rejected at construction time.
        self.assertRaises(IOError, _multiprocessing.Connection, -1)
Florent Xicluna36b9fbb2010-03-24 19:33:25 +00002009
Jesse Noller9a5b2ad2009-01-19 15:12:22 +00002010#
Benjamin Petersondfd79492008-06-13 19:13:39 +00002011# Functions used to create test cases from the base ones in this module
2012#
2013
def get_attributes(Source, names):
    """Return a dict mapping each entry of *names* to that attribute of *Source*.

    Plain Python functions are wrapped in ``staticmethod`` so that they are
    not turned into methods when the dict is merged into a class namespace.
    Every other kind of object is stored unchanged.

    Raises AttributeError if *Source* lacks one of the requested names.
    """
    # Imported locally so the helper does not rely on a module-level import.
    import types

    attributes = {}
    for name in names:
        obj = getattr(Source, name)
        # isinstance() against the real function type replaces the indirect
        # ``type(obj) == type(get_attributes)`` comparison.
        if isinstance(obj, types.FunctionType):
            obj = staticmethod(obj)
        attributes[name] = obj
    return attributes
2022
def create_test_cases(Mixin, type):
    """Build concrete TestCase classes for one flavour of the test suite.

    Every module global whose name starts with ``_Test`` and whose
    ``ALLOWED_TYPES`` contains *type* is combined with ``unittest.TestCase``
    and *Mixin*.  The result maps the generated class name
    (``'With' + type.capitalize() + <global name without the leading
    underscore>``) to the new class.
    """
    Type = type.capitalize()
    glob = globals()
    result = {}

    for name in glob.keys():
        if not name.startswith('_Test'):
            continue
        base = glob[name]
        if type not in base.ALLOWED_TYPES:
            continue

        class Temp(base, unittest.TestCase, Mixin):
            pass

        # Give the generated class a readable name and make it report the
        # mixin's module as its own.
        newname = 'With' + Type + name[1:]
        Temp.__name__ = newname
        Temp.__module__ = Mixin.__module__
        result[newname] = Temp
    return result
2039
2040#
2041# Create test cases
2042#
2043
class ProcessesMixin(object):
    # Flavour of the shared tests that uses multiprocessing.Process.
    TYPE = 'processes'
    Process = multiprocessing.Process

# Attach the objects under test, taken from the multiprocessing package, as
# class attributes of the mixin.
_processes_api = get_attributes(multiprocessing, (
    'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
    'Condition', 'Event', 'Value', 'Array', 'RawValue',
    'RawArray', 'current_process', 'active_children', 'Pipe',
    'connection', 'JoinableQueue'
    ))
for _attr_name in _processes_api:
    setattr(ProcessesMixin, _attr_name, _processes_api[_attr_name])
del _processes_api, _attr_name

# Generate the 'WithProcesses...' test classes and publish them as globals.
testcases_processes = create_test_cases(ProcessesMixin, type='processes')
globals().update(testcases_processes)
2056
2057
class ManagerMixin(object):
    # Flavour of the shared tests that obtains its objects from a SyncManager.
    TYPE = 'manager'
    Process = multiprocessing.Process
    # Allocated without running __init__; test_main() calls __init__() and
    # start() on this object before the tests run.
    manager = object.__new__(multiprocessing.managers.SyncManager)

# Attach the manager's factory attributes as class attributes of the mixin.
_manager_api = get_attributes(ManagerMixin.manager, (
    'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
    'Condition', 'Event', 'Value', 'Array', 'list', 'dict',
    'Namespace', 'JoinableQueue'
    ))
for _attr_name in _manager_api:
    setattr(ManagerMixin, _attr_name, _manager_api[_attr_name])
del _manager_api, _attr_name

# Generate the 'WithManager...' test classes and publish them as globals.
testcases_manager = create_test_cases(ManagerMixin, type='manager')
globals().update(testcases_manager)
2070
2071
class ThreadsMixin(object):
    # Flavour of the shared tests that uses multiprocessing.dummy.
    TYPE = 'threads'
    Process = multiprocessing.dummy.Process

# Attach the objects under test, taken from multiprocessing.dummy, as class
# attributes of the mixin.
_threads_api = get_attributes(multiprocessing.dummy, (
    'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
    'Condition', 'Event', 'Value', 'Array', 'current_process',
    'active_children', 'Pipe', 'connection', 'dict', 'list',
    'Namespace', 'JoinableQueue'
    ))
for _attr_name in _threads_api:
    setattr(ThreadsMixin, _attr_name, _threads_api[_attr_name])
del _threads_api, _attr_name

# Generate the 'WithThreads...' test classes and publish them as globals.
testcases_threads = create_test_cases(ThreadsMixin, type='threads')
globals().update(testcases_threads)
2084
class OtherTest(unittest.TestCase):
    # TODO: add more tests for deliver/answer challenge.
    def test_deliver_challenge_auth_failure(self):
        # The fake peer answers every read with bytes that are not a valid
        # response, so delivering the challenge must fail.
        class _FakeConnection(object):
            def recv_bytes(self, size):
                return b'something bogus'
            def send_bytes(self, data):
                pass

        fake = _FakeConnection()
        deliver = multiprocessing.connection.deliver_challenge
        self.assertRaises(multiprocessing.AuthenticationError,
                          deliver, fake, b'abc')

    def test_answer_challenge_auth_failure(self):
        # The fake peer returns the CHALLENGE constant on the first read,
        # bogus bytes on the second and empty bytes afterwards.
        class _FakeConnection(object):
            def __init__(self):
                self.count = 0
            def recv_bytes(self, size):
                self.count += 1
                if self.count > 2:
                    return b''
                if self.count == 2:
                    return b'something bogus'
                return multiprocessing.connection.CHALLENGE
            def send_bytes(self, data):
                pass

        fake = _FakeConnection()
        answer = multiprocessing.connection.answer_challenge
        self.assertRaises(multiprocessing.AuthenticationError,
                          answer, fake, b'abc')
2113
Jesse Noller7152f6d2009-04-02 05:17:26 +00002114#
2115# Test Manager.start()/Pool.__init__() initializer feature - see issue 5585
2116#
2117
def initializer(ns):
    """Add one to the ``test`` attribute of *ns*, in place."""
    counter = ns.test
    counter += 1
    ns.test = counter
2120
class TestInitializers(unittest.TestCase):
    # Checks that the initializer passed to SyncManager.start() and to Pool()
    # is called with its arguments.  The initializer increments ``ns.test`` on
    # a Namespace owned by a separate manager created in setUp().

    def setUp(self):
        self.mgr = multiprocessing.Manager()
        self.ns = self.mgr.Namespace()
        self.ns.test = 0

    def tearDown(self):
        self.mgr.shutdown()

    def test_manager_initializer(self):
        m = multiprocessing.managers.SyncManager()
        # A non-callable initializer is rejected.
        self.assertRaises(TypeError, m.start, 1)
        m.start(initializer, (self.ns,))
        try:
            self.assertEqual(self.ns.test, 1)
        finally:
            # Shut the second manager down even when the assertion fails so
            # its server process is not leaked.
            m.shutdown()

    def test_pool_initializer(self):
        # A non-callable initializer is rejected.
        self.assertRaises(TypeError, multiprocessing.Pool, initializer=1)
        p = multiprocessing.Pool(1, initializer, (self.ns,))
        p.close()
        p.join()
        self.assertEqual(self.ns.test, 1)
2143
Jesse Noller1b90efb2009-06-30 17:11:52 +00002144#
2145# Issue 5155, 5313, 5331: Test process in processes
2146# Verifies os.close(sys.stdin.fileno) vs. sys.stdin.close() behavior
2147#
2148
2149def _ThisSubProcess(q):
2150 try:
2151 item = q.get(block=False)
2152 except Queue.Empty:
2153 pass
2154
def _TestProcess(q):
    """Start a daemonic grandchild running _ThisSubProcess and wait for it.

    The *q* argument is accepted but not used; the grandchild receives a
    queue created here.
    """
    inner_queue = multiprocessing.Queue()
    grandchild = multiprocessing.Process(
        target=_ThisSubProcess,
        args=(inner_queue,),
    )
    grandchild.daemon = True
    grandchild.start()
    grandchild.join()
2161
2162def _afunc(x):
2163 return x*x
2164
def pool_in_process():
    """Create a four-worker Pool, map _afunc over a short list, then clean up.

    The pool is closed and joined in all cases so that its worker processes
    do not outlive this function.
    """
    pool = multiprocessing.Pool(processes=4)
    try:
        pool.map(_afunc, [1, 2, 3, 4, 5, 6, 7])
    finally:
        pool.close()
        pool.join()
2168
2169class _file_like(object):
2170 def __init__(self, delegate):
2171 self._delegate = delegate
2172 self._pid = None
2173
2174 @property
2175 def cache(self):
2176 pid = os.getpid()
2177 # There are no race conditions since fork keeps only the running thread
2178 if pid != self._pid:
2179 self._pid = pid
2180 self._cache = []
2181 return self._cache
2182
2183 def write(self, data):
2184 self.cache.append(data)
2185
2186 def flush(self):
2187 self._delegate.write(''.join(self.cache))
2188 self._cache = []
2189
class TestStdinBadfiledescriptor(unittest.TestCase):
    # Regression tests for creating queues and pools inside a child process,
    # plus a check of the _file_like buffering helper.

    def test_queue_in_process(self):
        queue = multiprocessing.Queue()
        proc = multiprocessing.Process(target=_TestProcess, args=(queue,))
        proc.start()
        proc.join()

    def test_pool_in_process(self):
        p = multiprocessing.Process(target=pool_in_process)
        p.start()
        p.join()

    def test_flushing(self):
        sio = StringIO()
        flike = _file_like(sio)
        flike.write('foo')
        # NOTE(review): this Process is constructed but never started, so the
        # flush below runs in the current process only -- confirm whether
        # starting it was intended.
        proc = multiprocessing.Process(target=lambda: flike.flush())
        flike.flush()
        # assertEqual instead of a bare assert: it is not stripped under -O
        # and reports both values on failure.
        self.assertEqual(sio.getvalue(), 'foo')
2210
# Hand-written TestCase classes that are not generated by create_test_cases().
testcases_other = [
    OtherTest,
    TestInvalidHandle,
    TestInitializers,
    TestStdinBadfiledescriptor,
]
Neal Norwitz0c519b32008-08-25 01:50:24 +00002213
Benjamin Petersondfd79492008-06-13 19:13:39 +00002214#
2215#
2216#
2217
def test_main(run=None):
    """Build the complete suite and execute it with *run*.

    *run* is a callable that accepts a ``unittest.TestSuite``; when omitted,
    ``test.test_support.run_unittest`` is used.  Raises ``unittest.SkipTest``
    on Linux when an ``RLock`` cannot be created (issue 3111).

    The pools and the manager shared by the mixin classes are created before
    the run and are always torn down afterwards, even if *run* raises.
    """
    if sys.platform.startswith("linux"):
        try:
            # Only used as a probe; the lock itself is not needed.
            multiprocessing.RLock()
        except OSError:
            raise unittest.SkipTest("OSError raises on RLock creation, see issue 3111!")

    if run is None:
        from test.test_support import run_unittest as run

    util.get_temp_dir()     # creates temp directory for use by all processes

    multiprocessing.get_logger().setLevel(LOG_LEVEL)

    ProcessesMixin.pool = multiprocessing.Pool(4)
    ThreadsMixin.pool = multiprocessing.dummy.Pool(4)
    ManagerMixin.manager.__init__()
    ManagerMixin.manager.start()
    ManagerMixin.pool = ManagerMixin.manager.Pool(4)

    try:
        def by_name(testcase):
            return testcase.__name__

        testcases = (
            sorted(testcases_processes.values(), key=by_name) +
            sorted(testcases_threads.values(), key=by_name) +
            sorted(testcases_manager.values(), key=by_name) +
            testcases_other
            )

        loadTestsFromTestCase = unittest.defaultTestLoader.loadTestsFromTestCase
        suite = unittest.TestSuite(loadTestsFromTestCase(tc) for tc in testcases)
        # (ncoghlan): Whether or not sys.exc_clear is executed by the threading
        # module during these tests is at least platform dependent and possibly
        # non-deterministic on any given platform. So we don't mind if the listed
        # warnings aren't actually raised.
        with test_support.check_py3k_warnings(
                (".+__(get|set)slice__ has been removed", DeprecationWarning),
                (r"sys.exc_clear\(\) not supported", DeprecationWarning),
                quiet=True):
            run(suite)
    finally:
        # Release worker processes/threads and the manager even when the run
        # was interrupted or raised.
        ThreadsMixin.pool.terminate()
        ProcessesMixin.pool.terminate()
        ManagerMixin.pool.terminate()
        ManagerMixin.manager.shutdown()

        del ProcessesMixin.pool, ThreadsMixin.pool, ManagerMixin.pool
Benjamin Petersondfd79492008-06-13 19:13:39 +00002263
def main():
    """Run the suite through a verbose ``unittest.TextTestRunner``."""
    runner = unittest.TextTestRunner(verbosity=2)
    test_main(runner.run)

# Allow the file to be executed directly as a script.
if __name__ == '__main__':
    main()