blob: 1b93a82ddeafc9299b4a4a64ce5124ef05f8c816 [file] [log] [blame]
Jesse Noller76cf55f2008-07-02 16:56:51 +00001#!/usr/bin/env python
2
Benjamin Petersondfd79492008-06-13 19:13:39 +00003#
4# Unit tests for the multiprocessing package
5#
6
7import unittest
Benjamin Petersondfd79492008-06-13 19:13:39 +00008import Queue
9import time
10import sys
11import os
12import gc
13import signal
14import array
Benjamin Petersondfd79492008-06-13 19:13:39 +000015import socket
16import random
17import logging
Antoine Pitroua1a8da82011-08-23 19:54:20 +020018import errno
Mark Dickinsonc4920e82009-11-20 19:30:22 +000019from test import test_support
Jesse Noller1b90efb2009-06-30 17:11:52 +000020from StringIO import StringIO
R. David Murray3db8a342009-03-30 23:05:48 +000021_multiprocessing = test_support.import_module('_multiprocessing')
Ezio Melottic2077b02011-03-16 12:34:31 +020022# import threading after _multiprocessing to raise a more relevant error
Victor Stinner613b4cf2010-04-27 21:56:26 +000023# message: "No module named _multiprocessing". _multiprocessing is not compiled
24# without thread support.
25import threading
R. David Murray3db8a342009-03-30 23:05:48 +000026
Jesse Noller37040cd2008-09-30 00:15:45 +000027# Work around broken sem_open implementations
R. David Murray3db8a342009-03-30 23:05:48 +000028test_support.import_module('multiprocessing.synchronize')
Jesse Noller37040cd2008-09-30 00:15:45 +000029
Benjamin Petersondfd79492008-06-13 19:13:39 +000030import multiprocessing.dummy
31import multiprocessing.connection
32import multiprocessing.managers
33import multiprocessing.heap
Benjamin Petersondfd79492008-06-13 19:13:39 +000034import multiprocessing.pool
Benjamin Petersondfd79492008-06-13 19:13:39 +000035
Antoine Pitroua1a8da82011-08-23 19:54:20 +020036from multiprocessing import util, reduction
Benjamin Petersondfd79492008-06-13 19:13:39 +000037
Brian Curtina06e9b82010-10-07 02:27:41 +000038try:
39 from multiprocessing.sharedctypes import Value, copy
40 HAS_SHAREDCTYPES = True
41except ImportError:
42 HAS_SHAREDCTYPES = False
43
Antoine Pitroua1a8da82011-08-23 19:54:20 +020044try:
45 import msvcrt
46except ImportError:
47 msvcrt = None
48
Benjamin Petersondfd79492008-06-13 19:13:39 +000049#
50#
51#
52
Benjamin Petersone79edf52008-07-13 18:34:58 +000053latin = str
Benjamin Petersondfd79492008-06-13 19:13:39 +000054
Benjamin Petersondfd79492008-06-13 19:13:39 +000055#
56# Constants
57#
58
59LOG_LEVEL = util.SUBWARNING
Jesse Noller654ade32010-01-27 03:05:57 +000060#LOG_LEVEL = logging.DEBUG
Benjamin Petersondfd79492008-06-13 19:13:39 +000061
62DELTA = 0.1
63CHECK_TIMINGS = False # making true makes tests take a lot longer
64 # and can sometimes cause some non-serious
65 # failures because some calls block a bit
66 # longer than expected
67if CHECK_TIMINGS:
68 TIMEOUT1, TIMEOUT2, TIMEOUT3 = 0.82, 0.35, 1.4
69else:
70 TIMEOUT1, TIMEOUT2, TIMEOUT3 = 0.1, 0.1, 0.1
71
72HAVE_GETVALUE = not getattr(_multiprocessing,
73 'HAVE_BROKEN_SEM_GETVALUE', False)
74
Jesse Noller9a5b2ad2009-01-19 15:12:22 +000075WIN32 = (sys.platform == "win32")
76
Antoine Pitroua1a8da82011-08-23 19:54:20 +020077try:
78 MAXFD = os.sysconf("SC_OPEN_MAX")
79except:
80 MAXFD = 256
81
Benjamin Petersondfd79492008-06-13 19:13:39 +000082#
Florent Xicluna36b9fbb2010-03-24 19:33:25 +000083# Some tests require ctypes
84#
85
86try:
Nick Coghlan13623662010-04-10 14:24:36 +000087 from ctypes import Structure, c_int, c_double
Florent Xicluna36b9fbb2010-03-24 19:33:25 +000088except ImportError:
89 Structure = object
90 c_int = c_double = None
91
92#
Benjamin Petersondfd79492008-06-13 19:13:39 +000093# Creates a wrapper for a function which records the time it takes to finish
94#
95
96class TimingWrapper(object):
97
98 def __init__(self, func):
99 self.func = func
100 self.elapsed = None
101
102 def __call__(self, *args, **kwds):
103 t = time.time()
104 try:
105 return self.func(*args, **kwds)
106 finally:
107 self.elapsed = time.time() - t
108
109#
110# Base class for test cases
111#
112
113class BaseTestCase(object):
114
115 ALLOWED_TYPES = ('processes', 'manager', 'threads')
116
117 def assertTimingAlmostEqual(self, a, b):
118 if CHECK_TIMINGS:
119 self.assertAlmostEqual(a, b, 1)
120
121 def assertReturnsIfImplemented(self, value, func, *args):
122 try:
123 res = func(*args)
124 except NotImplementedError:
125 pass
126 else:
127 return self.assertEqual(value, res)
128
Antoine Pitrou4eb2b282010-11-02 23:51:30 +0000129 # For the sanity of Windows users, rather than crashing or freezing in
130 # multiple ways.
131 def __reduce__(self, *args):
132 raise NotImplementedError("shouldn't try to pickle a test case")
133
134 __reduce_ex__ = __reduce__
135
Benjamin Petersondfd79492008-06-13 19:13:39 +0000136#
137# Return the value of a semaphore
138#
139
140def get_value(self):
141 try:
142 return self.get_value()
143 except AttributeError:
144 try:
145 return self._Semaphore__value
146 except AttributeError:
147 try:
148 return self._value
149 except AttributeError:
150 raise NotImplementedError
151
152#
153# Testcases
154#
155
156class _TestProcess(BaseTestCase):
157
158 ALLOWED_TYPES = ('processes', 'threads')
159
160 def test_current(self):
161 if self.TYPE == 'threads':
162 return
163
164 current = self.current_process()
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000165 authkey = current.authkey
Benjamin Petersondfd79492008-06-13 19:13:39 +0000166
167 self.assertTrue(current.is_alive())
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000168 self.assertTrue(not current.daemon)
Ezio Melottib0f5adc2010-01-24 16:58:36 +0000169 self.assertIsInstance(authkey, bytes)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000170 self.assertTrue(len(authkey) > 0)
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000171 self.assertEqual(current.ident, os.getpid())
172 self.assertEqual(current.exitcode, None)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000173
Antoine Pitrou4eb2b282010-11-02 23:51:30 +0000174 @classmethod
175 def _test(cls, q, *args, **kwds):
176 current = cls.current_process()
Benjamin Petersondfd79492008-06-13 19:13:39 +0000177 q.put(args)
178 q.put(kwds)
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000179 q.put(current.name)
Antoine Pitrou4eb2b282010-11-02 23:51:30 +0000180 if cls.TYPE != 'threads':
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000181 q.put(bytes(current.authkey))
Benjamin Petersondfd79492008-06-13 19:13:39 +0000182 q.put(current.pid)
183
184 def test_process(self):
185 q = self.Queue(1)
186 e = self.Event()
187 args = (q, 1, 2)
188 kwargs = {'hello':23, 'bye':2.54}
189 name = 'SomeProcess'
190 p = self.Process(
191 target=self._test, args=args, kwargs=kwargs, name=name
192 )
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000193 p.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +0000194 current = self.current_process()
195
196 if self.TYPE != 'threads':
Ezio Melotti2623a372010-11-21 13:34:58 +0000197 self.assertEqual(p.authkey, current.authkey)
198 self.assertEqual(p.is_alive(), False)
199 self.assertEqual(p.daemon, True)
Ezio Melottiaa980582010-01-23 23:04:36 +0000200 self.assertNotIn(p, self.active_children())
Benjamin Petersondfd79492008-06-13 19:13:39 +0000201 self.assertTrue(type(self.active_children()) is list)
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000202 self.assertEqual(p.exitcode, None)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000203
204 p.start()
205
Ezio Melotti2623a372010-11-21 13:34:58 +0000206 self.assertEqual(p.exitcode, None)
207 self.assertEqual(p.is_alive(), True)
Ezio Melottiaa980582010-01-23 23:04:36 +0000208 self.assertIn(p, self.active_children())
Benjamin Petersondfd79492008-06-13 19:13:39 +0000209
Ezio Melotti2623a372010-11-21 13:34:58 +0000210 self.assertEqual(q.get(), args[1:])
211 self.assertEqual(q.get(), kwargs)
212 self.assertEqual(q.get(), p.name)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000213 if self.TYPE != 'threads':
Ezio Melotti2623a372010-11-21 13:34:58 +0000214 self.assertEqual(q.get(), current.authkey)
215 self.assertEqual(q.get(), p.pid)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000216
217 p.join()
218
Ezio Melotti2623a372010-11-21 13:34:58 +0000219 self.assertEqual(p.exitcode, 0)
220 self.assertEqual(p.is_alive(), False)
Ezio Melottiaa980582010-01-23 23:04:36 +0000221 self.assertNotIn(p, self.active_children())
Benjamin Petersondfd79492008-06-13 19:13:39 +0000222
Antoine Pitrou4eb2b282010-11-02 23:51:30 +0000223 @classmethod
224 def _test_terminate(cls):
Benjamin Petersondfd79492008-06-13 19:13:39 +0000225 time.sleep(1000)
226
227 def test_terminate(self):
228 if self.TYPE == 'threads':
229 return
230
231 p = self.Process(target=self._test_terminate)
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000232 p.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +0000233 p.start()
234
235 self.assertEqual(p.is_alive(), True)
Ezio Melottiaa980582010-01-23 23:04:36 +0000236 self.assertIn(p, self.active_children())
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000237 self.assertEqual(p.exitcode, None)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000238
239 p.terminate()
240
241 join = TimingWrapper(p.join)
242 self.assertEqual(join(), None)
243 self.assertTimingAlmostEqual(join.elapsed, 0.0)
244
245 self.assertEqual(p.is_alive(), False)
Ezio Melottiaa980582010-01-23 23:04:36 +0000246 self.assertNotIn(p, self.active_children())
Benjamin Petersondfd79492008-06-13 19:13:39 +0000247
248 p.join()
249
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000250 # XXX sometimes get p.exitcode == 0 on Windows ...
251 #self.assertEqual(p.exitcode, -signal.SIGTERM)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000252
253 def test_cpu_count(self):
254 try:
255 cpus = multiprocessing.cpu_count()
256 except NotImplementedError:
257 cpus = 1
258 self.assertTrue(type(cpus) is int)
259 self.assertTrue(cpus >= 1)
260
261 def test_active_children(self):
262 self.assertEqual(type(self.active_children()), list)
263
264 p = self.Process(target=time.sleep, args=(DELTA,))
Ezio Melottiaa980582010-01-23 23:04:36 +0000265 self.assertNotIn(p, self.active_children())
Benjamin Petersondfd79492008-06-13 19:13:39 +0000266
Jesus Cea6f6016b2011-09-09 20:26:57 +0200267 p.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +0000268 p.start()
Ezio Melottiaa980582010-01-23 23:04:36 +0000269 self.assertIn(p, self.active_children())
Benjamin Petersondfd79492008-06-13 19:13:39 +0000270
271 p.join()
Ezio Melottiaa980582010-01-23 23:04:36 +0000272 self.assertNotIn(p, self.active_children())
Benjamin Petersondfd79492008-06-13 19:13:39 +0000273
Antoine Pitrou4eb2b282010-11-02 23:51:30 +0000274 @classmethod
275 def _test_recursion(cls, wconn, id):
Benjamin Petersondfd79492008-06-13 19:13:39 +0000276 from multiprocessing import forking
277 wconn.send(id)
278 if len(id) < 2:
279 for i in range(2):
Antoine Pitrou4eb2b282010-11-02 23:51:30 +0000280 p = cls.Process(
281 target=cls._test_recursion, args=(wconn, id+[i])
Benjamin Petersondfd79492008-06-13 19:13:39 +0000282 )
283 p.start()
284 p.join()
285
286 def test_recursion(self):
287 rconn, wconn = self.Pipe(duplex=False)
288 self._test_recursion(wconn, [])
289
290 time.sleep(DELTA)
291 result = []
292 while rconn.poll():
293 result.append(rconn.recv())
294
295 expected = [
296 [],
297 [0],
298 [0, 0],
299 [0, 1],
300 [1],
301 [1, 0],
302 [1, 1]
303 ]
304 self.assertEqual(result, expected)
305
306#
307#
308#
309
310class _UpperCaser(multiprocessing.Process):
311
312 def __init__(self):
313 multiprocessing.Process.__init__(self)
314 self.child_conn, self.parent_conn = multiprocessing.Pipe()
315
316 def run(self):
317 self.parent_conn.close()
318 for s in iter(self.child_conn.recv, None):
319 self.child_conn.send(s.upper())
320 self.child_conn.close()
321
322 def submit(self, s):
323 assert type(s) is str
324 self.parent_conn.send(s)
325 return self.parent_conn.recv()
326
327 def stop(self):
328 self.parent_conn.send(None)
329 self.parent_conn.close()
330 self.child_conn.close()
331
332class _TestSubclassingProcess(BaseTestCase):
333
334 ALLOWED_TYPES = ('processes',)
335
336 def test_subclassing(self):
337 uppercaser = _UpperCaser()
Jesus Cea6f6016b2011-09-09 20:26:57 +0200338 uppercaser.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +0000339 uppercaser.start()
340 self.assertEqual(uppercaser.submit('hello'), 'HELLO')
341 self.assertEqual(uppercaser.submit('world'), 'WORLD')
342 uppercaser.stop()
343 uppercaser.join()
344
345#
346#
347#
348
349def queue_empty(q):
350 if hasattr(q, 'empty'):
351 return q.empty()
352 else:
353 return q.qsize() == 0
354
355def queue_full(q, maxsize):
356 if hasattr(q, 'full'):
357 return q.full()
358 else:
359 return q.qsize() == maxsize
360
361
362class _TestQueue(BaseTestCase):
363
364
Antoine Pitrou4eb2b282010-11-02 23:51:30 +0000365 @classmethod
366 def _test_put(cls, queue, child_can_start, parent_can_continue):
Benjamin Petersondfd79492008-06-13 19:13:39 +0000367 child_can_start.wait()
368 for i in range(6):
369 queue.get()
370 parent_can_continue.set()
371
372 def test_put(self):
373 MAXSIZE = 6
374 queue = self.Queue(maxsize=MAXSIZE)
375 child_can_start = self.Event()
376 parent_can_continue = self.Event()
377
378 proc = self.Process(
379 target=self._test_put,
380 args=(queue, child_can_start, parent_can_continue)
381 )
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000382 proc.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +0000383 proc.start()
384
385 self.assertEqual(queue_empty(queue), True)
386 self.assertEqual(queue_full(queue, MAXSIZE), False)
387
388 queue.put(1)
389 queue.put(2, True)
390 queue.put(3, True, None)
391 queue.put(4, False)
392 queue.put(5, False, None)
393 queue.put_nowait(6)
394
395 # the values may be in buffer but not yet in pipe so sleep a bit
396 time.sleep(DELTA)
397
398 self.assertEqual(queue_empty(queue), False)
399 self.assertEqual(queue_full(queue, MAXSIZE), True)
400
401 put = TimingWrapper(queue.put)
402 put_nowait = TimingWrapper(queue.put_nowait)
403
404 self.assertRaises(Queue.Full, put, 7, False)
405 self.assertTimingAlmostEqual(put.elapsed, 0)
406
407 self.assertRaises(Queue.Full, put, 7, False, None)
408 self.assertTimingAlmostEqual(put.elapsed, 0)
409
410 self.assertRaises(Queue.Full, put_nowait, 7)
411 self.assertTimingAlmostEqual(put_nowait.elapsed, 0)
412
413 self.assertRaises(Queue.Full, put, 7, True, TIMEOUT1)
414 self.assertTimingAlmostEqual(put.elapsed, TIMEOUT1)
415
416 self.assertRaises(Queue.Full, put, 7, False, TIMEOUT2)
417 self.assertTimingAlmostEqual(put.elapsed, 0)
418
419 self.assertRaises(Queue.Full, put, 7, True, timeout=TIMEOUT3)
420 self.assertTimingAlmostEqual(put.elapsed, TIMEOUT3)
421
422 child_can_start.set()
423 parent_can_continue.wait()
424
425 self.assertEqual(queue_empty(queue), True)
426 self.assertEqual(queue_full(queue, MAXSIZE), False)
427
428 proc.join()
429
Antoine Pitrou4eb2b282010-11-02 23:51:30 +0000430 @classmethod
431 def _test_get(cls, queue, child_can_start, parent_can_continue):
Benjamin Petersondfd79492008-06-13 19:13:39 +0000432 child_can_start.wait()
Benjamin Petersonda3a1b12008-06-16 20:52:48 +0000433 #queue.put(1)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000434 queue.put(2)
435 queue.put(3)
436 queue.put(4)
437 queue.put(5)
438 parent_can_continue.set()
439
440 def test_get(self):
441 queue = self.Queue()
442 child_can_start = self.Event()
443 parent_can_continue = self.Event()
444
445 proc = self.Process(
446 target=self._test_get,
447 args=(queue, child_can_start, parent_can_continue)
448 )
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000449 proc.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +0000450 proc.start()
451
452 self.assertEqual(queue_empty(queue), True)
453
454 child_can_start.set()
455 parent_can_continue.wait()
456
457 time.sleep(DELTA)
458 self.assertEqual(queue_empty(queue), False)
459
Benjamin Petersonda3a1b12008-06-16 20:52:48 +0000460 # Hangs unexpectedly, remove for now
461 #self.assertEqual(queue.get(), 1)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000462 self.assertEqual(queue.get(True, None), 2)
463 self.assertEqual(queue.get(True), 3)
464 self.assertEqual(queue.get(timeout=1), 4)
465 self.assertEqual(queue.get_nowait(), 5)
466
467 self.assertEqual(queue_empty(queue), True)
468
469 get = TimingWrapper(queue.get)
470 get_nowait = TimingWrapper(queue.get_nowait)
471
472 self.assertRaises(Queue.Empty, get, False)
473 self.assertTimingAlmostEqual(get.elapsed, 0)
474
475 self.assertRaises(Queue.Empty, get, False, None)
476 self.assertTimingAlmostEqual(get.elapsed, 0)
477
478 self.assertRaises(Queue.Empty, get_nowait)
479 self.assertTimingAlmostEqual(get_nowait.elapsed, 0)
480
481 self.assertRaises(Queue.Empty, get, True, TIMEOUT1)
482 self.assertTimingAlmostEqual(get.elapsed, TIMEOUT1)
483
484 self.assertRaises(Queue.Empty, get, False, TIMEOUT2)
485 self.assertTimingAlmostEqual(get.elapsed, 0)
486
487 self.assertRaises(Queue.Empty, get, timeout=TIMEOUT3)
488 self.assertTimingAlmostEqual(get.elapsed, TIMEOUT3)
489
490 proc.join()
491
Antoine Pitrou4eb2b282010-11-02 23:51:30 +0000492 @classmethod
493 def _test_fork(cls, queue):
Benjamin Petersondfd79492008-06-13 19:13:39 +0000494 for i in range(10, 20):
495 queue.put(i)
496 # note that at this point the items may only be buffered, so the
497 # process cannot shutdown until the feeder thread has finished
498 # pushing items onto the pipe.
499
500 def test_fork(self):
501 # Old versions of Queue would fail to create a new feeder
502 # thread for a forked process if the original process had its
503 # own feeder thread. This test checks that this no longer
504 # happens.
505
506 queue = self.Queue()
507
508 # put items on queue so that main process starts a feeder thread
509 for i in range(10):
510 queue.put(i)
511
512 # wait to make sure thread starts before we fork a new process
513 time.sleep(DELTA)
514
515 # fork process
516 p = self.Process(target=self._test_fork, args=(queue,))
Jesus Cea6f6016b2011-09-09 20:26:57 +0200517 p.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +0000518 p.start()
519
520 # check that all expected items are in the queue
521 for i in range(20):
522 self.assertEqual(queue.get(), i)
523 self.assertRaises(Queue.Empty, queue.get, False)
524
525 p.join()
526
527 def test_qsize(self):
528 q = self.Queue()
529 try:
530 self.assertEqual(q.qsize(), 0)
531 except NotImplementedError:
532 return
533 q.put(1)
534 self.assertEqual(q.qsize(), 1)
535 q.put(5)
536 self.assertEqual(q.qsize(), 2)
537 q.get()
538 self.assertEqual(q.qsize(), 1)
539 q.get()
540 self.assertEqual(q.qsize(), 0)
541
Antoine Pitrou4eb2b282010-11-02 23:51:30 +0000542 @classmethod
543 def _test_task_done(cls, q):
Benjamin Petersondfd79492008-06-13 19:13:39 +0000544 for obj in iter(q.get, None):
545 time.sleep(DELTA)
546 q.task_done()
547
548 def test_task_done(self):
549 queue = self.JoinableQueue()
550
551 if sys.version_info < (2, 5) and not hasattr(queue, 'task_done'):
Florent Xicluna36b9fbb2010-03-24 19:33:25 +0000552 self.skipTest("requires 'queue.task_done()' method")
Benjamin Petersondfd79492008-06-13 19:13:39 +0000553
554 workers = [self.Process(target=self._test_task_done, args=(queue,))
555 for i in xrange(4)]
556
557 for p in workers:
Jesus Cea6f6016b2011-09-09 20:26:57 +0200558 p.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +0000559 p.start()
560
561 for i in xrange(10):
562 queue.put(i)
563
564 queue.join()
565
566 for p in workers:
567 queue.put(None)
568
569 for p in workers:
570 p.join()
571
572#
573#
574#
575
576class _TestLock(BaseTestCase):
577
578 def test_lock(self):
579 lock = self.Lock()
580 self.assertEqual(lock.acquire(), True)
581 self.assertEqual(lock.acquire(False), False)
582 self.assertEqual(lock.release(), None)
583 self.assertRaises((ValueError, threading.ThreadError), lock.release)
584
585 def test_rlock(self):
586 lock = self.RLock()
587 self.assertEqual(lock.acquire(), True)
588 self.assertEqual(lock.acquire(), True)
589 self.assertEqual(lock.acquire(), True)
590 self.assertEqual(lock.release(), None)
591 self.assertEqual(lock.release(), None)
592 self.assertEqual(lock.release(), None)
593 self.assertRaises((AssertionError, RuntimeError), lock.release)
594
Jesse Noller82eb5902009-03-30 23:29:31 +0000595 def test_lock_context(self):
596 with self.Lock():
597 pass
598
Benjamin Petersondfd79492008-06-13 19:13:39 +0000599
600class _TestSemaphore(BaseTestCase):
601
602 def _test_semaphore(self, sem):
603 self.assertReturnsIfImplemented(2, get_value, sem)
604 self.assertEqual(sem.acquire(), True)
605 self.assertReturnsIfImplemented(1, get_value, sem)
606 self.assertEqual(sem.acquire(), True)
607 self.assertReturnsIfImplemented(0, get_value, sem)
608 self.assertEqual(sem.acquire(False), False)
609 self.assertReturnsIfImplemented(0, get_value, sem)
610 self.assertEqual(sem.release(), None)
611 self.assertReturnsIfImplemented(1, get_value, sem)
612 self.assertEqual(sem.release(), None)
613 self.assertReturnsIfImplemented(2, get_value, sem)
614
615 def test_semaphore(self):
616 sem = self.Semaphore(2)
617 self._test_semaphore(sem)
618 self.assertEqual(sem.release(), None)
619 self.assertReturnsIfImplemented(3, get_value, sem)
620 self.assertEqual(sem.release(), None)
621 self.assertReturnsIfImplemented(4, get_value, sem)
622
623 def test_bounded_semaphore(self):
624 sem = self.BoundedSemaphore(2)
625 self._test_semaphore(sem)
626 # Currently fails on OS/X
627 #if HAVE_GETVALUE:
628 # self.assertRaises(ValueError, sem.release)
629 # self.assertReturnsIfImplemented(2, get_value, sem)
630
631 def test_timeout(self):
632 if self.TYPE != 'processes':
633 return
634
635 sem = self.Semaphore(0)
636 acquire = TimingWrapper(sem.acquire)
637
638 self.assertEqual(acquire(False), False)
639 self.assertTimingAlmostEqual(acquire.elapsed, 0.0)
640
641 self.assertEqual(acquire(False, None), False)
642 self.assertTimingAlmostEqual(acquire.elapsed, 0.0)
643
644 self.assertEqual(acquire(False, TIMEOUT1), False)
645 self.assertTimingAlmostEqual(acquire.elapsed, 0)
646
647 self.assertEqual(acquire(True, TIMEOUT2), False)
648 self.assertTimingAlmostEqual(acquire.elapsed, TIMEOUT2)
649
650 self.assertEqual(acquire(timeout=TIMEOUT3), False)
651 self.assertTimingAlmostEqual(acquire.elapsed, TIMEOUT3)
652
653
654class _TestCondition(BaseTestCase):
655
Antoine Pitrou4eb2b282010-11-02 23:51:30 +0000656 @classmethod
657 def f(cls, cond, sleeping, woken, timeout=None):
Benjamin Petersondfd79492008-06-13 19:13:39 +0000658 cond.acquire()
659 sleeping.release()
660 cond.wait(timeout)
661 woken.release()
662 cond.release()
663
664 def check_invariant(self, cond):
665 # this is only supposed to succeed when there are no sleepers
666 if self.TYPE == 'processes':
667 try:
668 sleepers = (cond._sleeping_count.get_value() -
669 cond._woken_count.get_value())
670 self.assertEqual(sleepers, 0)
671 self.assertEqual(cond._wait_semaphore.get_value(), 0)
672 except NotImplementedError:
673 pass
674
675 def test_notify(self):
676 cond = self.Condition()
677 sleeping = self.Semaphore(0)
678 woken = self.Semaphore(0)
679
680 p = self.Process(target=self.f, args=(cond, sleeping, woken))
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000681 p.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +0000682 p.start()
683
684 p = threading.Thread(target=self.f, args=(cond, sleeping, woken))
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000685 p.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +0000686 p.start()
687
688 # wait for both children to start sleeping
689 sleeping.acquire()
690 sleeping.acquire()
691
692 # check no process/thread has woken up
693 time.sleep(DELTA)
694 self.assertReturnsIfImplemented(0, get_value, woken)
695
696 # wake up one process/thread
697 cond.acquire()
698 cond.notify()
699 cond.release()
700
701 # check one process/thread has woken up
702 time.sleep(DELTA)
703 self.assertReturnsIfImplemented(1, get_value, woken)
704
705 # wake up another
706 cond.acquire()
707 cond.notify()
708 cond.release()
709
710 # check other has woken up
711 time.sleep(DELTA)
712 self.assertReturnsIfImplemented(2, get_value, woken)
713
714 # check state is not mucked up
715 self.check_invariant(cond)
716 p.join()
717
718 def test_notify_all(self):
719 cond = self.Condition()
720 sleeping = self.Semaphore(0)
721 woken = self.Semaphore(0)
722
723 # start some threads/processes which will timeout
724 for i in range(3):
725 p = self.Process(target=self.f,
726 args=(cond, sleeping, woken, TIMEOUT1))
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000727 p.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +0000728 p.start()
729
730 t = threading.Thread(target=self.f,
731 args=(cond, sleeping, woken, TIMEOUT1))
Benjamin Petersona9b22222008-08-18 18:01:43 +0000732 t.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +0000733 t.start()
734
735 # wait for them all to sleep
736 for i in xrange(6):
737 sleeping.acquire()
738
739 # check they have all timed out
740 for i in xrange(6):
741 woken.acquire()
742 self.assertReturnsIfImplemented(0, get_value, woken)
743
744 # check state is not mucked up
745 self.check_invariant(cond)
746
747 # start some more threads/processes
748 for i in range(3):
749 p = self.Process(target=self.f, args=(cond, sleeping, woken))
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000750 p.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +0000751 p.start()
752
753 t = threading.Thread(target=self.f, args=(cond, sleeping, woken))
Benjamin Petersona9b22222008-08-18 18:01:43 +0000754 t.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +0000755 t.start()
756
757 # wait for them to all sleep
758 for i in xrange(6):
759 sleeping.acquire()
760
761 # check no process/thread has woken up
762 time.sleep(DELTA)
763 self.assertReturnsIfImplemented(0, get_value, woken)
764
765 # wake them all up
766 cond.acquire()
767 cond.notify_all()
768 cond.release()
769
770 # check they have all woken
771 time.sleep(DELTA)
772 self.assertReturnsIfImplemented(6, get_value, woken)
773
774 # check state is not mucked up
775 self.check_invariant(cond)
776
777 def test_timeout(self):
778 cond = self.Condition()
779 wait = TimingWrapper(cond.wait)
780 cond.acquire()
781 res = wait(TIMEOUT1)
782 cond.release()
783 self.assertEqual(res, None)
784 self.assertTimingAlmostEqual(wait.elapsed, TIMEOUT1)
785
786
787class _TestEvent(BaseTestCase):
788
Antoine Pitrou4eb2b282010-11-02 23:51:30 +0000789 @classmethod
790 def _test_event(cls, event):
Benjamin Petersondfd79492008-06-13 19:13:39 +0000791 time.sleep(TIMEOUT2)
792 event.set()
793
794 def test_event(self):
795 event = self.Event()
796 wait = TimingWrapper(event.wait)
797
Ezio Melottic2077b02011-03-16 12:34:31 +0200798 # Removed temporarily, due to API shear, this does not
Benjamin Petersondfd79492008-06-13 19:13:39 +0000799 # work with threading._Event objects. is_set == isSet
Jesse Noller02cb0eb2009-04-01 03:45:50 +0000800 self.assertEqual(event.is_set(), False)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000801
Jesse Noller02cb0eb2009-04-01 03:45:50 +0000802 # Removed, threading.Event.wait() will return the value of the __flag
803 # instead of None. API Shear with the semaphore backed mp.Event
804 self.assertEqual(wait(0.0), False)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000805 self.assertTimingAlmostEqual(wait.elapsed, 0.0)
Jesse Noller02cb0eb2009-04-01 03:45:50 +0000806 self.assertEqual(wait(TIMEOUT1), False)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000807 self.assertTimingAlmostEqual(wait.elapsed, TIMEOUT1)
808
809 event.set()
810
811 # See note above on the API differences
Jesse Noller02cb0eb2009-04-01 03:45:50 +0000812 self.assertEqual(event.is_set(), True)
813 self.assertEqual(wait(), True)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000814 self.assertTimingAlmostEqual(wait.elapsed, 0.0)
Jesse Noller02cb0eb2009-04-01 03:45:50 +0000815 self.assertEqual(wait(TIMEOUT1), True)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000816 self.assertTimingAlmostEqual(wait.elapsed, 0.0)
817 # self.assertEqual(event.is_set(), True)
818
819 event.clear()
820
821 #self.assertEqual(event.is_set(), False)
822
Jesus Cea6f6016b2011-09-09 20:26:57 +0200823 p = self.Process(target=self._test_event, args=(event,))
824 p.daemon = True
825 p.start()
Jesse Noller02cb0eb2009-04-01 03:45:50 +0000826 self.assertEqual(wait(), True)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000827
828#
829#
830#
831
832class _TestValue(BaseTestCase):
833
Florent Xicluna36b9fbb2010-03-24 19:33:25 +0000834 ALLOWED_TYPES = ('processes',)
835
Benjamin Petersondfd79492008-06-13 19:13:39 +0000836 codes_values = [
837 ('i', 4343, 24234),
838 ('d', 3.625, -4.25),
839 ('h', -232, 234),
840 ('c', latin('x'), latin('y'))
841 ]
842
Antoine Pitrou55d935a2010-11-22 16:35:57 +0000843 def setUp(self):
844 if not HAS_SHAREDCTYPES:
845 self.skipTest("requires multiprocessing.sharedctypes")
846
Antoine Pitrou4eb2b282010-11-02 23:51:30 +0000847 @classmethod
848 def _test(cls, values):
849 for sv, cv in zip(values, cls.codes_values):
Benjamin Petersondfd79492008-06-13 19:13:39 +0000850 sv.value = cv[2]
851
852
853 def test_value(self, raw=False):
Benjamin Petersondfd79492008-06-13 19:13:39 +0000854 if raw:
855 values = [self.RawValue(code, value)
856 for code, value, _ in self.codes_values]
857 else:
858 values = [self.Value(code, value)
859 for code, value, _ in self.codes_values]
860
861 for sv, cv in zip(values, self.codes_values):
862 self.assertEqual(sv.value, cv[1])
863
864 proc = self.Process(target=self._test, args=(values,))
Jesus Cea6f6016b2011-09-09 20:26:57 +0200865 proc.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +0000866 proc.start()
867 proc.join()
868
869 for sv, cv in zip(values, self.codes_values):
870 self.assertEqual(sv.value, cv[2])
871
872 def test_rawvalue(self):
873 self.test_value(raw=True)
874
875 def test_getobj_getlock(self):
Benjamin Petersondfd79492008-06-13 19:13:39 +0000876 val1 = self.Value('i', 5)
877 lock1 = val1.get_lock()
878 obj1 = val1.get_obj()
879
880 val2 = self.Value('i', 5, lock=None)
881 lock2 = val2.get_lock()
882 obj2 = val2.get_obj()
883
884 lock = self.Lock()
885 val3 = self.Value('i', 5, lock=lock)
886 lock3 = val3.get_lock()
887 obj3 = val3.get_obj()
888 self.assertEqual(lock, lock3)
889
Jesse Noller6ab22152009-01-18 02:45:38 +0000890 arr4 = self.Value('i', 5, lock=False)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000891 self.assertFalse(hasattr(arr4, 'get_lock'))
892 self.assertFalse(hasattr(arr4, 'get_obj'))
893
Jesse Noller6ab22152009-01-18 02:45:38 +0000894 self.assertRaises(AttributeError, self.Value, 'i', 5, lock='navalue')
895
896 arr5 = self.RawValue('i', 5)
897 self.assertFalse(hasattr(arr5, 'get_lock'))
898 self.assertFalse(hasattr(arr5, 'get_obj'))
899
Benjamin Petersondfd79492008-06-13 19:13:39 +0000900
901class _TestArray(BaseTestCase):
902
Florent Xicluna36b9fbb2010-03-24 19:33:25 +0000903 ALLOWED_TYPES = ('processes',)
904
Antoine Pitrou4eb2b282010-11-02 23:51:30 +0000905 @classmethod
906 def f(cls, seq):
Benjamin Petersondfd79492008-06-13 19:13:39 +0000907 for i in range(1, len(seq)):
908 seq[i] += seq[i-1]
909
Florent Xicluna36b9fbb2010-03-24 19:33:25 +0000910 @unittest.skipIf(c_int is None, "requires _ctypes")
Benjamin Petersondfd79492008-06-13 19:13:39 +0000911 def test_array(self, raw=False):
Benjamin Petersondfd79492008-06-13 19:13:39 +0000912 seq = [680, 626, 934, 821, 150, 233, 548, 982, 714, 831]
913 if raw:
914 arr = self.RawArray('i', seq)
915 else:
916 arr = self.Array('i', seq)
917
918 self.assertEqual(len(arr), len(seq))
919 self.assertEqual(arr[3], seq[3])
920 self.assertEqual(list(arr[2:7]), list(seq[2:7]))
921
922 arr[4:8] = seq[4:8] = array.array('i', [1, 2, 3, 4])
923
924 self.assertEqual(list(arr[:]), seq)
925
926 self.f(seq)
927
928 p = self.Process(target=self.f, args=(arr,))
Jesus Cea6f6016b2011-09-09 20:26:57 +0200929 p.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +0000930 p.start()
931 p.join()
932
933 self.assertEqual(list(arr[:]), seq)
934
Florent Xicluna36b9fbb2010-03-24 19:33:25 +0000935 @unittest.skipIf(c_int is None, "requires _ctypes")
Mark Dickinsond3cb2f62011-03-26 10:02:37 +0000936 def test_array_from_size(self):
937 size = 10
938 # Test for zeroing (see issue #11675).
939 # The repetition below strengthens the test by increasing the chances
940 # of previously allocated non-zero memory being used for the new array
941 # on the 2nd and 3rd loops.
942 for _ in range(3):
943 arr = self.Array('i', size)
944 self.assertEqual(len(arr), size)
945 self.assertEqual(list(arr), [0] * size)
946 arr[:] = range(10)
947 self.assertEqual(list(arr), range(10))
948 del arr
949
950 @unittest.skipIf(c_int is None, "requires _ctypes")
Benjamin Petersondfd79492008-06-13 19:13:39 +0000951 def test_rawarray(self):
952 self.test_array(raw=True)
953
Florent Xicluna36b9fbb2010-03-24 19:33:25 +0000954 @unittest.skipIf(c_int is None, "requires _ctypes")
Mark Dickinsonf9e9a6f2011-03-25 22:01:06 +0000955 def test_array_accepts_long(self):
956 arr = self.Array('i', 10L)
957 self.assertEqual(len(arr), 10)
958 raw_arr = self.RawArray('i', 10L)
959 self.assertEqual(len(raw_arr), 10)
960
961 @unittest.skipIf(c_int is None, "requires _ctypes")
Benjamin Petersondfd79492008-06-13 19:13:39 +0000962 def test_getobj_getlock_obj(self):
Benjamin Petersondfd79492008-06-13 19:13:39 +0000963 arr1 = self.Array('i', range(10))
964 lock1 = arr1.get_lock()
965 obj1 = arr1.get_obj()
966
967 arr2 = self.Array('i', range(10), lock=None)
968 lock2 = arr2.get_lock()
969 obj2 = arr2.get_obj()
970
971 lock = self.Lock()
972 arr3 = self.Array('i', range(10), lock=lock)
973 lock3 = arr3.get_lock()
974 obj3 = arr3.get_obj()
975 self.assertEqual(lock, lock3)
976
Jesse Noller6ab22152009-01-18 02:45:38 +0000977 arr4 = self.Array('i', range(10), lock=False)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000978 self.assertFalse(hasattr(arr4, 'get_lock'))
979 self.assertFalse(hasattr(arr4, 'get_obj'))
Jesse Noller6ab22152009-01-18 02:45:38 +0000980 self.assertRaises(AttributeError,
981 self.Array, 'i', range(10), lock='notalock')
982
983 arr5 = self.RawArray('i', range(10))
984 self.assertFalse(hasattr(arr5, 'get_lock'))
985 self.assertFalse(hasattr(arr5, 'get_obj'))
Benjamin Petersondfd79492008-06-13 19:13:39 +0000986
987#
988#
989#
990
991class _TestContainers(BaseTestCase):
992
993 ALLOWED_TYPES = ('manager',)
994
995 def test_list(self):
996 a = self.list(range(10))
997 self.assertEqual(a[:], range(10))
998
999 b = self.list()
1000 self.assertEqual(b[:], [])
1001
1002 b.extend(range(5))
1003 self.assertEqual(b[:], range(5))
1004
1005 self.assertEqual(b[2], 2)
1006 self.assertEqual(b[2:10], [2,3,4])
1007
1008 b *= 2
1009 self.assertEqual(b[:], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4])
1010
1011 self.assertEqual(b + [5, 6], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4, 5, 6])
1012
1013 self.assertEqual(a[:], range(10))
1014
1015 d = [a, b]
1016 e = self.list(d)
1017 self.assertEqual(
1018 e[:],
1019 [[0, 1, 2, 3, 4, 5, 6, 7, 8, 9], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4]]
1020 )
1021
1022 f = self.list([a])
1023 a.append('hello')
1024 self.assertEqual(f[:], [[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 'hello']])
1025
1026 def test_dict(self):
1027 d = self.dict()
1028 indices = range(65, 70)
1029 for i in indices:
1030 d[i] = chr(i)
1031 self.assertEqual(d.copy(), dict((i, chr(i)) for i in indices))
1032 self.assertEqual(sorted(d.keys()), indices)
1033 self.assertEqual(sorted(d.values()), [chr(i) for i in indices])
1034 self.assertEqual(sorted(d.items()), [(i, chr(i)) for i in indices])
1035
1036 def test_namespace(self):
1037 n = self.Namespace()
1038 n.name = 'Bob'
1039 n.job = 'Builder'
1040 n._hidden = 'hidden'
1041 self.assertEqual((n.name, n.job), ('Bob', 'Builder'))
1042 del n.job
1043 self.assertEqual(str(n), "Namespace(name='Bob')")
1044 self.assertTrue(hasattr(n, 'name'))
1045 self.assertTrue(not hasattr(n, 'job'))
1046
1047#
1048#
1049#
1050
1051def sqr(x, wait=0.0):
1052 time.sleep(wait)
1053 return x*x
Benjamin Petersondfd79492008-06-13 19:13:39 +00001054class _TestPool(BaseTestCase):
1055
1056 def test_apply(self):
1057 papply = self.pool.apply
1058 self.assertEqual(papply(sqr, (5,)), sqr(5))
1059 self.assertEqual(papply(sqr, (), {'x':3}), sqr(x=3))
1060
1061 def test_map(self):
1062 pmap = self.pool.map
1063 self.assertEqual(pmap(sqr, range(10)), map(sqr, range(10)))
1064 self.assertEqual(pmap(sqr, range(100), chunksize=20),
1065 map(sqr, range(100)))
1066
Jesse Noller7530e472009-07-16 14:23:04 +00001067 def test_map_chunksize(self):
1068 try:
1069 self.pool.map_async(sqr, [], chunksize=1).get(timeout=TIMEOUT1)
1070 except multiprocessing.TimeoutError:
1071 self.fail("pool.map_async with chunksize stalled on null list")
1072
Benjamin Petersondfd79492008-06-13 19:13:39 +00001073 def test_async(self):
1074 res = self.pool.apply_async(sqr, (7, TIMEOUT1,))
1075 get = TimingWrapper(res.get)
1076 self.assertEqual(get(), 49)
1077 self.assertTimingAlmostEqual(get.elapsed, TIMEOUT1)
1078
1079 def test_async_timeout(self):
1080 res = self.pool.apply_async(sqr, (6, TIMEOUT2 + 0.2))
1081 get = TimingWrapper(res.get)
1082 self.assertRaises(multiprocessing.TimeoutError, get, timeout=TIMEOUT2)
1083 self.assertTimingAlmostEqual(get.elapsed, TIMEOUT2)
1084
1085 def test_imap(self):
1086 it = self.pool.imap(sqr, range(10))
1087 self.assertEqual(list(it), map(sqr, range(10)))
1088
1089 it = self.pool.imap(sqr, range(10))
1090 for i in range(10):
1091 self.assertEqual(it.next(), i*i)
1092 self.assertRaises(StopIteration, it.next)
1093
1094 it = self.pool.imap(sqr, range(1000), chunksize=100)
1095 for i in range(1000):
1096 self.assertEqual(it.next(), i*i)
1097 self.assertRaises(StopIteration, it.next)
1098
1099 def test_imap_unordered(self):
1100 it = self.pool.imap_unordered(sqr, range(1000))
1101 self.assertEqual(sorted(it), map(sqr, range(1000)))
1102
1103 it = self.pool.imap_unordered(sqr, range(1000), chunksize=53)
1104 self.assertEqual(sorted(it), map(sqr, range(1000)))
1105
1106 def test_make_pool(self):
Victor Stinnerf64a0cf2011-06-20 17:54:33 +02001107 self.assertRaises(ValueError, multiprocessing.Pool, -1)
1108 self.assertRaises(ValueError, multiprocessing.Pool, 0)
1109
Benjamin Petersondfd79492008-06-13 19:13:39 +00001110 p = multiprocessing.Pool(3)
1111 self.assertEqual(3, len(p._pool))
1112 p.close()
1113 p.join()
1114
1115 def test_terminate(self):
1116 if self.TYPE == 'manager':
1117 # On Unix a forked process increfs each shared object to
1118 # which its parent process held a reference. If the
1119 # forked process gets terminated then there is likely to
1120 # be a reference leak. So to prevent
1121 # _TestZZZNumberOfObjects from failing we skip this test
1122 # when using a manager.
1123 return
1124
1125 result = self.pool.map_async(
1126 time.sleep, [0.1 for i in range(10000)], chunksize=1
1127 )
1128 self.pool.terminate()
1129 join = TimingWrapper(self.pool.join)
1130 join()
1131 self.assertTrue(join.elapsed < 0.2)
Jesse Noller654ade32010-01-27 03:05:57 +00001132
1133class _TestPoolWorkerLifetime(BaseTestCase):
1134
1135 ALLOWED_TYPES = ('processes', )
1136 def test_pool_worker_lifetime(self):
1137 p = multiprocessing.Pool(3, maxtasksperchild=10)
1138 self.assertEqual(3, len(p._pool))
1139 origworkerpids = [w.pid for w in p._pool]
1140 # Run many tasks so each worker gets replaced (hopefully)
1141 results = []
1142 for i in range(100):
1143 results.append(p.apply_async(sqr, (i, )))
1144 # Fetch the results and verify we got the right answers,
1145 # also ensuring all the tasks have completed.
1146 for (j, res) in enumerate(results):
1147 self.assertEqual(res.get(), sqr(j))
1148 # Refill the pool
1149 p._repopulate_pool()
Florent Xicluna3bc5cb72010-03-04 15:58:54 +00001150 # Wait until all workers are alive
Antoine Pitrouc2b0d762011-04-06 22:54:14 +02001151 # (countdown * DELTA = 5 seconds max startup process time)
1152 countdown = 50
Florent Xicluna3bc5cb72010-03-04 15:58:54 +00001153 while countdown and not all(w.is_alive() for w in p._pool):
1154 countdown -= 1
1155 time.sleep(DELTA)
Jesse Noller654ade32010-01-27 03:05:57 +00001156 finalworkerpids = [w.pid for w in p._pool]
Florent Xicluna3bc5cb72010-03-04 15:58:54 +00001157 # All pids should be assigned. See issue #7805.
1158 self.assertNotIn(None, origworkerpids)
1159 self.assertNotIn(None, finalworkerpids)
1160 # Finally, check that the worker pids have changed
Jesse Noller654ade32010-01-27 03:05:57 +00001161 self.assertNotEqual(sorted(origworkerpids), sorted(finalworkerpids))
1162 p.close()
1163 p.join()
1164
Benjamin Petersondfd79492008-06-13 19:13:39 +00001165#
1166# Test that manager has expected number of shared objects left
1167#
1168
1169class _TestZZZNumberOfObjects(BaseTestCase):
1170 # Because test cases are sorted alphabetically, this one will get
1171 # run after all the other tests for the manager. It tests that
1172 # there have been no "reference leaks" for the manager's shared
1173 # objects. Note the comment in _TestPool.test_terminate().
1174 ALLOWED_TYPES = ('manager',)
1175
1176 def test_number_of_objects(self):
1177 EXPECTED_NUMBER = 1 # the pool object is still alive
1178 multiprocessing.active_children() # discard dead process objs
1179 gc.collect() # do garbage collection
1180 refs = self.manager._number_of_objects()
Jesse Noller7314b382009-01-21 02:08:17 +00001181 debug_info = self.manager._debug_info()
Benjamin Petersondfd79492008-06-13 19:13:39 +00001182 if refs != EXPECTED_NUMBER:
Jesse Noller7fb96402008-07-17 21:01:05 +00001183 print self.manager._debug_info()
Jesse Noller7314b382009-01-21 02:08:17 +00001184 print debug_info
Benjamin Petersondfd79492008-06-13 19:13:39 +00001185
1186 self.assertEqual(refs, EXPECTED_NUMBER)
1187
1188#
1189# Test of creating a customized manager class
1190#
1191
1192from multiprocessing.managers import BaseManager, BaseProxy, RemoteError
1193
1194class FooBar(object):
1195 def f(self):
1196 return 'f()'
1197 def g(self):
1198 raise ValueError
1199 def _h(self):
1200 return '_h()'
1201
1202def baz():
1203 for i in xrange(10):
1204 yield i*i
1205
1206class IteratorProxy(BaseProxy):
1207 _exposed_ = ('next', '__next__')
1208 def __iter__(self):
1209 return self
1210 def next(self):
1211 return self._callmethod('next')
1212 def __next__(self):
1213 return self._callmethod('__next__')
1214
1215class MyManager(BaseManager):
1216 pass
1217
1218MyManager.register('Foo', callable=FooBar)
1219MyManager.register('Bar', callable=FooBar, exposed=('f', '_h'))
1220MyManager.register('baz', callable=baz, proxytype=IteratorProxy)
1221
1222
1223class _TestMyManager(BaseTestCase):
1224
1225 ALLOWED_TYPES = ('manager',)
1226
1227 def test_mymanager(self):
1228 manager = MyManager()
1229 manager.start()
1230
1231 foo = manager.Foo()
1232 bar = manager.Bar()
1233 baz = manager.baz()
1234
1235 foo_methods = [name for name in ('f', 'g', '_h') if hasattr(foo, name)]
1236 bar_methods = [name for name in ('f', 'g', '_h') if hasattr(bar, name)]
1237
1238 self.assertEqual(foo_methods, ['f', 'g'])
1239 self.assertEqual(bar_methods, ['f', '_h'])
1240
1241 self.assertEqual(foo.f(), 'f()')
1242 self.assertRaises(ValueError, foo.g)
1243 self.assertEqual(foo._callmethod('f'), 'f()')
1244 self.assertRaises(RemoteError, foo._callmethod, '_h')
1245
1246 self.assertEqual(bar.f(), 'f()')
1247 self.assertEqual(bar._h(), '_h()')
1248 self.assertEqual(bar._callmethod('f'), 'f()')
1249 self.assertEqual(bar._callmethod('_h'), '_h()')
1250
1251 self.assertEqual(list(baz), [i*i for i in range(10)])
1252
1253 manager.shutdown()
1254
1255#
1256# Test of connecting to a remote server and using xmlrpclib for serialization
1257#
1258
1259_queue = Queue.Queue()
1260def get_queue():
1261 return _queue
1262
1263class QueueManager(BaseManager):
1264 '''manager class used by server process'''
1265QueueManager.register('get_queue', callable=get_queue)
1266
1267class QueueManager2(BaseManager):
1268 '''manager class which specifies the same interface as QueueManager'''
1269QueueManager2.register('get_queue')
1270
1271
1272SERIALIZER = 'xmlrpclib'
1273
1274class _TestRemoteManager(BaseTestCase):
1275
1276 ALLOWED_TYPES = ('manager',)
1277
Antoine Pitrou4eb2b282010-11-02 23:51:30 +00001278 @classmethod
1279 def _putter(cls, address, authkey):
Benjamin Petersondfd79492008-06-13 19:13:39 +00001280 manager = QueueManager2(
1281 address=address, authkey=authkey, serializer=SERIALIZER
1282 )
1283 manager.connect()
1284 queue = manager.get_queue()
1285 queue.put(('hello world', None, True, 2.25))
1286
1287 def test_remote(self):
1288 authkey = os.urandom(32)
1289
1290 manager = QueueManager(
1291 address=('localhost', 0), authkey=authkey, serializer=SERIALIZER
1292 )
1293 manager.start()
1294
1295 p = self.Process(target=self._putter, args=(manager.address, authkey))
Jesus Cea6f6016b2011-09-09 20:26:57 +02001296 p.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +00001297 p.start()
1298
1299 manager2 = QueueManager2(
1300 address=manager.address, authkey=authkey, serializer=SERIALIZER
1301 )
1302 manager2.connect()
1303 queue = manager2.get_queue()
1304
1305 # Note that xmlrpclib will deserialize object as a list not a tuple
1306 self.assertEqual(queue.get(), ['hello world', None, True, 2.25])
1307
1308 # Because we are using xmlrpclib for serialization instead of
1309 # pickle this will cause a serialization error.
1310 self.assertRaises(Exception, queue.put, time.sleep)
1311
1312 # Make queue finalizer run before the server is stopped
1313 del queue
1314 manager.shutdown()
1315
Jesse Noller459a6482009-03-30 15:50:42 +00001316class _TestManagerRestart(BaseTestCase):
1317
Antoine Pitrou4eb2b282010-11-02 23:51:30 +00001318 @classmethod
1319 def _putter(cls, address, authkey):
Jesse Noller459a6482009-03-30 15:50:42 +00001320 manager = QueueManager(
1321 address=address, authkey=authkey, serializer=SERIALIZER)
1322 manager.connect()
1323 queue = manager.get_queue()
1324 queue.put('hello world')
1325
1326 def test_rapid_restart(self):
1327 authkey = os.urandom(32)
1328 manager = QueueManager(
Antoine Pitrou54f9f832010-04-30 23:08:48 +00001329 address=('localhost', 0), authkey=authkey, serializer=SERIALIZER)
Brian Curtin87d86e02010-11-01 05:15:55 +00001330 srvr = manager.get_server()
1331 addr = srvr.address
1332 # Close the connection.Listener socket which gets opened as a part
1333 # of manager.get_server(). It's not needed for the test.
1334 srvr.listener.close()
Jesse Noller459a6482009-03-30 15:50:42 +00001335 manager.start()
1336
1337 p = self.Process(target=self._putter, args=(manager.address, authkey))
Jesus Cea6f6016b2011-09-09 20:26:57 +02001338 p.daemon = True
Jesse Noller459a6482009-03-30 15:50:42 +00001339 p.start()
1340 queue = manager.get_queue()
1341 self.assertEqual(queue.get(), 'hello world')
Jesse Noller019ce772009-03-30 21:53:29 +00001342 del queue
Jesse Noller459a6482009-03-30 15:50:42 +00001343 manager.shutdown()
1344 manager = QueueManager(
Antoine Pitrou54f9f832010-04-30 23:08:48 +00001345 address=addr, authkey=authkey, serializer=SERIALIZER)
Jesse Noller459a6482009-03-30 15:50:42 +00001346 manager.start()
Jesse Noller019ce772009-03-30 21:53:29 +00001347 manager.shutdown()
Jesse Noller459a6482009-03-30 15:50:42 +00001348
Benjamin Petersondfd79492008-06-13 19:13:39 +00001349#
1350#
1351#
1352
1353SENTINEL = latin('')
1354
1355class _TestConnection(BaseTestCase):
1356
1357 ALLOWED_TYPES = ('processes', 'threads')
1358
Antoine Pitrou4eb2b282010-11-02 23:51:30 +00001359 @classmethod
1360 def _echo(cls, conn):
Benjamin Petersondfd79492008-06-13 19:13:39 +00001361 for msg in iter(conn.recv_bytes, SENTINEL):
1362 conn.send_bytes(msg)
1363 conn.close()
1364
1365 def test_connection(self):
1366 conn, child_conn = self.Pipe()
1367
1368 p = self.Process(target=self._echo, args=(child_conn,))
Jesse Noller5bc9f4c2008-08-19 19:06:19 +00001369 p.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +00001370 p.start()
1371
1372 seq = [1, 2.25, None]
1373 msg = latin('hello world')
1374 longmsg = msg * 10
1375 arr = array.array('i', range(4))
1376
1377 if self.TYPE == 'processes':
1378 self.assertEqual(type(conn.fileno()), int)
1379
1380 self.assertEqual(conn.send(seq), None)
1381 self.assertEqual(conn.recv(), seq)
1382
1383 self.assertEqual(conn.send_bytes(msg), None)
1384 self.assertEqual(conn.recv_bytes(), msg)
1385
1386 if self.TYPE == 'processes':
1387 buffer = array.array('i', [0]*10)
1388 expected = list(arr) + [0] * (10 - len(arr))
1389 self.assertEqual(conn.send_bytes(arr), None)
1390 self.assertEqual(conn.recv_bytes_into(buffer),
1391 len(arr) * buffer.itemsize)
1392 self.assertEqual(list(buffer), expected)
1393
1394 buffer = array.array('i', [0]*10)
1395 expected = [0] * 3 + list(arr) + [0] * (10 - 3 - len(arr))
1396 self.assertEqual(conn.send_bytes(arr), None)
1397 self.assertEqual(conn.recv_bytes_into(buffer, 3 * buffer.itemsize),
1398 len(arr) * buffer.itemsize)
1399 self.assertEqual(list(buffer), expected)
1400
1401 buffer = bytearray(latin(' ' * 40))
1402 self.assertEqual(conn.send_bytes(longmsg), None)
1403 try:
1404 res = conn.recv_bytes_into(buffer)
1405 except multiprocessing.BufferTooShort, e:
1406 self.assertEqual(e.args, (longmsg,))
1407 else:
1408 self.fail('expected BufferTooShort, got %s' % res)
1409
1410 poll = TimingWrapper(conn.poll)
1411
1412 self.assertEqual(poll(), False)
1413 self.assertTimingAlmostEqual(poll.elapsed, 0)
1414
1415 self.assertEqual(poll(TIMEOUT1), False)
1416 self.assertTimingAlmostEqual(poll.elapsed, TIMEOUT1)
1417
1418 conn.send(None)
1419
1420 self.assertEqual(poll(TIMEOUT1), True)
1421 self.assertTimingAlmostEqual(poll.elapsed, 0)
1422
1423 self.assertEqual(conn.recv(), None)
1424
1425 really_big_msg = latin('X') * (1024 * 1024 * 16) # 16Mb
1426 conn.send_bytes(really_big_msg)
1427 self.assertEqual(conn.recv_bytes(), really_big_msg)
1428
1429 conn.send_bytes(SENTINEL) # tell child to quit
1430 child_conn.close()
1431
1432 if self.TYPE == 'processes':
1433 self.assertEqual(conn.readable, True)
1434 self.assertEqual(conn.writable, True)
1435 self.assertRaises(EOFError, conn.recv)
1436 self.assertRaises(EOFError, conn.recv_bytes)
1437
1438 p.join()
1439
1440 def test_duplex_false(self):
1441 reader, writer = self.Pipe(duplex=False)
1442 self.assertEqual(writer.send(1), None)
1443 self.assertEqual(reader.recv(), 1)
1444 if self.TYPE == 'processes':
1445 self.assertEqual(reader.readable, True)
1446 self.assertEqual(reader.writable, False)
1447 self.assertEqual(writer.readable, False)
1448 self.assertEqual(writer.writable, True)
1449 self.assertRaises(IOError, reader.send, 2)
1450 self.assertRaises(IOError, writer.recv)
1451 self.assertRaises(IOError, writer.poll)
1452
1453 def test_spawn_close(self):
1454 # We test that a pipe connection can be closed by parent
1455 # process immediately after child is spawned. On Windows this
1456 # would have sometimes failed on old versions because
1457 # child_conn would be closed before the child got a chance to
1458 # duplicate it.
1459 conn, child_conn = self.Pipe()
1460
1461 p = self.Process(target=self._echo, args=(child_conn,))
Jesus Cea6f6016b2011-09-09 20:26:57 +02001462 p.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +00001463 p.start()
1464 child_conn.close() # this might complete before child initializes
1465
1466 msg = latin('hello')
1467 conn.send_bytes(msg)
1468 self.assertEqual(conn.recv_bytes(), msg)
1469
1470 conn.send_bytes(SENTINEL)
1471 conn.close()
1472 p.join()
1473
1474 def test_sendbytes(self):
1475 if self.TYPE != 'processes':
1476 return
1477
1478 msg = latin('abcdefghijklmnopqrstuvwxyz')
1479 a, b = self.Pipe()
1480
1481 a.send_bytes(msg)
1482 self.assertEqual(b.recv_bytes(), msg)
1483
1484 a.send_bytes(msg, 5)
1485 self.assertEqual(b.recv_bytes(), msg[5:])
1486
1487 a.send_bytes(msg, 7, 8)
1488 self.assertEqual(b.recv_bytes(), msg[7:7+8])
1489
1490 a.send_bytes(msg, 26)
1491 self.assertEqual(b.recv_bytes(), latin(''))
1492
1493 a.send_bytes(msg, 26, 0)
1494 self.assertEqual(b.recv_bytes(), latin(''))
1495
1496 self.assertRaises(ValueError, a.send_bytes, msg, 27)
1497
1498 self.assertRaises(ValueError, a.send_bytes, msg, 22, 5)
1499
1500 self.assertRaises(ValueError, a.send_bytes, msg, 26, 1)
1501
1502 self.assertRaises(ValueError, a.send_bytes, msg, -1)
1503
1504 self.assertRaises(ValueError, a.send_bytes, msg, 4, -1)
1505
Antoine Pitroua1a8da82011-08-23 19:54:20 +02001506 @classmethod
1507 def _is_fd_assigned(cls, fd):
1508 try:
1509 os.fstat(fd)
1510 except OSError as e:
1511 if e.errno == errno.EBADF:
1512 return False
1513 raise
1514 else:
1515 return True
1516
1517 @classmethod
1518 def _writefd(cls, conn, data, create_dummy_fds=False):
1519 if create_dummy_fds:
1520 for i in range(0, 256):
1521 if not cls._is_fd_assigned(i):
1522 os.dup2(conn.fileno(), i)
1523 fd = reduction.recv_handle(conn)
1524 if msvcrt:
1525 fd = msvcrt.open_osfhandle(fd, os.O_WRONLY)
1526 os.write(fd, data)
1527 os.close(fd)
1528
1529 def test_fd_transfer(self):
1530 if self.TYPE != 'processes':
1531 self.skipTest("only makes sense with processes")
1532 conn, child_conn = self.Pipe(duplex=True)
1533
1534 p = self.Process(target=self._writefd, args=(child_conn, b"foo"))
Jesus Cea6f6016b2011-09-09 20:26:57 +02001535 p.daemon = True
Antoine Pitroua1a8da82011-08-23 19:54:20 +02001536 p.start()
1537 with open(test_support.TESTFN, "wb") as f:
1538 fd = f.fileno()
1539 if msvcrt:
1540 fd = msvcrt.get_osfhandle(fd)
1541 reduction.send_handle(conn, fd, p.pid)
1542 p.join()
1543 with open(test_support.TESTFN, "rb") as f:
1544 self.assertEqual(f.read(), b"foo")
1545
1546 @unittest.skipIf(sys.platform == "win32",
1547 "test semantics don't make sense on Windows")
1548 @unittest.skipIf(MAXFD <= 256,
1549 "largest assignable fd number is too small")
1550 @unittest.skipUnless(hasattr(os, "dup2"),
1551 "test needs os.dup2()")
1552 def test_large_fd_transfer(self):
1553 # With fd > 256 (issue #11657)
1554 if self.TYPE != 'processes':
1555 self.skipTest("only makes sense with processes")
1556 conn, child_conn = self.Pipe(duplex=True)
1557
1558 p = self.Process(target=self._writefd, args=(child_conn, b"bar", True))
Jesus Cea6f6016b2011-09-09 20:26:57 +02001559 p.daemon = True
Antoine Pitroua1a8da82011-08-23 19:54:20 +02001560 p.start()
1561 with open(test_support.TESTFN, "wb") as f:
1562 fd = f.fileno()
1563 for newfd in range(256, MAXFD):
1564 if not self._is_fd_assigned(newfd):
1565 break
1566 else:
1567 self.fail("could not find an unassigned large file descriptor")
1568 os.dup2(fd, newfd)
1569 try:
1570 reduction.send_handle(conn, newfd, p.pid)
1571 finally:
1572 os.close(newfd)
1573 p.join()
1574 with open(test_support.TESTFN, "rb") as f:
1575 self.assertEqual(f.read(), b"bar")
1576
1577
Benjamin Petersondfd79492008-06-13 19:13:39 +00001578class _TestListenerClient(BaseTestCase):
1579
1580 ALLOWED_TYPES = ('processes', 'threads')
1581
Antoine Pitrou4eb2b282010-11-02 23:51:30 +00001582 @classmethod
1583 def _test(cls, address):
1584 conn = cls.connection.Client(address)
Benjamin Petersondfd79492008-06-13 19:13:39 +00001585 conn.send('hello')
1586 conn.close()
1587
1588 def test_listener_client(self):
1589 for family in self.connection.families:
1590 l = self.connection.Listener(family=family)
1591 p = self.Process(target=self._test, args=(l.address,))
Jesse Noller5bc9f4c2008-08-19 19:06:19 +00001592 p.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +00001593 p.start()
1594 conn = l.accept()
1595 self.assertEqual(conn.recv(), 'hello')
1596 p.join()
1597 l.close()
Benjamin Petersondfd79492008-06-13 19:13:39 +00001598#
1599# Test of sending connection and socket objects between processes
1600#
Benjamin Petersonda3a1b12008-06-16 20:52:48 +00001601"""
Benjamin Petersondfd79492008-06-13 19:13:39 +00001602class _TestPicklingConnections(BaseTestCase):
1603
1604 ALLOWED_TYPES = ('processes',)
1605
1606 def _listener(self, conn, families):
1607 for fam in families:
1608 l = self.connection.Listener(family=fam)
1609 conn.send(l.address)
1610 new_conn = l.accept()
1611 conn.send(new_conn)
1612
1613 if self.TYPE == 'processes':
1614 l = socket.socket()
1615 l.bind(('localhost', 0))
1616 conn.send(l.getsockname())
1617 l.listen(1)
1618 new_conn, addr = l.accept()
1619 conn.send(new_conn)
1620
1621 conn.recv()
1622
1623 def _remote(self, conn):
1624 for (address, msg) in iter(conn.recv, None):
1625 client = self.connection.Client(address)
1626 client.send(msg.upper())
1627 client.close()
1628
1629 if self.TYPE == 'processes':
1630 address, msg = conn.recv()
1631 client = socket.socket()
1632 client.connect(address)
1633 client.sendall(msg.upper())
1634 client.close()
1635
1636 conn.close()
1637
1638 def test_pickling(self):
1639 try:
1640 multiprocessing.allow_connection_pickling()
1641 except ImportError:
1642 return
1643
1644 families = self.connection.families
1645
1646 lconn, lconn0 = self.Pipe()
1647 lp = self.Process(target=self._listener, args=(lconn0, families))
Jesus Cea6f6016b2011-09-09 20:26:57 +02001648 lp.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +00001649 lp.start()
1650 lconn0.close()
1651
1652 rconn, rconn0 = self.Pipe()
1653 rp = self.Process(target=self._remote, args=(rconn0,))
Jesus Cea6f6016b2011-09-09 20:26:57 +02001654 rp.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +00001655 rp.start()
1656 rconn0.close()
1657
1658 for fam in families:
1659 msg = ('This connection uses family %s' % fam).encode('ascii')
1660 address = lconn.recv()
1661 rconn.send((address, msg))
1662 new_conn = lconn.recv()
1663 self.assertEqual(new_conn.recv(), msg.upper())
1664
1665 rconn.send(None)
1666
1667 if self.TYPE == 'processes':
1668 msg = latin('This connection uses a normal socket')
1669 address = lconn.recv()
1670 rconn.send((address, msg))
1671 if hasattr(socket, 'fromfd'):
1672 new_conn = lconn.recv()
1673 self.assertEqual(new_conn.recv(100), msg.upper())
1674 else:
1675 # XXX On Windows with Py2.6 need to backport fromfd()
1676 discard = lconn.recv_bytes()
1677
1678 lconn.send(None)
1679
1680 rconn.close()
1681 lconn.close()
1682
1683 lp.join()
1684 rp.join()
Benjamin Petersonda3a1b12008-06-16 20:52:48 +00001685"""
Benjamin Petersondfd79492008-06-13 19:13:39 +00001686#
1687#
1688#
1689
1690class _TestHeap(BaseTestCase):
1691
1692 ALLOWED_TYPES = ('processes',)
1693
1694 def test_heap(self):
1695 iterations = 5000
1696 maxblocks = 50
1697 blocks = []
1698
1699 # create and destroy lots of blocks of different sizes
1700 for i in xrange(iterations):
1701 size = int(random.lognormvariate(0, 1) * 1000)
1702 b = multiprocessing.heap.BufferWrapper(size)
1703 blocks.append(b)
1704 if len(blocks) > maxblocks:
1705 i = random.randrange(maxblocks)
1706 del blocks[i]
1707
1708 # get the heap object
1709 heap = multiprocessing.heap.BufferWrapper._heap
1710
1711 # verify the state of the heap
1712 all = []
1713 occupied = 0
Charles-François Natali414d0fa2011-07-02 13:56:19 +02001714 heap._lock.acquire()
1715 self.addCleanup(heap._lock.release)
Benjamin Petersondfd79492008-06-13 19:13:39 +00001716 for L in heap._len_to_seq.values():
1717 for arena, start, stop in L:
1718 all.append((heap._arenas.index(arena), start, stop,
1719 stop-start, 'free'))
1720 for arena, start, stop in heap._allocated_blocks:
1721 all.append((heap._arenas.index(arena), start, stop,
1722 stop-start, 'occupied'))
1723 occupied += (stop-start)
1724
1725 all.sort()
1726
1727 for i in range(len(all)-1):
1728 (arena, start, stop) = all[i][:3]
1729 (narena, nstart, nstop) = all[i+1][:3]
1730 self.assertTrue((arena != narena and nstart == 0) or
1731 (stop == nstart))
1732
Charles-François Natali414d0fa2011-07-02 13:56:19 +02001733 def test_free_from_gc(self):
1734 # Check that freeing of blocks by the garbage collector doesn't deadlock
1735 # (issue #12352).
1736 # Make sure the GC is enabled, and set lower collection thresholds to
1737 # make collections more frequent (and increase the probability of
1738 # deadlock).
Charles-François Natali7c20ad32011-07-02 14:08:27 +02001739 if not gc.isenabled():
Charles-François Natali414d0fa2011-07-02 13:56:19 +02001740 gc.enable()
1741 self.addCleanup(gc.disable)
Charles-François Natali7c20ad32011-07-02 14:08:27 +02001742 thresholds = gc.get_threshold()
1743 self.addCleanup(gc.set_threshold, *thresholds)
Charles-François Natali414d0fa2011-07-02 13:56:19 +02001744 gc.set_threshold(10)
1745
1746 # perform numerous block allocations, with cyclic references to make
1747 # sure objects are collected asynchronously by the gc
1748 for i in range(5000):
1749 a = multiprocessing.heap.BufferWrapper(1)
1750 b = multiprocessing.heap.BufferWrapper(1)
1751 # circular references
1752 a.buddy = b
1753 b.buddy = a
1754
Benjamin Petersondfd79492008-06-13 19:13:39 +00001755#
1756#
1757#
1758
Benjamin Petersondfd79492008-06-13 19:13:39 +00001759class _Foo(Structure):
1760 _fields_ = [
1761 ('x', c_int),
1762 ('y', c_double)
1763 ]
1764
1765class _TestSharedCTypes(BaseTestCase):
1766
1767 ALLOWED_TYPES = ('processes',)
1768
Antoine Pitrou55d935a2010-11-22 16:35:57 +00001769 def setUp(self):
1770 if not HAS_SHAREDCTYPES:
1771 self.skipTest("requires multiprocessing.sharedctypes")
1772
Antoine Pitrou4eb2b282010-11-02 23:51:30 +00001773 @classmethod
1774 def _double(cls, x, y, foo, arr, string):
Benjamin Petersondfd79492008-06-13 19:13:39 +00001775 x.value *= 2
1776 y.value *= 2
1777 foo.x *= 2
1778 foo.y *= 2
1779 string.value *= 2
1780 for i in range(len(arr)):
1781 arr[i] *= 2
1782
1783 def test_sharedctypes(self, lock=False):
Benjamin Petersondfd79492008-06-13 19:13:39 +00001784 x = Value('i', 7, lock=lock)
Georg Brandlbd564c32010-02-06 23:33:33 +00001785 y = Value(c_double, 1.0/3.0, lock=lock)
Benjamin Petersondfd79492008-06-13 19:13:39 +00001786 foo = Value(_Foo, 3, 2, lock=lock)
Georg Brandlbd564c32010-02-06 23:33:33 +00001787 arr = self.Array('d', range(10), lock=lock)
1788 string = self.Array('c', 20, lock=lock)
Brian Curtina06e9b82010-10-07 02:27:41 +00001789 string.value = latin('hello')
Benjamin Petersondfd79492008-06-13 19:13:39 +00001790
1791 p = self.Process(target=self._double, args=(x, y, foo, arr, string))
Jesus Cea6f6016b2011-09-09 20:26:57 +02001792 p.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +00001793 p.start()
1794 p.join()
1795
1796 self.assertEqual(x.value, 14)
1797 self.assertAlmostEqual(y.value, 2.0/3.0)
1798 self.assertEqual(foo.x, 6)
1799 self.assertAlmostEqual(foo.y, 4.0)
1800 for i in range(10):
1801 self.assertAlmostEqual(arr[i], i*2)
1802 self.assertEqual(string.value, latin('hellohello'))
1803
1804 def test_synchronize(self):
1805 self.test_sharedctypes(lock=True)
1806
1807 def test_copy(self):
Benjamin Petersondfd79492008-06-13 19:13:39 +00001808 foo = _Foo(2, 5.0)
Brian Curtina06e9b82010-10-07 02:27:41 +00001809 bar = copy(foo)
Benjamin Petersondfd79492008-06-13 19:13:39 +00001810 foo.x = 0
1811 foo.y = 0
1812 self.assertEqual(bar.x, 2)
1813 self.assertAlmostEqual(bar.y, 5.0)
1814
1815#
1816#
1817#
1818
1819class _TestFinalize(BaseTestCase):
1820
1821 ALLOWED_TYPES = ('processes',)
1822
Antoine Pitrou4eb2b282010-11-02 23:51:30 +00001823 @classmethod
1824 def _test_finalize(cls, conn):
Benjamin Petersondfd79492008-06-13 19:13:39 +00001825 class Foo(object):
1826 pass
1827
1828 a = Foo()
1829 util.Finalize(a, conn.send, args=('a',))
1830 del a # triggers callback for a
1831
1832 b = Foo()
1833 close_b = util.Finalize(b, conn.send, args=('b',))
1834 close_b() # triggers callback for b
1835 close_b() # does nothing because callback has already been called
1836 del b # does nothing because callback has already been called
1837
1838 c = Foo()
1839 util.Finalize(c, conn.send, args=('c',))
1840
1841 d10 = Foo()
1842 util.Finalize(d10, conn.send, args=('d10',), exitpriority=1)
1843
1844 d01 = Foo()
1845 util.Finalize(d01, conn.send, args=('d01',), exitpriority=0)
1846 d02 = Foo()
1847 util.Finalize(d02, conn.send, args=('d02',), exitpriority=0)
1848 d03 = Foo()
1849 util.Finalize(d03, conn.send, args=('d03',), exitpriority=0)
1850
1851 util.Finalize(None, conn.send, args=('e',), exitpriority=-10)
1852
1853 util.Finalize(None, conn.send, args=('STOP',), exitpriority=-100)
1854
Ezio Melottic2077b02011-03-16 12:34:31 +02001855 # call multiprocessing's cleanup function then exit process without
Benjamin Petersondfd79492008-06-13 19:13:39 +00001856 # garbage collecting locals
1857 util._exit_function()
1858 conn.close()
1859 os._exit(0)
1860
1861 def test_finalize(self):
1862 conn, child_conn = self.Pipe()
1863
1864 p = self.Process(target=self._test_finalize, args=(child_conn,))
Jesus Cea6f6016b2011-09-09 20:26:57 +02001865 p.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +00001866 p.start()
1867 p.join()
1868
1869 result = [obj for obj in iter(conn.recv, 'STOP')]
1870 self.assertEqual(result, ['a', 'b', 'd10', 'd03', 'd02', 'd01', 'e'])
1871
1872#
1873# Test that from ... import * works for each module
1874#
1875
1876class _TestImportStar(BaseTestCase):
1877
1878 ALLOWED_TYPES = ('processes',)
1879
1880 def test_import(self):
Florent Xicluna36b9fbb2010-03-24 19:33:25 +00001881 modules = [
Benjamin Petersondfd79492008-06-13 19:13:39 +00001882 'multiprocessing', 'multiprocessing.connection',
1883 'multiprocessing.heap', 'multiprocessing.managers',
1884 'multiprocessing.pool', 'multiprocessing.process',
Florent Xicluna36b9fbb2010-03-24 19:33:25 +00001885 'multiprocessing.reduction',
Benjamin Petersondfd79492008-06-13 19:13:39 +00001886 'multiprocessing.synchronize', 'multiprocessing.util'
Florent Xicluna36b9fbb2010-03-24 19:33:25 +00001887 ]
1888
1889 if c_int is not None:
1890 # This module requires _ctypes
1891 modules.append('multiprocessing.sharedctypes')
Benjamin Petersondfd79492008-06-13 19:13:39 +00001892
1893 for name in modules:
1894 __import__(name)
1895 mod = sys.modules[name]
1896
1897 for attr in getattr(mod, '__all__', ()):
1898 self.assertTrue(
1899 hasattr(mod, attr),
1900 '%r does not have attribute %r' % (mod, attr)
1901 )
1902
1903#
1904# Quick test that logging works -- does not test logging output
1905#
1906
1907class _TestLogging(BaseTestCase):
1908
1909 ALLOWED_TYPES = ('processes',)
1910
1911 def test_enable_logging(self):
1912 logger = multiprocessing.get_logger()
1913 logger.setLevel(util.SUBWARNING)
1914 self.assertTrue(logger is not None)
1915 logger.debug('this will not be printed')
1916 logger.info('nor will this')
1917 logger.setLevel(LOG_LEVEL)
1918
Antoine Pitrou4eb2b282010-11-02 23:51:30 +00001919 @classmethod
1920 def _test_level(cls, conn):
Benjamin Petersondfd79492008-06-13 19:13:39 +00001921 logger = multiprocessing.get_logger()
1922 conn.send(logger.getEffectiveLevel())
1923
1924 def test_level(self):
1925 LEVEL1 = 32
1926 LEVEL2 = 37
1927
1928 logger = multiprocessing.get_logger()
1929 root_logger = logging.getLogger()
1930 root_level = root_logger.level
1931
1932 reader, writer = multiprocessing.Pipe(duplex=False)
1933
1934 logger.setLevel(LEVEL1)
Jesus Cea6f6016b2011-09-09 20:26:57 +02001935 p = self.Process(target=self._test_level, args=(writer,))
1936 p.daemon = True
1937 p.start()
Benjamin Petersondfd79492008-06-13 19:13:39 +00001938 self.assertEqual(LEVEL1, reader.recv())
1939
1940 logger.setLevel(logging.NOTSET)
1941 root_logger.setLevel(LEVEL2)
Jesus Cea6f6016b2011-09-09 20:26:57 +02001942 p = self.Process(target=self._test_level, args=(writer,))
1943 p.daemon = True
1944 p.start()
Benjamin Petersondfd79492008-06-13 19:13:39 +00001945 self.assertEqual(LEVEL2, reader.recv())
1946
1947 root_logger.setLevel(root_level)
1948 logger.setLevel(level=LOG_LEVEL)
1949
Jesse Noller814d02d2009-11-21 14:38:23 +00001950
Jesse Noller9a03f2f2009-11-24 14:17:29 +00001951# class _TestLoggingProcessName(BaseTestCase):
1952#
1953# def handle(self, record):
1954# assert record.processName == multiprocessing.current_process().name
1955# self.__handled = True
1956#
1957# def test_logging(self):
1958# handler = logging.Handler()
1959# handler.handle = self.handle
1960# self.__handled = False
1961# # Bypass getLogger() and side-effects
1962# logger = logging.getLoggerClass()(
1963# 'multiprocessing.test.TestLoggingProcessName')
1964# logger.addHandler(handler)
1965# logger.propagate = False
1966#
1967# logger.warn('foo')
1968# assert self.__handled
Jesse Noller814d02d2009-11-21 14:38:23 +00001969
Benjamin Petersondfd79492008-06-13 19:13:39 +00001970#
Jesse Noller9a5b2ad2009-01-19 15:12:22 +00001971# Test to verify handle verification, see issue 3321
1972#
1973
1974class TestInvalidHandle(unittest.TestCase):
1975
Florent Xicluna36b9fbb2010-03-24 19:33:25 +00001976 @unittest.skipIf(WIN32, "skipped on Windows")
Jesse Noller9a5b2ad2009-01-19 15:12:22 +00001977 def test_invalid_handles(self):
Jesse Noller9a5b2ad2009-01-19 15:12:22 +00001978 conn = _multiprocessing.Connection(44977608)
1979 self.assertRaises(IOError, conn.poll)
1980 self.assertRaises(IOError, _multiprocessing.Connection, -1)
Florent Xicluna36b9fbb2010-03-24 19:33:25 +00001981
Jesse Noller9a5b2ad2009-01-19 15:12:22 +00001982#
Benjamin Petersondfd79492008-06-13 19:13:39 +00001983# Functions used to create test cases from the base ones in this module
1984#
1985
1986def get_attributes(Source, names):
1987 d = {}
1988 for name in names:
1989 obj = getattr(Source, name)
1990 if type(obj) == type(get_attributes):
1991 obj = staticmethod(obj)
1992 d[name] = obj
1993 return d
1994
1995def create_test_cases(Mixin, type):
1996 result = {}
1997 glob = globals()
Florent Xicluna36b9fbb2010-03-24 19:33:25 +00001998 Type = type.capitalize()
Benjamin Petersondfd79492008-06-13 19:13:39 +00001999
2000 for name in glob.keys():
2001 if name.startswith('_Test'):
2002 base = glob[name]
2003 if type in base.ALLOWED_TYPES:
2004 newname = 'With' + Type + name[1:]
2005 class Temp(base, unittest.TestCase, Mixin):
2006 pass
2007 result[newname] = Temp
2008 Temp.__name__ = newname
2009 Temp.__module__ = Mixin.__module__
2010 return result
2011
2012#
2013# Create test cases
2014#
2015
2016class ProcessesMixin(object):
2017 TYPE = 'processes'
2018 Process = multiprocessing.Process
2019 locals().update(get_attributes(multiprocessing, (
2020 'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
2021 'Condition', 'Event', 'Value', 'Array', 'RawValue',
2022 'RawArray', 'current_process', 'active_children', 'Pipe',
2023 'connection', 'JoinableQueue'
2024 )))
2025
2026testcases_processes = create_test_cases(ProcessesMixin, type='processes')
2027globals().update(testcases_processes)
2028
2029
2030class ManagerMixin(object):
2031 TYPE = 'manager'
2032 Process = multiprocessing.Process
2033 manager = object.__new__(multiprocessing.managers.SyncManager)
2034 locals().update(get_attributes(manager, (
2035 'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
2036 'Condition', 'Event', 'Value', 'Array', 'list', 'dict',
2037 'Namespace', 'JoinableQueue'
2038 )))
2039
2040testcases_manager = create_test_cases(ManagerMixin, type='manager')
2041globals().update(testcases_manager)
2042
2043
2044class ThreadsMixin(object):
2045 TYPE = 'threads'
2046 Process = multiprocessing.dummy.Process
2047 locals().update(get_attributes(multiprocessing.dummy, (
2048 'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
2049 'Condition', 'Event', 'Value', 'Array', 'current_process',
2050 'active_children', 'Pipe', 'connection', 'dict', 'list',
2051 'Namespace', 'JoinableQueue'
2052 )))
2053
2054testcases_threads = create_test_cases(ThreadsMixin, type='threads')
2055globals().update(testcases_threads)
2056
Neal Norwitz0c519b32008-08-25 01:50:24 +00002057class OtherTest(unittest.TestCase):
2058 # TODO: add more tests for deliver/answer challenge.
2059 def test_deliver_challenge_auth_failure(self):
2060 class _FakeConnection(object):
2061 def recv_bytes(self, size):
Neal Norwitz2a7767a2008-08-25 03:03:25 +00002062 return b'something bogus'
Neal Norwitz0c519b32008-08-25 01:50:24 +00002063 def send_bytes(self, data):
2064 pass
2065 self.assertRaises(multiprocessing.AuthenticationError,
2066 multiprocessing.connection.deliver_challenge,
2067 _FakeConnection(), b'abc')
2068
2069 def test_answer_challenge_auth_failure(self):
2070 class _FakeConnection(object):
2071 def __init__(self):
2072 self.count = 0
2073 def recv_bytes(self, size):
2074 self.count += 1
2075 if self.count == 1:
2076 return multiprocessing.connection.CHALLENGE
2077 elif self.count == 2:
Neal Norwitz2a7767a2008-08-25 03:03:25 +00002078 return b'something bogus'
2079 return b''
Neal Norwitz0c519b32008-08-25 01:50:24 +00002080 def send_bytes(self, data):
2081 pass
2082 self.assertRaises(multiprocessing.AuthenticationError,
2083 multiprocessing.connection.answer_challenge,
2084 _FakeConnection(), b'abc')
2085
Jesse Noller7152f6d2009-04-02 05:17:26 +00002086#
2087# Test Manager.start()/Pool.__init__() initializer feature - see issue 5585
2088#
2089
2090def initializer(ns):
2091 ns.test += 1
2092
2093class TestInitializers(unittest.TestCase):
2094 def setUp(self):
2095 self.mgr = multiprocessing.Manager()
2096 self.ns = self.mgr.Namespace()
2097 self.ns.test = 0
2098
2099 def tearDown(self):
2100 self.mgr.shutdown()
2101
2102 def test_manager_initializer(self):
2103 m = multiprocessing.managers.SyncManager()
2104 self.assertRaises(TypeError, m.start, 1)
2105 m.start(initializer, (self.ns,))
2106 self.assertEqual(self.ns.test, 1)
2107 m.shutdown()
2108
2109 def test_pool_initializer(self):
2110 self.assertRaises(TypeError, multiprocessing.Pool, initializer=1)
2111 p = multiprocessing.Pool(1, initializer, (self.ns,))
2112 p.close()
2113 p.join()
2114 self.assertEqual(self.ns.test, 1)
2115
Jesse Noller1b90efb2009-06-30 17:11:52 +00002116#
2117# Issue 5155, 5313, 5331: Test process in processes
2118# Verifies os.close(sys.stdin.fileno) vs. sys.stdin.close() behavior
2119#
2120
2121def _ThisSubProcess(q):
2122 try:
2123 item = q.get(block=False)
2124 except Queue.Empty:
2125 pass
2126
2127def _TestProcess(q):
2128 queue = multiprocessing.Queue()
2129 subProc = multiprocessing.Process(target=_ThisSubProcess, args=(queue,))
Jesus Cea6f6016b2011-09-09 20:26:57 +02002130 subProc.daemon = True
Jesse Noller1b90efb2009-06-30 17:11:52 +00002131 subProc.start()
2132 subProc.join()
2133
2134def _afunc(x):
2135 return x*x
2136
2137def pool_in_process():
2138 pool = multiprocessing.Pool(processes=4)
2139 x = pool.map(_afunc, [1, 2, 3, 4, 5, 6, 7])
2140
2141class _file_like(object):
2142 def __init__(self, delegate):
2143 self._delegate = delegate
2144 self._pid = None
2145
2146 @property
2147 def cache(self):
2148 pid = os.getpid()
2149 # There are no race conditions since fork keeps only the running thread
2150 if pid != self._pid:
2151 self._pid = pid
2152 self._cache = []
2153 return self._cache
2154
2155 def write(self, data):
2156 self.cache.append(data)
2157
2158 def flush(self):
2159 self._delegate.write(''.join(self.cache))
2160 self._cache = []
2161
2162class TestStdinBadfiledescriptor(unittest.TestCase):
2163
2164 def test_queue_in_process(self):
2165 queue = multiprocessing.Queue()
2166 proc = multiprocessing.Process(target=_TestProcess, args=(queue,))
Jesus Cea6f6016b2011-09-09 20:26:57 +02002167 proc.daemon = True
Jesse Noller1b90efb2009-06-30 17:11:52 +00002168 proc.start()
2169 proc.join()
2170
2171 def test_pool_in_process(self):
2172 p = multiprocessing.Process(target=pool_in_process)
Jesus Cea6f6016b2011-09-09 20:26:57 +02002173 p.daemon = True
Jesse Noller1b90efb2009-06-30 17:11:52 +00002174 p.start()
2175 p.join()
2176
2177 def test_flushing(self):
2178 sio = StringIO()
2179 flike = _file_like(sio)
2180 flike.write('foo')
2181 proc = multiprocessing.Process(target=lambda: flike.flush())
2182 flike.flush()
2183 assert sio.getvalue() == 'foo'
2184
2185testcases_other = [OtherTest, TestInvalidHandle, TestInitializers,
2186 TestStdinBadfiledescriptor]
Neal Norwitz0c519b32008-08-25 01:50:24 +00002187
Benjamin Petersondfd79492008-06-13 19:13:39 +00002188#
2189#
2190#
2191
2192def test_main(run=None):
Jesse Noller18623822008-06-18 13:29:52 +00002193 if sys.platform.startswith("linux"):
2194 try:
2195 lock = multiprocessing.RLock()
2196 except OSError:
Benjamin Petersonbec087f2009-03-26 21:10:30 +00002197 raise unittest.SkipTest("OSError raises on RLock creation, see issue 3111!")
Benjamin Petersoned77f2e2008-06-17 22:40:44 +00002198
Benjamin Petersondfd79492008-06-13 19:13:39 +00002199 if run is None:
2200 from test.test_support import run_unittest as run
2201
2202 util.get_temp_dir() # creates temp directory for use by all processes
2203
2204 multiprocessing.get_logger().setLevel(LOG_LEVEL)
2205
Jesse Noller146b7ab2008-07-02 16:44:09 +00002206 ProcessesMixin.pool = multiprocessing.Pool(4)
2207 ThreadsMixin.pool = multiprocessing.dummy.Pool(4)
2208 ManagerMixin.manager.__init__()
2209 ManagerMixin.manager.start()
2210 ManagerMixin.pool = ManagerMixin.manager.Pool(4)
Benjamin Petersondfd79492008-06-13 19:13:39 +00002211
2212 testcases = (
Jesse Noller146b7ab2008-07-02 16:44:09 +00002213 sorted(testcases_processes.values(), key=lambda tc:tc.__name__) +
2214 sorted(testcases_threads.values(), key=lambda tc:tc.__name__) +
Neal Norwitz0c519b32008-08-25 01:50:24 +00002215 sorted(testcases_manager.values(), key=lambda tc:tc.__name__) +
2216 testcases_other
Benjamin Petersondfd79492008-06-13 19:13:39 +00002217 )
2218
2219 loadTestsFromTestCase = unittest.defaultTestLoader.loadTestsFromTestCase
2220 suite = unittest.TestSuite(loadTestsFromTestCase(tc) for tc in testcases)
Nick Coghlan13623662010-04-10 14:24:36 +00002221 # (ncoghlan): Whether or not sys.exc_clear is executed by the threading
2222 # module during these tests is at least platform dependent and possibly
Nick Coghlan14459d52010-04-10 15:01:54 +00002223 # non-deterministic on any given platform. So we don't mind if the listed
Nick Coghlan13623662010-04-10 14:24:36 +00002224 # warnings aren't actually raised.
Florent Xicluna07627882010-03-21 01:14:24 +00002225 with test_support.check_py3k_warnings(
Nick Coghlan13623662010-04-10 14:24:36 +00002226 (".+__(get|set)slice__ has been removed", DeprecationWarning),
2227 (r"sys.exc_clear\(\) not supported", DeprecationWarning),
2228 quiet=True):
Florent Xicluna07627882010-03-21 01:14:24 +00002229 run(suite)
Benjamin Petersondfd79492008-06-13 19:13:39 +00002230
Jesse Noller146b7ab2008-07-02 16:44:09 +00002231 ThreadsMixin.pool.terminate()
2232 ProcessesMixin.pool.terminate()
2233 ManagerMixin.pool.terminate()
2234 ManagerMixin.manager.shutdown()
Benjamin Petersondfd79492008-06-13 19:13:39 +00002235
Jesse Noller146b7ab2008-07-02 16:44:09 +00002236 del ProcessesMixin.pool, ThreadsMixin.pool, ManagerMixin.pool
Benjamin Petersondfd79492008-06-13 19:13:39 +00002237
2238def main():
2239 test_main(unittest.TextTestRunner(verbosity=2).run)
2240
2241if __name__ == '__main__':
2242 main()