blob: bc2c048b681d1c603ab453f548f955a12ca704b8 [file] [log] [blame]
Jesse Noller76cf55f2008-07-02 16:56:51 +00001#!/usr/bin/env python
2
Benjamin Petersondfd79492008-06-13 19:13:39 +00003#
4# Unit tests for the multiprocessing package
5#
6
7import unittest
Benjamin Petersondfd79492008-06-13 19:13:39 +00008import Queue
9import time
10import sys
11import os
12import gc
13import signal
14import array
Benjamin Petersondfd79492008-06-13 19:13:39 +000015import socket
16import random
17import logging
Antoine Pitroua1a8da82011-08-23 19:54:20 +020018import errno
Mark Dickinsonc4920e82009-11-20 19:30:22 +000019from test import test_support
Jesse Noller1b90efb2009-06-30 17:11:52 +000020from StringIO import StringIO
R. David Murray3db8a342009-03-30 23:05:48 +000021_multiprocessing = test_support.import_module('_multiprocessing')
Ezio Melottic2077b02011-03-16 12:34:31 +020022# import threading after _multiprocessing to raise a more relevant error
Victor Stinner613b4cf2010-04-27 21:56:26 +000023# message: "No module named _multiprocessing". _multiprocessing is not compiled
24# without thread support.
25import threading
R. David Murray3db8a342009-03-30 23:05:48 +000026
Jesse Noller37040cd2008-09-30 00:15:45 +000027# Work around broken sem_open implementations
R. David Murray3db8a342009-03-30 23:05:48 +000028test_support.import_module('multiprocessing.synchronize')
Jesse Noller37040cd2008-09-30 00:15:45 +000029
Benjamin Petersondfd79492008-06-13 19:13:39 +000030import multiprocessing.dummy
31import multiprocessing.connection
32import multiprocessing.managers
33import multiprocessing.heap
Benjamin Petersondfd79492008-06-13 19:13:39 +000034import multiprocessing.pool
Benjamin Petersondfd79492008-06-13 19:13:39 +000035
Charles-François Natalif8413b22011-09-21 18:44:49 +020036from multiprocessing import util
37
38try:
39 from multiprocessing import reduction
40 HAS_REDUCTION = True
41except ImportError:
42 HAS_REDUCTION = False
Benjamin Petersondfd79492008-06-13 19:13:39 +000043
Brian Curtina06e9b82010-10-07 02:27:41 +000044try:
45 from multiprocessing.sharedctypes import Value, copy
46 HAS_SHAREDCTYPES = True
47except ImportError:
48 HAS_SHAREDCTYPES = False
49
Antoine Pitroua1a8da82011-08-23 19:54:20 +020050try:
51 import msvcrt
52except ImportError:
53 msvcrt = None
54
Benjamin Petersondfd79492008-06-13 19:13:39 +000055#
56#
57#
58
Benjamin Petersone79edf52008-07-13 18:34:58 +000059latin = str
Benjamin Petersondfd79492008-06-13 19:13:39 +000060
Benjamin Petersondfd79492008-06-13 19:13:39 +000061#
62# Constants
63#
64
65LOG_LEVEL = util.SUBWARNING
Jesse Noller654ade32010-01-27 03:05:57 +000066#LOG_LEVEL = logging.DEBUG
Benjamin Petersondfd79492008-06-13 19:13:39 +000067
68DELTA = 0.1
69CHECK_TIMINGS = False # making true makes tests take a lot longer
70 # and can sometimes cause some non-serious
71 # failures because some calls block a bit
72 # longer than expected
73if CHECK_TIMINGS:
74 TIMEOUT1, TIMEOUT2, TIMEOUT3 = 0.82, 0.35, 1.4
75else:
76 TIMEOUT1, TIMEOUT2, TIMEOUT3 = 0.1, 0.1, 0.1
77
78HAVE_GETVALUE = not getattr(_multiprocessing,
79 'HAVE_BROKEN_SEM_GETVALUE', False)
80
Jesse Noller9a5b2ad2009-01-19 15:12:22 +000081WIN32 = (sys.platform == "win32")
82
Antoine Pitroua1a8da82011-08-23 19:54:20 +020083try:
84 MAXFD = os.sysconf("SC_OPEN_MAX")
85except:
86 MAXFD = 256
87
Benjamin Petersondfd79492008-06-13 19:13:39 +000088#
Florent Xicluna36b9fbb2010-03-24 19:33:25 +000089# Some tests require ctypes
90#
91
92try:
Nick Coghlan13623662010-04-10 14:24:36 +000093 from ctypes import Structure, c_int, c_double
Florent Xicluna36b9fbb2010-03-24 19:33:25 +000094except ImportError:
95 Structure = object
96 c_int = c_double = None
97
Charles-François Natali6392d7f2011-11-22 18:35:18 +010098
99def check_enough_semaphores():
100 """Check that the system supports enough semaphores to run the test."""
101 # minimum number of semaphores available according to POSIX
102 nsems_min = 256
103 try:
104 nsems = os.sysconf("SC_SEM_NSEMS_MAX")
105 except (AttributeError, ValueError):
106 # sysconf not available or setting not available
107 return
108 if nsems == -1 or nsems >= nsems_min:
109 return
110 raise unittest.SkipTest("The OS doesn't support enough semaphores "
111 "to run the test (required: %d)." % nsems_min)
112
113
Florent Xicluna36b9fbb2010-03-24 19:33:25 +0000114#
Benjamin Petersondfd79492008-06-13 19:13:39 +0000115# Creates a wrapper for a function which records the time it takes to finish
116#
117
118class TimingWrapper(object):
119
120 def __init__(self, func):
121 self.func = func
122 self.elapsed = None
123
124 def __call__(self, *args, **kwds):
125 t = time.time()
126 try:
127 return self.func(*args, **kwds)
128 finally:
129 self.elapsed = time.time() - t
130
131#
132# Base class for test cases
133#
134
135class BaseTestCase(object):
136
137 ALLOWED_TYPES = ('processes', 'manager', 'threads')
138
139 def assertTimingAlmostEqual(self, a, b):
140 if CHECK_TIMINGS:
141 self.assertAlmostEqual(a, b, 1)
142
143 def assertReturnsIfImplemented(self, value, func, *args):
144 try:
145 res = func(*args)
146 except NotImplementedError:
147 pass
148 else:
149 return self.assertEqual(value, res)
150
Antoine Pitrou4eb2b282010-11-02 23:51:30 +0000151 # For the sanity of Windows users, rather than crashing or freezing in
152 # multiple ways.
153 def __reduce__(self, *args):
154 raise NotImplementedError("shouldn't try to pickle a test case")
155
156 __reduce_ex__ = __reduce__
157
Benjamin Petersondfd79492008-06-13 19:13:39 +0000158#
159# Return the value of a semaphore
160#
161
162def get_value(self):
163 try:
164 return self.get_value()
165 except AttributeError:
166 try:
167 return self._Semaphore__value
168 except AttributeError:
169 try:
170 return self._value
171 except AttributeError:
172 raise NotImplementedError
173
174#
175# Testcases
176#
177
178class _TestProcess(BaseTestCase):
179
180 ALLOWED_TYPES = ('processes', 'threads')
181
182 def test_current(self):
183 if self.TYPE == 'threads':
184 return
185
186 current = self.current_process()
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000187 authkey = current.authkey
Benjamin Petersondfd79492008-06-13 19:13:39 +0000188
189 self.assertTrue(current.is_alive())
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000190 self.assertTrue(not current.daemon)
Ezio Melottib0f5adc2010-01-24 16:58:36 +0000191 self.assertIsInstance(authkey, bytes)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000192 self.assertTrue(len(authkey) > 0)
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000193 self.assertEqual(current.ident, os.getpid())
194 self.assertEqual(current.exitcode, None)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000195
Antoine Pitrou4eb2b282010-11-02 23:51:30 +0000196 @classmethod
197 def _test(cls, q, *args, **kwds):
198 current = cls.current_process()
Benjamin Petersondfd79492008-06-13 19:13:39 +0000199 q.put(args)
200 q.put(kwds)
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000201 q.put(current.name)
Antoine Pitrou4eb2b282010-11-02 23:51:30 +0000202 if cls.TYPE != 'threads':
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000203 q.put(bytes(current.authkey))
Benjamin Petersondfd79492008-06-13 19:13:39 +0000204 q.put(current.pid)
205
206 def test_process(self):
207 q = self.Queue(1)
208 e = self.Event()
209 args = (q, 1, 2)
210 kwargs = {'hello':23, 'bye':2.54}
211 name = 'SomeProcess'
212 p = self.Process(
213 target=self._test, args=args, kwargs=kwargs, name=name
214 )
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000215 p.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +0000216 current = self.current_process()
217
218 if self.TYPE != 'threads':
Ezio Melotti2623a372010-11-21 13:34:58 +0000219 self.assertEqual(p.authkey, current.authkey)
220 self.assertEqual(p.is_alive(), False)
221 self.assertEqual(p.daemon, True)
Ezio Melottiaa980582010-01-23 23:04:36 +0000222 self.assertNotIn(p, self.active_children())
Benjamin Petersondfd79492008-06-13 19:13:39 +0000223 self.assertTrue(type(self.active_children()) is list)
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000224 self.assertEqual(p.exitcode, None)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000225
226 p.start()
227
Ezio Melotti2623a372010-11-21 13:34:58 +0000228 self.assertEqual(p.exitcode, None)
229 self.assertEqual(p.is_alive(), True)
Ezio Melottiaa980582010-01-23 23:04:36 +0000230 self.assertIn(p, self.active_children())
Benjamin Petersondfd79492008-06-13 19:13:39 +0000231
Ezio Melotti2623a372010-11-21 13:34:58 +0000232 self.assertEqual(q.get(), args[1:])
233 self.assertEqual(q.get(), kwargs)
234 self.assertEqual(q.get(), p.name)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000235 if self.TYPE != 'threads':
Ezio Melotti2623a372010-11-21 13:34:58 +0000236 self.assertEqual(q.get(), current.authkey)
237 self.assertEqual(q.get(), p.pid)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000238
239 p.join()
240
Ezio Melotti2623a372010-11-21 13:34:58 +0000241 self.assertEqual(p.exitcode, 0)
242 self.assertEqual(p.is_alive(), False)
Ezio Melottiaa980582010-01-23 23:04:36 +0000243 self.assertNotIn(p, self.active_children())
Benjamin Petersondfd79492008-06-13 19:13:39 +0000244
Antoine Pitrou4eb2b282010-11-02 23:51:30 +0000245 @classmethod
246 def _test_terminate(cls):
Benjamin Petersondfd79492008-06-13 19:13:39 +0000247 time.sleep(1000)
248
249 def test_terminate(self):
250 if self.TYPE == 'threads':
251 return
252
253 p = self.Process(target=self._test_terminate)
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000254 p.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +0000255 p.start()
256
257 self.assertEqual(p.is_alive(), True)
Ezio Melottiaa980582010-01-23 23:04:36 +0000258 self.assertIn(p, self.active_children())
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000259 self.assertEqual(p.exitcode, None)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000260
261 p.terminate()
262
263 join = TimingWrapper(p.join)
264 self.assertEqual(join(), None)
265 self.assertTimingAlmostEqual(join.elapsed, 0.0)
266
267 self.assertEqual(p.is_alive(), False)
Ezio Melottiaa980582010-01-23 23:04:36 +0000268 self.assertNotIn(p, self.active_children())
Benjamin Petersondfd79492008-06-13 19:13:39 +0000269
270 p.join()
271
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000272 # XXX sometimes get p.exitcode == 0 on Windows ...
273 #self.assertEqual(p.exitcode, -signal.SIGTERM)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000274
275 def test_cpu_count(self):
276 try:
277 cpus = multiprocessing.cpu_count()
278 except NotImplementedError:
279 cpus = 1
280 self.assertTrue(type(cpus) is int)
281 self.assertTrue(cpus >= 1)
282
283 def test_active_children(self):
284 self.assertEqual(type(self.active_children()), list)
285
286 p = self.Process(target=time.sleep, args=(DELTA,))
Ezio Melottiaa980582010-01-23 23:04:36 +0000287 self.assertNotIn(p, self.active_children())
Benjamin Petersondfd79492008-06-13 19:13:39 +0000288
Jesus Cea6f6016b2011-09-09 20:26:57 +0200289 p.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +0000290 p.start()
Ezio Melottiaa980582010-01-23 23:04:36 +0000291 self.assertIn(p, self.active_children())
Benjamin Petersondfd79492008-06-13 19:13:39 +0000292
293 p.join()
Ezio Melottiaa980582010-01-23 23:04:36 +0000294 self.assertNotIn(p, self.active_children())
Benjamin Petersondfd79492008-06-13 19:13:39 +0000295
Antoine Pitrou4eb2b282010-11-02 23:51:30 +0000296 @classmethod
297 def _test_recursion(cls, wconn, id):
Benjamin Petersondfd79492008-06-13 19:13:39 +0000298 from multiprocessing import forking
299 wconn.send(id)
300 if len(id) < 2:
301 for i in range(2):
Antoine Pitrou4eb2b282010-11-02 23:51:30 +0000302 p = cls.Process(
303 target=cls._test_recursion, args=(wconn, id+[i])
Benjamin Petersondfd79492008-06-13 19:13:39 +0000304 )
305 p.start()
306 p.join()
307
308 def test_recursion(self):
309 rconn, wconn = self.Pipe(duplex=False)
310 self._test_recursion(wconn, [])
311
312 time.sleep(DELTA)
313 result = []
314 while rconn.poll():
315 result.append(rconn.recv())
316
317 expected = [
318 [],
319 [0],
320 [0, 0],
321 [0, 1],
322 [1],
323 [1, 0],
324 [1, 1]
325 ]
326 self.assertEqual(result, expected)
327
Richard Oudkerk2182e052012-06-06 19:01:14 +0100328 @classmethod
329 def _test_sys_exit(cls, reason, testfn):
330 sys.stderr = open(testfn, 'w')
331 sys.exit(reason)
332
333 def test_sys_exit(self):
334 # See Issue 13854
335 if self.TYPE == 'threads':
336 return
337
338 testfn = test_support.TESTFN
339 self.addCleanup(test_support.unlink, testfn)
340
341 for reason, code in (([1, 2, 3], 1), ('ignore this', 0)):
342 p = self.Process(target=self._test_sys_exit, args=(reason, testfn))
343 p.daemon = True
344 p.start()
345 p.join(5)
346 self.assertEqual(p.exitcode, code)
347
348 with open(testfn, 'r') as f:
349 self.assertEqual(f.read().rstrip(), str(reason))
350
351 for reason in (True, False, 8):
352 p = self.Process(target=sys.exit, args=(reason,))
353 p.daemon = True
354 p.start()
355 p.join(5)
356 self.assertEqual(p.exitcode, reason)
357
Benjamin Petersondfd79492008-06-13 19:13:39 +0000358#
359#
360#
361
362class _UpperCaser(multiprocessing.Process):
363
364 def __init__(self):
365 multiprocessing.Process.__init__(self)
366 self.child_conn, self.parent_conn = multiprocessing.Pipe()
367
368 def run(self):
369 self.parent_conn.close()
370 for s in iter(self.child_conn.recv, None):
371 self.child_conn.send(s.upper())
372 self.child_conn.close()
373
374 def submit(self, s):
375 assert type(s) is str
376 self.parent_conn.send(s)
377 return self.parent_conn.recv()
378
379 def stop(self):
380 self.parent_conn.send(None)
381 self.parent_conn.close()
382 self.child_conn.close()
383
384class _TestSubclassingProcess(BaseTestCase):
385
386 ALLOWED_TYPES = ('processes',)
387
388 def test_subclassing(self):
389 uppercaser = _UpperCaser()
Jesus Cea6f6016b2011-09-09 20:26:57 +0200390 uppercaser.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +0000391 uppercaser.start()
392 self.assertEqual(uppercaser.submit('hello'), 'HELLO')
393 self.assertEqual(uppercaser.submit('world'), 'WORLD')
394 uppercaser.stop()
395 uppercaser.join()
396
397#
398#
399#
400
401def queue_empty(q):
402 if hasattr(q, 'empty'):
403 return q.empty()
404 else:
405 return q.qsize() == 0
406
407def queue_full(q, maxsize):
408 if hasattr(q, 'full'):
409 return q.full()
410 else:
411 return q.qsize() == maxsize
412
413
414class _TestQueue(BaseTestCase):
415
416
Antoine Pitrou4eb2b282010-11-02 23:51:30 +0000417 @classmethod
418 def _test_put(cls, queue, child_can_start, parent_can_continue):
Benjamin Petersondfd79492008-06-13 19:13:39 +0000419 child_can_start.wait()
420 for i in range(6):
421 queue.get()
422 parent_can_continue.set()
423
424 def test_put(self):
425 MAXSIZE = 6
426 queue = self.Queue(maxsize=MAXSIZE)
427 child_can_start = self.Event()
428 parent_can_continue = self.Event()
429
430 proc = self.Process(
431 target=self._test_put,
432 args=(queue, child_can_start, parent_can_continue)
433 )
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000434 proc.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +0000435 proc.start()
436
437 self.assertEqual(queue_empty(queue), True)
438 self.assertEqual(queue_full(queue, MAXSIZE), False)
439
440 queue.put(1)
441 queue.put(2, True)
442 queue.put(3, True, None)
443 queue.put(4, False)
444 queue.put(5, False, None)
445 queue.put_nowait(6)
446
447 # the values may be in buffer but not yet in pipe so sleep a bit
448 time.sleep(DELTA)
449
450 self.assertEqual(queue_empty(queue), False)
451 self.assertEqual(queue_full(queue, MAXSIZE), True)
452
453 put = TimingWrapper(queue.put)
454 put_nowait = TimingWrapper(queue.put_nowait)
455
456 self.assertRaises(Queue.Full, put, 7, False)
457 self.assertTimingAlmostEqual(put.elapsed, 0)
458
459 self.assertRaises(Queue.Full, put, 7, False, None)
460 self.assertTimingAlmostEqual(put.elapsed, 0)
461
462 self.assertRaises(Queue.Full, put_nowait, 7)
463 self.assertTimingAlmostEqual(put_nowait.elapsed, 0)
464
465 self.assertRaises(Queue.Full, put, 7, True, TIMEOUT1)
466 self.assertTimingAlmostEqual(put.elapsed, TIMEOUT1)
467
468 self.assertRaises(Queue.Full, put, 7, False, TIMEOUT2)
469 self.assertTimingAlmostEqual(put.elapsed, 0)
470
471 self.assertRaises(Queue.Full, put, 7, True, timeout=TIMEOUT3)
472 self.assertTimingAlmostEqual(put.elapsed, TIMEOUT3)
473
474 child_can_start.set()
475 parent_can_continue.wait()
476
477 self.assertEqual(queue_empty(queue), True)
478 self.assertEqual(queue_full(queue, MAXSIZE), False)
479
480 proc.join()
481
Antoine Pitrou4eb2b282010-11-02 23:51:30 +0000482 @classmethod
483 def _test_get(cls, queue, child_can_start, parent_can_continue):
Benjamin Petersondfd79492008-06-13 19:13:39 +0000484 child_can_start.wait()
Benjamin Petersonda3a1b12008-06-16 20:52:48 +0000485 #queue.put(1)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000486 queue.put(2)
487 queue.put(3)
488 queue.put(4)
489 queue.put(5)
490 parent_can_continue.set()
491
492 def test_get(self):
493 queue = self.Queue()
494 child_can_start = self.Event()
495 parent_can_continue = self.Event()
496
497 proc = self.Process(
498 target=self._test_get,
499 args=(queue, child_can_start, parent_can_continue)
500 )
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000501 proc.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +0000502 proc.start()
503
504 self.assertEqual(queue_empty(queue), True)
505
506 child_can_start.set()
507 parent_can_continue.wait()
508
509 time.sleep(DELTA)
510 self.assertEqual(queue_empty(queue), False)
511
Benjamin Petersonda3a1b12008-06-16 20:52:48 +0000512 # Hangs unexpectedly, remove for now
513 #self.assertEqual(queue.get(), 1)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000514 self.assertEqual(queue.get(True, None), 2)
515 self.assertEqual(queue.get(True), 3)
516 self.assertEqual(queue.get(timeout=1), 4)
517 self.assertEqual(queue.get_nowait(), 5)
518
519 self.assertEqual(queue_empty(queue), True)
520
521 get = TimingWrapper(queue.get)
522 get_nowait = TimingWrapper(queue.get_nowait)
523
524 self.assertRaises(Queue.Empty, get, False)
525 self.assertTimingAlmostEqual(get.elapsed, 0)
526
527 self.assertRaises(Queue.Empty, get, False, None)
528 self.assertTimingAlmostEqual(get.elapsed, 0)
529
530 self.assertRaises(Queue.Empty, get_nowait)
531 self.assertTimingAlmostEqual(get_nowait.elapsed, 0)
532
533 self.assertRaises(Queue.Empty, get, True, TIMEOUT1)
534 self.assertTimingAlmostEqual(get.elapsed, TIMEOUT1)
535
536 self.assertRaises(Queue.Empty, get, False, TIMEOUT2)
537 self.assertTimingAlmostEqual(get.elapsed, 0)
538
539 self.assertRaises(Queue.Empty, get, timeout=TIMEOUT3)
540 self.assertTimingAlmostEqual(get.elapsed, TIMEOUT3)
541
542 proc.join()
543
Antoine Pitrou4eb2b282010-11-02 23:51:30 +0000544 @classmethod
545 def _test_fork(cls, queue):
Benjamin Petersondfd79492008-06-13 19:13:39 +0000546 for i in range(10, 20):
547 queue.put(i)
548 # note that at this point the items may only be buffered, so the
549 # process cannot shutdown until the feeder thread has finished
550 # pushing items onto the pipe.
551
552 def test_fork(self):
553 # Old versions of Queue would fail to create a new feeder
554 # thread for a forked process if the original process had its
555 # own feeder thread. This test checks that this no longer
556 # happens.
557
558 queue = self.Queue()
559
560 # put items on queue so that main process starts a feeder thread
561 for i in range(10):
562 queue.put(i)
563
564 # wait to make sure thread starts before we fork a new process
565 time.sleep(DELTA)
566
567 # fork process
568 p = self.Process(target=self._test_fork, args=(queue,))
Jesus Cea6f6016b2011-09-09 20:26:57 +0200569 p.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +0000570 p.start()
571
572 # check that all expected items are in the queue
573 for i in range(20):
574 self.assertEqual(queue.get(), i)
575 self.assertRaises(Queue.Empty, queue.get, False)
576
577 p.join()
578
579 def test_qsize(self):
580 q = self.Queue()
581 try:
582 self.assertEqual(q.qsize(), 0)
583 except NotImplementedError:
584 return
585 q.put(1)
586 self.assertEqual(q.qsize(), 1)
587 q.put(5)
588 self.assertEqual(q.qsize(), 2)
589 q.get()
590 self.assertEqual(q.qsize(), 1)
591 q.get()
592 self.assertEqual(q.qsize(), 0)
593
Antoine Pitrou4eb2b282010-11-02 23:51:30 +0000594 @classmethod
595 def _test_task_done(cls, q):
Benjamin Petersondfd79492008-06-13 19:13:39 +0000596 for obj in iter(q.get, None):
597 time.sleep(DELTA)
598 q.task_done()
599
600 def test_task_done(self):
601 queue = self.JoinableQueue()
602
603 if sys.version_info < (2, 5) and not hasattr(queue, 'task_done'):
Florent Xicluna36b9fbb2010-03-24 19:33:25 +0000604 self.skipTest("requires 'queue.task_done()' method")
Benjamin Petersondfd79492008-06-13 19:13:39 +0000605
606 workers = [self.Process(target=self._test_task_done, args=(queue,))
607 for i in xrange(4)]
608
609 for p in workers:
Jesus Cea6f6016b2011-09-09 20:26:57 +0200610 p.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +0000611 p.start()
612
613 for i in xrange(10):
614 queue.put(i)
615
616 queue.join()
617
618 for p in workers:
619 queue.put(None)
620
621 for p in workers:
622 p.join()
623
624#
625#
626#
627
628class _TestLock(BaseTestCase):
629
630 def test_lock(self):
631 lock = self.Lock()
632 self.assertEqual(lock.acquire(), True)
633 self.assertEqual(lock.acquire(False), False)
634 self.assertEqual(lock.release(), None)
635 self.assertRaises((ValueError, threading.ThreadError), lock.release)
636
637 def test_rlock(self):
638 lock = self.RLock()
639 self.assertEqual(lock.acquire(), True)
640 self.assertEqual(lock.acquire(), True)
641 self.assertEqual(lock.acquire(), True)
642 self.assertEqual(lock.release(), None)
643 self.assertEqual(lock.release(), None)
644 self.assertEqual(lock.release(), None)
645 self.assertRaises((AssertionError, RuntimeError), lock.release)
646
Jesse Noller82eb5902009-03-30 23:29:31 +0000647 def test_lock_context(self):
648 with self.Lock():
649 pass
650
Benjamin Petersondfd79492008-06-13 19:13:39 +0000651
652class _TestSemaphore(BaseTestCase):
653
654 def _test_semaphore(self, sem):
655 self.assertReturnsIfImplemented(2, get_value, sem)
656 self.assertEqual(sem.acquire(), True)
657 self.assertReturnsIfImplemented(1, get_value, sem)
658 self.assertEqual(sem.acquire(), True)
659 self.assertReturnsIfImplemented(0, get_value, sem)
660 self.assertEqual(sem.acquire(False), False)
661 self.assertReturnsIfImplemented(0, get_value, sem)
662 self.assertEqual(sem.release(), None)
663 self.assertReturnsIfImplemented(1, get_value, sem)
664 self.assertEqual(sem.release(), None)
665 self.assertReturnsIfImplemented(2, get_value, sem)
666
667 def test_semaphore(self):
668 sem = self.Semaphore(2)
669 self._test_semaphore(sem)
670 self.assertEqual(sem.release(), None)
671 self.assertReturnsIfImplemented(3, get_value, sem)
672 self.assertEqual(sem.release(), None)
673 self.assertReturnsIfImplemented(4, get_value, sem)
674
675 def test_bounded_semaphore(self):
676 sem = self.BoundedSemaphore(2)
677 self._test_semaphore(sem)
678 # Currently fails on OS/X
679 #if HAVE_GETVALUE:
680 # self.assertRaises(ValueError, sem.release)
681 # self.assertReturnsIfImplemented(2, get_value, sem)
682
683 def test_timeout(self):
684 if self.TYPE != 'processes':
685 return
686
687 sem = self.Semaphore(0)
688 acquire = TimingWrapper(sem.acquire)
689
690 self.assertEqual(acquire(False), False)
691 self.assertTimingAlmostEqual(acquire.elapsed, 0.0)
692
693 self.assertEqual(acquire(False, None), False)
694 self.assertTimingAlmostEqual(acquire.elapsed, 0.0)
695
696 self.assertEqual(acquire(False, TIMEOUT1), False)
697 self.assertTimingAlmostEqual(acquire.elapsed, 0)
698
699 self.assertEqual(acquire(True, TIMEOUT2), False)
700 self.assertTimingAlmostEqual(acquire.elapsed, TIMEOUT2)
701
702 self.assertEqual(acquire(timeout=TIMEOUT3), False)
703 self.assertTimingAlmostEqual(acquire.elapsed, TIMEOUT3)
704
705
706class _TestCondition(BaseTestCase):
707
Antoine Pitrou4eb2b282010-11-02 23:51:30 +0000708 @classmethod
709 def f(cls, cond, sleeping, woken, timeout=None):
Benjamin Petersondfd79492008-06-13 19:13:39 +0000710 cond.acquire()
711 sleeping.release()
712 cond.wait(timeout)
713 woken.release()
714 cond.release()
715
716 def check_invariant(self, cond):
717 # this is only supposed to succeed when there are no sleepers
718 if self.TYPE == 'processes':
719 try:
720 sleepers = (cond._sleeping_count.get_value() -
721 cond._woken_count.get_value())
722 self.assertEqual(sleepers, 0)
723 self.assertEqual(cond._wait_semaphore.get_value(), 0)
724 except NotImplementedError:
725 pass
726
727 def test_notify(self):
728 cond = self.Condition()
729 sleeping = self.Semaphore(0)
730 woken = self.Semaphore(0)
731
732 p = self.Process(target=self.f, args=(cond, sleeping, woken))
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000733 p.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +0000734 p.start()
735
736 p = threading.Thread(target=self.f, args=(cond, sleeping, woken))
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000737 p.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +0000738 p.start()
739
740 # wait for both children to start sleeping
741 sleeping.acquire()
742 sleeping.acquire()
743
744 # check no process/thread has woken up
745 time.sleep(DELTA)
746 self.assertReturnsIfImplemented(0, get_value, woken)
747
748 # wake up one process/thread
749 cond.acquire()
750 cond.notify()
751 cond.release()
752
753 # check one process/thread has woken up
754 time.sleep(DELTA)
755 self.assertReturnsIfImplemented(1, get_value, woken)
756
757 # wake up another
758 cond.acquire()
759 cond.notify()
760 cond.release()
761
762 # check other has woken up
763 time.sleep(DELTA)
764 self.assertReturnsIfImplemented(2, get_value, woken)
765
766 # check state is not mucked up
767 self.check_invariant(cond)
768 p.join()
769
770 def test_notify_all(self):
771 cond = self.Condition()
772 sleeping = self.Semaphore(0)
773 woken = self.Semaphore(0)
774
775 # start some threads/processes which will timeout
776 for i in range(3):
777 p = self.Process(target=self.f,
778 args=(cond, sleeping, woken, TIMEOUT1))
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000779 p.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +0000780 p.start()
781
782 t = threading.Thread(target=self.f,
783 args=(cond, sleeping, woken, TIMEOUT1))
Benjamin Petersona9b22222008-08-18 18:01:43 +0000784 t.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +0000785 t.start()
786
787 # wait for them all to sleep
788 for i in xrange(6):
789 sleeping.acquire()
790
791 # check they have all timed out
792 for i in xrange(6):
793 woken.acquire()
794 self.assertReturnsIfImplemented(0, get_value, woken)
795
796 # check state is not mucked up
797 self.check_invariant(cond)
798
799 # start some more threads/processes
800 for i in range(3):
801 p = self.Process(target=self.f, args=(cond, sleeping, woken))
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000802 p.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +0000803 p.start()
804
805 t = threading.Thread(target=self.f, args=(cond, sleeping, woken))
Benjamin Petersona9b22222008-08-18 18:01:43 +0000806 t.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +0000807 t.start()
808
809 # wait for them to all sleep
810 for i in xrange(6):
811 sleeping.acquire()
812
813 # check no process/thread has woken up
814 time.sleep(DELTA)
815 self.assertReturnsIfImplemented(0, get_value, woken)
816
817 # wake them all up
818 cond.acquire()
819 cond.notify_all()
820 cond.release()
821
822 # check they have all woken
823 time.sleep(DELTA)
824 self.assertReturnsIfImplemented(6, get_value, woken)
825
826 # check state is not mucked up
827 self.check_invariant(cond)
828
829 def test_timeout(self):
830 cond = self.Condition()
831 wait = TimingWrapper(cond.wait)
832 cond.acquire()
833 res = wait(TIMEOUT1)
834 cond.release()
835 self.assertEqual(res, None)
836 self.assertTimingAlmostEqual(wait.elapsed, TIMEOUT1)
837
838
839class _TestEvent(BaseTestCase):
840
Antoine Pitrou4eb2b282010-11-02 23:51:30 +0000841 @classmethod
842 def _test_event(cls, event):
Benjamin Petersondfd79492008-06-13 19:13:39 +0000843 time.sleep(TIMEOUT2)
844 event.set()
845
846 def test_event(self):
847 event = self.Event()
848 wait = TimingWrapper(event.wait)
849
Ezio Melottic2077b02011-03-16 12:34:31 +0200850 # Removed temporarily, due to API shear, this does not
Benjamin Petersondfd79492008-06-13 19:13:39 +0000851 # work with threading._Event objects. is_set == isSet
Jesse Noller02cb0eb2009-04-01 03:45:50 +0000852 self.assertEqual(event.is_set(), False)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000853
Jesse Noller02cb0eb2009-04-01 03:45:50 +0000854 # Removed, threading.Event.wait() will return the value of the __flag
855 # instead of None. API Shear with the semaphore backed mp.Event
856 self.assertEqual(wait(0.0), False)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000857 self.assertTimingAlmostEqual(wait.elapsed, 0.0)
Jesse Noller02cb0eb2009-04-01 03:45:50 +0000858 self.assertEqual(wait(TIMEOUT1), False)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000859 self.assertTimingAlmostEqual(wait.elapsed, TIMEOUT1)
860
861 event.set()
862
863 # See note above on the API differences
Jesse Noller02cb0eb2009-04-01 03:45:50 +0000864 self.assertEqual(event.is_set(), True)
865 self.assertEqual(wait(), True)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000866 self.assertTimingAlmostEqual(wait.elapsed, 0.0)
Jesse Noller02cb0eb2009-04-01 03:45:50 +0000867 self.assertEqual(wait(TIMEOUT1), True)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000868 self.assertTimingAlmostEqual(wait.elapsed, 0.0)
869 # self.assertEqual(event.is_set(), True)
870
871 event.clear()
872
873 #self.assertEqual(event.is_set(), False)
874
Jesus Cea6f6016b2011-09-09 20:26:57 +0200875 p = self.Process(target=self._test_event, args=(event,))
876 p.daemon = True
877 p.start()
Jesse Noller02cb0eb2009-04-01 03:45:50 +0000878 self.assertEqual(wait(), True)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000879
880#
881#
882#
883
884class _TestValue(BaseTestCase):
885
Florent Xicluna36b9fbb2010-03-24 19:33:25 +0000886 ALLOWED_TYPES = ('processes',)
887
Benjamin Petersondfd79492008-06-13 19:13:39 +0000888 codes_values = [
889 ('i', 4343, 24234),
890 ('d', 3.625, -4.25),
891 ('h', -232, 234),
892 ('c', latin('x'), latin('y'))
893 ]
894
Antoine Pitrou55d935a2010-11-22 16:35:57 +0000895 def setUp(self):
896 if not HAS_SHAREDCTYPES:
897 self.skipTest("requires multiprocessing.sharedctypes")
898
Antoine Pitrou4eb2b282010-11-02 23:51:30 +0000899 @classmethod
900 def _test(cls, values):
901 for sv, cv in zip(values, cls.codes_values):
Benjamin Petersondfd79492008-06-13 19:13:39 +0000902 sv.value = cv[2]
903
904
905 def test_value(self, raw=False):
Benjamin Petersondfd79492008-06-13 19:13:39 +0000906 if raw:
907 values = [self.RawValue(code, value)
908 for code, value, _ in self.codes_values]
909 else:
910 values = [self.Value(code, value)
911 for code, value, _ in self.codes_values]
912
913 for sv, cv in zip(values, self.codes_values):
914 self.assertEqual(sv.value, cv[1])
915
916 proc = self.Process(target=self._test, args=(values,))
Jesus Cea6f6016b2011-09-09 20:26:57 +0200917 proc.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +0000918 proc.start()
919 proc.join()
920
921 for sv, cv in zip(values, self.codes_values):
922 self.assertEqual(sv.value, cv[2])
923
924 def test_rawvalue(self):
925 self.test_value(raw=True)
926
927 def test_getobj_getlock(self):
Benjamin Petersondfd79492008-06-13 19:13:39 +0000928 val1 = self.Value('i', 5)
929 lock1 = val1.get_lock()
930 obj1 = val1.get_obj()
931
932 val2 = self.Value('i', 5, lock=None)
933 lock2 = val2.get_lock()
934 obj2 = val2.get_obj()
935
936 lock = self.Lock()
937 val3 = self.Value('i', 5, lock=lock)
938 lock3 = val3.get_lock()
939 obj3 = val3.get_obj()
940 self.assertEqual(lock, lock3)
941
Jesse Noller6ab22152009-01-18 02:45:38 +0000942 arr4 = self.Value('i', 5, lock=False)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000943 self.assertFalse(hasattr(arr4, 'get_lock'))
944 self.assertFalse(hasattr(arr4, 'get_obj'))
945
Jesse Noller6ab22152009-01-18 02:45:38 +0000946 self.assertRaises(AttributeError, self.Value, 'i', 5, lock='navalue')
947
948 arr5 = self.RawValue('i', 5)
949 self.assertFalse(hasattr(arr5, 'get_lock'))
950 self.assertFalse(hasattr(arr5, 'get_obj'))
951
Benjamin Petersondfd79492008-06-13 19:13:39 +0000952
953class _TestArray(BaseTestCase):
954
Florent Xicluna36b9fbb2010-03-24 19:33:25 +0000955 ALLOWED_TYPES = ('processes',)
956
Antoine Pitrou4eb2b282010-11-02 23:51:30 +0000957 @classmethod
958 def f(cls, seq):
Benjamin Petersondfd79492008-06-13 19:13:39 +0000959 for i in range(1, len(seq)):
960 seq[i] += seq[i-1]
961
Florent Xicluna36b9fbb2010-03-24 19:33:25 +0000962 @unittest.skipIf(c_int is None, "requires _ctypes")
Benjamin Petersondfd79492008-06-13 19:13:39 +0000963 def test_array(self, raw=False):
Benjamin Petersondfd79492008-06-13 19:13:39 +0000964 seq = [680, 626, 934, 821, 150, 233, 548, 982, 714, 831]
965 if raw:
966 arr = self.RawArray('i', seq)
967 else:
968 arr = self.Array('i', seq)
969
970 self.assertEqual(len(arr), len(seq))
971 self.assertEqual(arr[3], seq[3])
972 self.assertEqual(list(arr[2:7]), list(seq[2:7]))
973
974 arr[4:8] = seq[4:8] = array.array('i', [1, 2, 3, 4])
975
976 self.assertEqual(list(arr[:]), seq)
977
978 self.f(seq)
979
980 p = self.Process(target=self.f, args=(arr,))
Jesus Cea6f6016b2011-09-09 20:26:57 +0200981 p.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +0000982 p.start()
983 p.join()
984
985 self.assertEqual(list(arr[:]), seq)
986
Florent Xicluna36b9fbb2010-03-24 19:33:25 +0000987 @unittest.skipIf(c_int is None, "requires _ctypes")
Mark Dickinsond3cb2f62011-03-26 10:02:37 +0000988 def test_array_from_size(self):
989 size = 10
990 # Test for zeroing (see issue #11675).
991 # The repetition below strengthens the test by increasing the chances
992 # of previously allocated non-zero memory being used for the new array
993 # on the 2nd and 3rd loops.
994 for _ in range(3):
995 arr = self.Array('i', size)
996 self.assertEqual(len(arr), size)
997 self.assertEqual(list(arr), [0] * size)
998 arr[:] = range(10)
999 self.assertEqual(list(arr), range(10))
1000 del arr
1001
1002 @unittest.skipIf(c_int is None, "requires _ctypes")
Benjamin Petersondfd79492008-06-13 19:13:39 +00001003 def test_rawarray(self):
1004 self.test_array(raw=True)
1005
Florent Xicluna36b9fbb2010-03-24 19:33:25 +00001006 @unittest.skipIf(c_int is None, "requires _ctypes")
Mark Dickinsonf9e9a6f2011-03-25 22:01:06 +00001007 def test_array_accepts_long(self):
1008 arr = self.Array('i', 10L)
1009 self.assertEqual(len(arr), 10)
1010 raw_arr = self.RawArray('i', 10L)
1011 self.assertEqual(len(raw_arr), 10)
1012
1013 @unittest.skipIf(c_int is None, "requires _ctypes")
Benjamin Petersondfd79492008-06-13 19:13:39 +00001014 def test_getobj_getlock_obj(self):
Benjamin Petersondfd79492008-06-13 19:13:39 +00001015 arr1 = self.Array('i', range(10))
1016 lock1 = arr1.get_lock()
1017 obj1 = arr1.get_obj()
1018
1019 arr2 = self.Array('i', range(10), lock=None)
1020 lock2 = arr2.get_lock()
1021 obj2 = arr2.get_obj()
1022
1023 lock = self.Lock()
1024 arr3 = self.Array('i', range(10), lock=lock)
1025 lock3 = arr3.get_lock()
1026 obj3 = arr3.get_obj()
1027 self.assertEqual(lock, lock3)
1028
Jesse Noller6ab22152009-01-18 02:45:38 +00001029 arr4 = self.Array('i', range(10), lock=False)
Benjamin Petersondfd79492008-06-13 19:13:39 +00001030 self.assertFalse(hasattr(arr4, 'get_lock'))
1031 self.assertFalse(hasattr(arr4, 'get_obj'))
Jesse Noller6ab22152009-01-18 02:45:38 +00001032 self.assertRaises(AttributeError,
1033 self.Array, 'i', range(10), lock='notalock')
1034
1035 arr5 = self.RawArray('i', range(10))
1036 self.assertFalse(hasattr(arr5, 'get_lock'))
1037 self.assertFalse(hasattr(arr5, 'get_obj'))
Benjamin Petersondfd79492008-06-13 19:13:39 +00001038
1039#
1040#
1041#
1042
1043class _TestContainers(BaseTestCase):
1044
1045 ALLOWED_TYPES = ('manager',)
1046
1047 def test_list(self):
1048 a = self.list(range(10))
1049 self.assertEqual(a[:], range(10))
1050
1051 b = self.list()
1052 self.assertEqual(b[:], [])
1053
1054 b.extend(range(5))
1055 self.assertEqual(b[:], range(5))
1056
1057 self.assertEqual(b[2], 2)
1058 self.assertEqual(b[2:10], [2,3,4])
1059
1060 b *= 2
1061 self.assertEqual(b[:], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4])
1062
1063 self.assertEqual(b + [5, 6], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4, 5, 6])
1064
1065 self.assertEqual(a[:], range(10))
1066
1067 d = [a, b]
1068 e = self.list(d)
1069 self.assertEqual(
1070 e[:],
1071 [[0, 1, 2, 3, 4, 5, 6, 7, 8, 9], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4]]
1072 )
1073
1074 f = self.list([a])
1075 a.append('hello')
1076 self.assertEqual(f[:], [[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 'hello']])
1077
1078 def test_dict(self):
1079 d = self.dict()
1080 indices = range(65, 70)
1081 for i in indices:
1082 d[i] = chr(i)
1083 self.assertEqual(d.copy(), dict((i, chr(i)) for i in indices))
1084 self.assertEqual(sorted(d.keys()), indices)
1085 self.assertEqual(sorted(d.values()), [chr(i) for i in indices])
1086 self.assertEqual(sorted(d.items()), [(i, chr(i)) for i in indices])
1087
1088 def test_namespace(self):
1089 n = self.Namespace()
1090 n.name = 'Bob'
1091 n.job = 'Builder'
1092 n._hidden = 'hidden'
1093 self.assertEqual((n.name, n.job), ('Bob', 'Builder'))
1094 del n.job
1095 self.assertEqual(str(n), "Namespace(name='Bob')")
1096 self.assertTrue(hasattr(n, 'name'))
1097 self.assertTrue(not hasattr(n, 'job'))
1098
1099#
1100#
1101#
1102
1103def sqr(x, wait=0.0):
1104 time.sleep(wait)
1105 return x*x
Benjamin Petersondfd79492008-06-13 19:13:39 +00001106class _TestPool(BaseTestCase):
1107
1108 def test_apply(self):
1109 papply = self.pool.apply
1110 self.assertEqual(papply(sqr, (5,)), sqr(5))
1111 self.assertEqual(papply(sqr, (), {'x':3}), sqr(x=3))
1112
1113 def test_map(self):
1114 pmap = self.pool.map
1115 self.assertEqual(pmap(sqr, range(10)), map(sqr, range(10)))
1116 self.assertEqual(pmap(sqr, range(100), chunksize=20),
1117 map(sqr, range(100)))
1118
Jesse Noller7530e472009-07-16 14:23:04 +00001119 def test_map_chunksize(self):
1120 try:
1121 self.pool.map_async(sqr, [], chunksize=1).get(timeout=TIMEOUT1)
1122 except multiprocessing.TimeoutError:
1123 self.fail("pool.map_async with chunksize stalled on null list")
1124
Benjamin Petersondfd79492008-06-13 19:13:39 +00001125 def test_async(self):
1126 res = self.pool.apply_async(sqr, (7, TIMEOUT1,))
1127 get = TimingWrapper(res.get)
1128 self.assertEqual(get(), 49)
1129 self.assertTimingAlmostEqual(get.elapsed, TIMEOUT1)
1130
1131 def test_async_timeout(self):
1132 res = self.pool.apply_async(sqr, (6, TIMEOUT2 + 0.2))
1133 get = TimingWrapper(res.get)
1134 self.assertRaises(multiprocessing.TimeoutError, get, timeout=TIMEOUT2)
1135 self.assertTimingAlmostEqual(get.elapsed, TIMEOUT2)
1136
1137 def test_imap(self):
1138 it = self.pool.imap(sqr, range(10))
1139 self.assertEqual(list(it), map(sqr, range(10)))
1140
1141 it = self.pool.imap(sqr, range(10))
1142 for i in range(10):
1143 self.assertEqual(it.next(), i*i)
1144 self.assertRaises(StopIteration, it.next)
1145
1146 it = self.pool.imap(sqr, range(1000), chunksize=100)
1147 for i in range(1000):
1148 self.assertEqual(it.next(), i*i)
1149 self.assertRaises(StopIteration, it.next)
1150
1151 def test_imap_unordered(self):
1152 it = self.pool.imap_unordered(sqr, range(1000))
1153 self.assertEqual(sorted(it), map(sqr, range(1000)))
1154
1155 it = self.pool.imap_unordered(sqr, range(1000), chunksize=53)
1156 self.assertEqual(sorted(it), map(sqr, range(1000)))
1157
1158 def test_make_pool(self):
Victor Stinnerf64a0cf2011-06-20 17:54:33 +02001159 self.assertRaises(ValueError, multiprocessing.Pool, -1)
1160 self.assertRaises(ValueError, multiprocessing.Pool, 0)
1161
Benjamin Petersondfd79492008-06-13 19:13:39 +00001162 p = multiprocessing.Pool(3)
1163 self.assertEqual(3, len(p._pool))
1164 p.close()
1165 p.join()
1166
1167 def test_terminate(self):
1168 if self.TYPE == 'manager':
1169 # On Unix a forked process increfs each shared object to
1170 # which its parent process held a reference. If the
1171 # forked process gets terminated then there is likely to
1172 # be a reference leak. So to prevent
1173 # _TestZZZNumberOfObjects from failing we skip this test
1174 # when using a manager.
1175 return
1176
1177 result = self.pool.map_async(
1178 time.sleep, [0.1 for i in range(10000)], chunksize=1
1179 )
1180 self.pool.terminate()
1181 join = TimingWrapper(self.pool.join)
1182 join()
1183 self.assertTrue(join.elapsed < 0.2)
Jesse Noller654ade32010-01-27 03:05:57 +00001184
Richard Oudkerkd44a4a22012-06-06 17:52:18 +01001185 def test_empty_iterable(self):
1186 # See Issue 12157
1187 p = self.Pool(1)
1188
1189 self.assertEqual(p.map(sqr, []), [])
1190 self.assertEqual(list(p.imap(sqr, [])), [])
1191 self.assertEqual(list(p.imap_unordered(sqr, [])), [])
1192 self.assertEqual(p.map_async(sqr, []).get(), [])
1193
1194 p.close()
1195 p.join()
1196
Richard Oudkerk0c200c22012-05-02 16:36:26 +01001197def unpickleable_result():
1198 return lambda: 42
1199
1200class _TestPoolWorkerErrors(BaseTestCase):
1201 ALLOWED_TYPES = ('processes', )
1202
1203 def test_unpickleable_result(self):
1204 from multiprocessing.pool import MaybeEncodingError
1205 p = multiprocessing.Pool(2)
1206
1207 # Make sure we don't lose pool processes because of encoding errors.
1208 for iteration in range(20):
1209 res = p.apply_async(unpickleable_result)
1210 self.assertRaises(MaybeEncodingError, res.get)
1211
1212 p.close()
1213 p.join()
1214
Jesse Noller654ade32010-01-27 03:05:57 +00001215class _TestPoolWorkerLifetime(BaseTestCase):
1216
1217 ALLOWED_TYPES = ('processes', )
1218 def test_pool_worker_lifetime(self):
1219 p = multiprocessing.Pool(3, maxtasksperchild=10)
1220 self.assertEqual(3, len(p._pool))
1221 origworkerpids = [w.pid for w in p._pool]
1222 # Run many tasks so each worker gets replaced (hopefully)
1223 results = []
1224 for i in range(100):
1225 results.append(p.apply_async(sqr, (i, )))
1226 # Fetch the results and verify we got the right answers,
1227 # also ensuring all the tasks have completed.
1228 for (j, res) in enumerate(results):
1229 self.assertEqual(res.get(), sqr(j))
1230 # Refill the pool
1231 p._repopulate_pool()
Florent Xicluna3bc5cb72010-03-04 15:58:54 +00001232 # Wait until all workers are alive
Antoine Pitrouc2b0d762011-04-06 22:54:14 +02001233 # (countdown * DELTA = 5 seconds max startup process time)
1234 countdown = 50
Florent Xicluna3bc5cb72010-03-04 15:58:54 +00001235 while countdown and not all(w.is_alive() for w in p._pool):
1236 countdown -= 1
1237 time.sleep(DELTA)
Jesse Noller654ade32010-01-27 03:05:57 +00001238 finalworkerpids = [w.pid for w in p._pool]
Florent Xicluna3bc5cb72010-03-04 15:58:54 +00001239 # All pids should be assigned. See issue #7805.
1240 self.assertNotIn(None, origworkerpids)
1241 self.assertNotIn(None, finalworkerpids)
1242 # Finally, check that the worker pids have changed
Jesse Noller654ade32010-01-27 03:05:57 +00001243 self.assertNotEqual(sorted(origworkerpids), sorted(finalworkerpids))
1244 p.close()
1245 p.join()
1246
Charles-François Natali46f990e2011-10-24 18:43:51 +02001247 def test_pool_worker_lifetime_early_close(self):
1248 # Issue #10332: closing a pool whose workers have limited lifetimes
1249 # before all the tasks completed would make join() hang.
1250 p = multiprocessing.Pool(3, maxtasksperchild=1)
1251 results = []
1252 for i in range(6):
1253 results.append(p.apply_async(sqr, (i, 0.3)))
1254 p.close()
1255 p.join()
1256 # check the results
1257 for (j, res) in enumerate(results):
1258 self.assertEqual(res.get(), sqr(j))
1259
1260
Benjamin Petersondfd79492008-06-13 19:13:39 +00001261#
1262# Test that manager has expected number of shared objects left
1263#
1264
1265class _TestZZZNumberOfObjects(BaseTestCase):
1266 # Because test cases are sorted alphabetically, this one will get
1267 # run after all the other tests for the manager. It tests that
1268 # there have been no "reference leaks" for the manager's shared
1269 # objects. Note the comment in _TestPool.test_terminate().
1270 ALLOWED_TYPES = ('manager',)
1271
1272 def test_number_of_objects(self):
1273 EXPECTED_NUMBER = 1 # the pool object is still alive
1274 multiprocessing.active_children() # discard dead process objs
1275 gc.collect() # do garbage collection
1276 refs = self.manager._number_of_objects()
Jesse Noller7314b382009-01-21 02:08:17 +00001277 debug_info = self.manager._debug_info()
Benjamin Petersondfd79492008-06-13 19:13:39 +00001278 if refs != EXPECTED_NUMBER:
Jesse Noller7fb96402008-07-17 21:01:05 +00001279 print self.manager._debug_info()
Jesse Noller7314b382009-01-21 02:08:17 +00001280 print debug_info
Benjamin Petersondfd79492008-06-13 19:13:39 +00001281
1282 self.assertEqual(refs, EXPECTED_NUMBER)
1283
1284#
1285# Test of creating a customized manager class
1286#
1287
1288from multiprocessing.managers import BaseManager, BaseProxy, RemoteError
1289
1290class FooBar(object):
1291 def f(self):
1292 return 'f()'
1293 def g(self):
1294 raise ValueError
1295 def _h(self):
1296 return '_h()'
1297
1298def baz():
1299 for i in xrange(10):
1300 yield i*i
1301
1302class IteratorProxy(BaseProxy):
1303 _exposed_ = ('next', '__next__')
1304 def __iter__(self):
1305 return self
1306 def next(self):
1307 return self._callmethod('next')
1308 def __next__(self):
1309 return self._callmethod('__next__')
1310
1311class MyManager(BaseManager):
1312 pass
1313
1314MyManager.register('Foo', callable=FooBar)
1315MyManager.register('Bar', callable=FooBar, exposed=('f', '_h'))
1316MyManager.register('baz', callable=baz, proxytype=IteratorProxy)
1317
1318
1319class _TestMyManager(BaseTestCase):
1320
1321 ALLOWED_TYPES = ('manager',)
1322
1323 def test_mymanager(self):
1324 manager = MyManager()
1325 manager.start()
1326
1327 foo = manager.Foo()
1328 bar = manager.Bar()
1329 baz = manager.baz()
1330
1331 foo_methods = [name for name in ('f', 'g', '_h') if hasattr(foo, name)]
1332 bar_methods = [name for name in ('f', 'g', '_h') if hasattr(bar, name)]
1333
1334 self.assertEqual(foo_methods, ['f', 'g'])
1335 self.assertEqual(bar_methods, ['f', '_h'])
1336
1337 self.assertEqual(foo.f(), 'f()')
1338 self.assertRaises(ValueError, foo.g)
1339 self.assertEqual(foo._callmethod('f'), 'f()')
1340 self.assertRaises(RemoteError, foo._callmethod, '_h')
1341
1342 self.assertEqual(bar.f(), 'f()')
1343 self.assertEqual(bar._h(), '_h()')
1344 self.assertEqual(bar._callmethod('f'), 'f()')
1345 self.assertEqual(bar._callmethod('_h'), '_h()')
1346
1347 self.assertEqual(list(baz), [i*i for i in range(10)])
1348
1349 manager.shutdown()
1350
1351#
1352# Test of connecting to a remote server and using xmlrpclib for serialization
1353#
1354
1355_queue = Queue.Queue()
1356def get_queue():
1357 return _queue
1358
1359class QueueManager(BaseManager):
1360 '''manager class used by server process'''
1361QueueManager.register('get_queue', callable=get_queue)
1362
1363class QueueManager2(BaseManager):
1364 '''manager class which specifies the same interface as QueueManager'''
1365QueueManager2.register('get_queue')
1366
1367
1368SERIALIZER = 'xmlrpclib'
1369
1370class _TestRemoteManager(BaseTestCase):
1371
1372 ALLOWED_TYPES = ('manager',)
1373
Antoine Pitrou4eb2b282010-11-02 23:51:30 +00001374 @classmethod
1375 def _putter(cls, address, authkey):
Benjamin Petersondfd79492008-06-13 19:13:39 +00001376 manager = QueueManager2(
1377 address=address, authkey=authkey, serializer=SERIALIZER
1378 )
1379 manager.connect()
1380 queue = manager.get_queue()
1381 queue.put(('hello world', None, True, 2.25))
1382
1383 def test_remote(self):
1384 authkey = os.urandom(32)
1385
1386 manager = QueueManager(
1387 address=('localhost', 0), authkey=authkey, serializer=SERIALIZER
1388 )
1389 manager.start()
1390
1391 p = self.Process(target=self._putter, args=(manager.address, authkey))
Jesus Cea6f6016b2011-09-09 20:26:57 +02001392 p.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +00001393 p.start()
1394
1395 manager2 = QueueManager2(
1396 address=manager.address, authkey=authkey, serializer=SERIALIZER
1397 )
1398 manager2.connect()
1399 queue = manager2.get_queue()
1400
1401 # Note that xmlrpclib will deserialize object as a list not a tuple
1402 self.assertEqual(queue.get(), ['hello world', None, True, 2.25])
1403
1404 # Because we are using xmlrpclib for serialization instead of
1405 # pickle this will cause a serialization error.
1406 self.assertRaises(Exception, queue.put, time.sleep)
1407
1408 # Make queue finalizer run before the server is stopped
1409 del queue
1410 manager.shutdown()
1411
Jesse Noller459a6482009-03-30 15:50:42 +00001412class _TestManagerRestart(BaseTestCase):
1413
Antoine Pitrou4eb2b282010-11-02 23:51:30 +00001414 @classmethod
1415 def _putter(cls, address, authkey):
Jesse Noller459a6482009-03-30 15:50:42 +00001416 manager = QueueManager(
1417 address=address, authkey=authkey, serializer=SERIALIZER)
1418 manager.connect()
1419 queue = manager.get_queue()
1420 queue.put('hello world')
1421
1422 def test_rapid_restart(self):
1423 authkey = os.urandom(32)
1424 manager = QueueManager(
Antoine Pitrou54f9f832010-04-30 23:08:48 +00001425 address=('localhost', 0), authkey=authkey, serializer=SERIALIZER)
Brian Curtin87d86e02010-11-01 05:15:55 +00001426 srvr = manager.get_server()
1427 addr = srvr.address
1428 # Close the connection.Listener socket which gets opened as a part
1429 # of manager.get_server(). It's not needed for the test.
1430 srvr.listener.close()
Jesse Noller459a6482009-03-30 15:50:42 +00001431 manager.start()
1432
1433 p = self.Process(target=self._putter, args=(manager.address, authkey))
Jesus Cea6f6016b2011-09-09 20:26:57 +02001434 p.daemon = True
Jesse Noller459a6482009-03-30 15:50:42 +00001435 p.start()
1436 queue = manager.get_queue()
1437 self.assertEqual(queue.get(), 'hello world')
Jesse Noller019ce772009-03-30 21:53:29 +00001438 del queue
Jesse Noller459a6482009-03-30 15:50:42 +00001439 manager.shutdown()
1440 manager = QueueManager(
Antoine Pitrou54f9f832010-04-30 23:08:48 +00001441 address=addr, authkey=authkey, serializer=SERIALIZER)
Jesse Noller459a6482009-03-30 15:50:42 +00001442 manager.start()
Jesse Noller019ce772009-03-30 21:53:29 +00001443 manager.shutdown()
Jesse Noller459a6482009-03-30 15:50:42 +00001444
Benjamin Petersondfd79492008-06-13 19:13:39 +00001445#
1446#
1447#
1448
1449SENTINEL = latin('')
1450
1451class _TestConnection(BaseTestCase):
1452
1453 ALLOWED_TYPES = ('processes', 'threads')
1454
Antoine Pitrou4eb2b282010-11-02 23:51:30 +00001455 @classmethod
1456 def _echo(cls, conn):
Benjamin Petersondfd79492008-06-13 19:13:39 +00001457 for msg in iter(conn.recv_bytes, SENTINEL):
1458 conn.send_bytes(msg)
1459 conn.close()
1460
1461 def test_connection(self):
1462 conn, child_conn = self.Pipe()
1463
1464 p = self.Process(target=self._echo, args=(child_conn,))
Jesse Noller5bc9f4c2008-08-19 19:06:19 +00001465 p.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +00001466 p.start()
1467
1468 seq = [1, 2.25, None]
1469 msg = latin('hello world')
1470 longmsg = msg * 10
1471 arr = array.array('i', range(4))
1472
1473 if self.TYPE == 'processes':
1474 self.assertEqual(type(conn.fileno()), int)
1475
1476 self.assertEqual(conn.send(seq), None)
1477 self.assertEqual(conn.recv(), seq)
1478
1479 self.assertEqual(conn.send_bytes(msg), None)
1480 self.assertEqual(conn.recv_bytes(), msg)
1481
1482 if self.TYPE == 'processes':
1483 buffer = array.array('i', [0]*10)
1484 expected = list(arr) + [0] * (10 - len(arr))
1485 self.assertEqual(conn.send_bytes(arr), None)
1486 self.assertEqual(conn.recv_bytes_into(buffer),
1487 len(arr) * buffer.itemsize)
1488 self.assertEqual(list(buffer), expected)
1489
1490 buffer = array.array('i', [0]*10)
1491 expected = [0] * 3 + list(arr) + [0] * (10 - 3 - len(arr))
1492 self.assertEqual(conn.send_bytes(arr), None)
1493 self.assertEqual(conn.recv_bytes_into(buffer, 3 * buffer.itemsize),
1494 len(arr) * buffer.itemsize)
1495 self.assertEqual(list(buffer), expected)
1496
1497 buffer = bytearray(latin(' ' * 40))
1498 self.assertEqual(conn.send_bytes(longmsg), None)
1499 try:
1500 res = conn.recv_bytes_into(buffer)
1501 except multiprocessing.BufferTooShort, e:
1502 self.assertEqual(e.args, (longmsg,))
1503 else:
1504 self.fail('expected BufferTooShort, got %s' % res)
1505
1506 poll = TimingWrapper(conn.poll)
1507
1508 self.assertEqual(poll(), False)
1509 self.assertTimingAlmostEqual(poll.elapsed, 0)
1510
1511 self.assertEqual(poll(TIMEOUT1), False)
1512 self.assertTimingAlmostEqual(poll.elapsed, TIMEOUT1)
1513
1514 conn.send(None)
1515
1516 self.assertEqual(poll(TIMEOUT1), True)
1517 self.assertTimingAlmostEqual(poll.elapsed, 0)
1518
1519 self.assertEqual(conn.recv(), None)
1520
1521 really_big_msg = latin('X') * (1024 * 1024 * 16) # 16Mb
1522 conn.send_bytes(really_big_msg)
1523 self.assertEqual(conn.recv_bytes(), really_big_msg)
1524
1525 conn.send_bytes(SENTINEL) # tell child to quit
1526 child_conn.close()
1527
1528 if self.TYPE == 'processes':
1529 self.assertEqual(conn.readable, True)
1530 self.assertEqual(conn.writable, True)
1531 self.assertRaises(EOFError, conn.recv)
1532 self.assertRaises(EOFError, conn.recv_bytes)
1533
1534 p.join()
1535
1536 def test_duplex_false(self):
1537 reader, writer = self.Pipe(duplex=False)
1538 self.assertEqual(writer.send(1), None)
1539 self.assertEqual(reader.recv(), 1)
1540 if self.TYPE == 'processes':
1541 self.assertEqual(reader.readable, True)
1542 self.assertEqual(reader.writable, False)
1543 self.assertEqual(writer.readable, False)
1544 self.assertEqual(writer.writable, True)
1545 self.assertRaises(IOError, reader.send, 2)
1546 self.assertRaises(IOError, writer.recv)
1547 self.assertRaises(IOError, writer.poll)
1548
1549 def test_spawn_close(self):
1550 # We test that a pipe connection can be closed by parent
1551 # process immediately after child is spawned. On Windows this
1552 # would have sometimes failed on old versions because
1553 # child_conn would be closed before the child got a chance to
1554 # duplicate it.
1555 conn, child_conn = self.Pipe()
1556
1557 p = self.Process(target=self._echo, args=(child_conn,))
Jesus Cea6f6016b2011-09-09 20:26:57 +02001558 p.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +00001559 p.start()
1560 child_conn.close() # this might complete before child initializes
1561
1562 msg = latin('hello')
1563 conn.send_bytes(msg)
1564 self.assertEqual(conn.recv_bytes(), msg)
1565
1566 conn.send_bytes(SENTINEL)
1567 conn.close()
1568 p.join()
1569
1570 def test_sendbytes(self):
1571 if self.TYPE != 'processes':
1572 return
1573
1574 msg = latin('abcdefghijklmnopqrstuvwxyz')
1575 a, b = self.Pipe()
1576
1577 a.send_bytes(msg)
1578 self.assertEqual(b.recv_bytes(), msg)
1579
1580 a.send_bytes(msg, 5)
1581 self.assertEqual(b.recv_bytes(), msg[5:])
1582
1583 a.send_bytes(msg, 7, 8)
1584 self.assertEqual(b.recv_bytes(), msg[7:7+8])
1585
1586 a.send_bytes(msg, 26)
1587 self.assertEqual(b.recv_bytes(), latin(''))
1588
1589 a.send_bytes(msg, 26, 0)
1590 self.assertEqual(b.recv_bytes(), latin(''))
1591
1592 self.assertRaises(ValueError, a.send_bytes, msg, 27)
1593
1594 self.assertRaises(ValueError, a.send_bytes, msg, 22, 5)
1595
1596 self.assertRaises(ValueError, a.send_bytes, msg, 26, 1)
1597
1598 self.assertRaises(ValueError, a.send_bytes, msg, -1)
1599
1600 self.assertRaises(ValueError, a.send_bytes, msg, 4, -1)
1601
Antoine Pitroua1a8da82011-08-23 19:54:20 +02001602 @classmethod
1603 def _is_fd_assigned(cls, fd):
1604 try:
1605 os.fstat(fd)
1606 except OSError as e:
1607 if e.errno == errno.EBADF:
1608 return False
1609 raise
1610 else:
1611 return True
1612
1613 @classmethod
1614 def _writefd(cls, conn, data, create_dummy_fds=False):
1615 if create_dummy_fds:
1616 for i in range(0, 256):
1617 if not cls._is_fd_assigned(i):
1618 os.dup2(conn.fileno(), i)
1619 fd = reduction.recv_handle(conn)
1620 if msvcrt:
1621 fd = msvcrt.open_osfhandle(fd, os.O_WRONLY)
1622 os.write(fd, data)
1623 os.close(fd)
1624
Charles-François Natalif8413b22011-09-21 18:44:49 +02001625 @unittest.skipUnless(HAS_REDUCTION, "test needs multiprocessing.reduction")
Antoine Pitroua1a8da82011-08-23 19:54:20 +02001626 def test_fd_transfer(self):
1627 if self.TYPE != 'processes':
1628 self.skipTest("only makes sense with processes")
1629 conn, child_conn = self.Pipe(duplex=True)
1630
1631 p = self.Process(target=self._writefd, args=(child_conn, b"foo"))
Jesus Cea6f6016b2011-09-09 20:26:57 +02001632 p.daemon = True
Antoine Pitroua1a8da82011-08-23 19:54:20 +02001633 p.start()
1634 with open(test_support.TESTFN, "wb") as f:
1635 fd = f.fileno()
1636 if msvcrt:
1637 fd = msvcrt.get_osfhandle(fd)
1638 reduction.send_handle(conn, fd, p.pid)
1639 p.join()
1640 with open(test_support.TESTFN, "rb") as f:
1641 self.assertEqual(f.read(), b"foo")
1642
Charles-François Natalif8413b22011-09-21 18:44:49 +02001643 @unittest.skipUnless(HAS_REDUCTION, "test needs multiprocessing.reduction")
Antoine Pitroua1a8da82011-08-23 19:54:20 +02001644 @unittest.skipIf(sys.platform == "win32",
1645 "test semantics don't make sense on Windows")
1646 @unittest.skipIf(MAXFD <= 256,
1647 "largest assignable fd number is too small")
1648 @unittest.skipUnless(hasattr(os, "dup2"),
1649 "test needs os.dup2()")
1650 def test_large_fd_transfer(self):
1651 # With fd > 256 (issue #11657)
1652 if self.TYPE != 'processes':
1653 self.skipTest("only makes sense with processes")
1654 conn, child_conn = self.Pipe(duplex=True)
1655
1656 p = self.Process(target=self._writefd, args=(child_conn, b"bar", True))
Jesus Cea6f6016b2011-09-09 20:26:57 +02001657 p.daemon = True
Antoine Pitroua1a8da82011-08-23 19:54:20 +02001658 p.start()
1659 with open(test_support.TESTFN, "wb") as f:
1660 fd = f.fileno()
1661 for newfd in range(256, MAXFD):
1662 if not self._is_fd_assigned(newfd):
1663 break
1664 else:
1665 self.fail("could not find an unassigned large file descriptor")
1666 os.dup2(fd, newfd)
1667 try:
1668 reduction.send_handle(conn, newfd, p.pid)
1669 finally:
1670 os.close(newfd)
1671 p.join()
1672 with open(test_support.TESTFN, "rb") as f:
1673 self.assertEqual(f.read(), b"bar")
1674
Jesus Ceac23484b2011-09-21 03:47:39 +02001675 @classmethod
1676 def _send_data_without_fd(self, conn):
1677 os.write(conn.fileno(), b"\0")
1678
Charles-François Natalif8413b22011-09-21 18:44:49 +02001679 @unittest.skipUnless(HAS_REDUCTION, "test needs multiprocessing.reduction")
Jesus Ceac23484b2011-09-21 03:47:39 +02001680 @unittest.skipIf(sys.platform == "win32", "doesn't make sense on Windows")
1681 def test_missing_fd_transfer(self):
1682 # Check that exception is raised when received data is not
1683 # accompanied by a file descriptor in ancillary data.
1684 if self.TYPE != 'processes':
1685 self.skipTest("only makes sense with processes")
1686 conn, child_conn = self.Pipe(duplex=True)
1687
1688 p = self.Process(target=self._send_data_without_fd, args=(child_conn,))
1689 p.daemon = True
1690 p.start()
1691 self.assertRaises(RuntimeError, reduction.recv_handle, conn)
1692 p.join()
Antoine Pitroua1a8da82011-08-23 19:54:20 +02001693
Benjamin Petersondfd79492008-06-13 19:13:39 +00001694class _TestListenerClient(BaseTestCase):
1695
1696 ALLOWED_TYPES = ('processes', 'threads')
1697
Antoine Pitrou4eb2b282010-11-02 23:51:30 +00001698 @classmethod
1699 def _test(cls, address):
1700 conn = cls.connection.Client(address)
Benjamin Petersondfd79492008-06-13 19:13:39 +00001701 conn.send('hello')
1702 conn.close()
1703
1704 def test_listener_client(self):
1705 for family in self.connection.families:
1706 l = self.connection.Listener(family=family)
1707 p = self.Process(target=self._test, args=(l.address,))
Jesse Noller5bc9f4c2008-08-19 19:06:19 +00001708 p.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +00001709 p.start()
1710 conn = l.accept()
1711 self.assertEqual(conn.recv(), 'hello')
1712 p.join()
1713 l.close()
Richard Oudkerk9a16fa62012-05-05 20:41:08 +01001714
1715 def test_issue14725(self):
1716 l = self.connection.Listener()
1717 p = self.Process(target=self._test, args=(l.address,))
1718 p.daemon = True
1719 p.start()
1720 time.sleep(1)
1721 # On Windows the client process should by now have connected,
1722 # written data and closed the pipe handle by now. This causes
1723 # ConnectNamdedPipe() to fail with ERROR_NO_DATA. See Issue
1724 # 14725.
1725 conn = l.accept()
1726 self.assertEqual(conn.recv(), 'hello')
1727 conn.close()
1728 p.join()
1729 l.close()
1730
Benjamin Petersondfd79492008-06-13 19:13:39 +00001731#
1732# Test of sending connection and socket objects between processes
1733#
Benjamin Petersonda3a1b12008-06-16 20:52:48 +00001734"""
Benjamin Petersondfd79492008-06-13 19:13:39 +00001735class _TestPicklingConnections(BaseTestCase):
1736
1737 ALLOWED_TYPES = ('processes',)
1738
1739 def _listener(self, conn, families):
1740 for fam in families:
1741 l = self.connection.Listener(family=fam)
1742 conn.send(l.address)
1743 new_conn = l.accept()
1744 conn.send(new_conn)
1745
1746 if self.TYPE == 'processes':
1747 l = socket.socket()
1748 l.bind(('localhost', 0))
1749 conn.send(l.getsockname())
1750 l.listen(1)
1751 new_conn, addr = l.accept()
1752 conn.send(new_conn)
1753
1754 conn.recv()
1755
1756 def _remote(self, conn):
1757 for (address, msg) in iter(conn.recv, None):
1758 client = self.connection.Client(address)
1759 client.send(msg.upper())
1760 client.close()
1761
1762 if self.TYPE == 'processes':
1763 address, msg = conn.recv()
1764 client = socket.socket()
1765 client.connect(address)
1766 client.sendall(msg.upper())
1767 client.close()
1768
1769 conn.close()
1770
1771 def test_pickling(self):
1772 try:
1773 multiprocessing.allow_connection_pickling()
1774 except ImportError:
1775 return
1776
1777 families = self.connection.families
1778
1779 lconn, lconn0 = self.Pipe()
1780 lp = self.Process(target=self._listener, args=(lconn0, families))
Jesus Cea6f6016b2011-09-09 20:26:57 +02001781 lp.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +00001782 lp.start()
1783 lconn0.close()
1784
1785 rconn, rconn0 = self.Pipe()
1786 rp = self.Process(target=self._remote, args=(rconn0,))
Jesus Cea6f6016b2011-09-09 20:26:57 +02001787 rp.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +00001788 rp.start()
1789 rconn0.close()
1790
1791 for fam in families:
1792 msg = ('This connection uses family %s' % fam).encode('ascii')
1793 address = lconn.recv()
1794 rconn.send((address, msg))
1795 new_conn = lconn.recv()
1796 self.assertEqual(new_conn.recv(), msg.upper())
1797
1798 rconn.send(None)
1799
1800 if self.TYPE == 'processes':
1801 msg = latin('This connection uses a normal socket')
1802 address = lconn.recv()
1803 rconn.send((address, msg))
1804 if hasattr(socket, 'fromfd'):
1805 new_conn = lconn.recv()
1806 self.assertEqual(new_conn.recv(100), msg.upper())
1807 else:
1808 # XXX On Windows with Py2.6 need to backport fromfd()
1809 discard = lconn.recv_bytes()
1810
1811 lconn.send(None)
1812
1813 rconn.close()
1814 lconn.close()
1815
1816 lp.join()
1817 rp.join()
Benjamin Petersonda3a1b12008-06-16 20:52:48 +00001818"""
Benjamin Petersondfd79492008-06-13 19:13:39 +00001819#
1820#
1821#
1822
1823class _TestHeap(BaseTestCase):
1824
1825 ALLOWED_TYPES = ('processes',)
1826
1827 def test_heap(self):
1828 iterations = 5000
1829 maxblocks = 50
1830 blocks = []
1831
1832 # create and destroy lots of blocks of different sizes
1833 for i in xrange(iterations):
1834 size = int(random.lognormvariate(0, 1) * 1000)
1835 b = multiprocessing.heap.BufferWrapper(size)
1836 blocks.append(b)
1837 if len(blocks) > maxblocks:
1838 i = random.randrange(maxblocks)
1839 del blocks[i]
1840
1841 # get the heap object
1842 heap = multiprocessing.heap.BufferWrapper._heap
1843
1844 # verify the state of the heap
1845 all = []
1846 occupied = 0
Charles-François Natali414d0fa2011-07-02 13:56:19 +02001847 heap._lock.acquire()
1848 self.addCleanup(heap._lock.release)
Benjamin Petersondfd79492008-06-13 19:13:39 +00001849 for L in heap._len_to_seq.values():
1850 for arena, start, stop in L:
1851 all.append((heap._arenas.index(arena), start, stop,
1852 stop-start, 'free'))
1853 for arena, start, stop in heap._allocated_blocks:
1854 all.append((heap._arenas.index(arena), start, stop,
1855 stop-start, 'occupied'))
1856 occupied += (stop-start)
1857
1858 all.sort()
1859
1860 for i in range(len(all)-1):
1861 (arena, start, stop) = all[i][:3]
1862 (narena, nstart, nstop) = all[i+1][:3]
1863 self.assertTrue((arena != narena and nstart == 0) or
1864 (stop == nstart))
1865
Charles-François Natali414d0fa2011-07-02 13:56:19 +02001866 def test_free_from_gc(self):
1867 # Check that freeing of blocks by the garbage collector doesn't deadlock
1868 # (issue #12352).
1869 # Make sure the GC is enabled, and set lower collection thresholds to
1870 # make collections more frequent (and increase the probability of
1871 # deadlock).
Charles-François Natali7c20ad32011-07-02 14:08:27 +02001872 if not gc.isenabled():
Charles-François Natali414d0fa2011-07-02 13:56:19 +02001873 gc.enable()
1874 self.addCleanup(gc.disable)
Charles-François Natali7c20ad32011-07-02 14:08:27 +02001875 thresholds = gc.get_threshold()
1876 self.addCleanup(gc.set_threshold, *thresholds)
Charles-François Natali414d0fa2011-07-02 13:56:19 +02001877 gc.set_threshold(10)
1878
1879 # perform numerous block allocations, with cyclic references to make
1880 # sure objects are collected asynchronously by the gc
1881 for i in range(5000):
1882 a = multiprocessing.heap.BufferWrapper(1)
1883 b = multiprocessing.heap.BufferWrapper(1)
1884 # circular references
1885 a.buddy = b
1886 b.buddy = a
1887
Benjamin Petersondfd79492008-06-13 19:13:39 +00001888#
1889#
1890#
1891
Benjamin Petersondfd79492008-06-13 19:13:39 +00001892class _Foo(Structure):
1893 _fields_ = [
1894 ('x', c_int),
1895 ('y', c_double)
1896 ]
1897
1898class _TestSharedCTypes(BaseTestCase):
1899
1900 ALLOWED_TYPES = ('processes',)
1901
Antoine Pitrou55d935a2010-11-22 16:35:57 +00001902 def setUp(self):
1903 if not HAS_SHAREDCTYPES:
1904 self.skipTest("requires multiprocessing.sharedctypes")
1905
Antoine Pitrou4eb2b282010-11-02 23:51:30 +00001906 @classmethod
1907 def _double(cls, x, y, foo, arr, string):
Benjamin Petersondfd79492008-06-13 19:13:39 +00001908 x.value *= 2
1909 y.value *= 2
1910 foo.x *= 2
1911 foo.y *= 2
1912 string.value *= 2
1913 for i in range(len(arr)):
1914 arr[i] *= 2
1915
1916 def test_sharedctypes(self, lock=False):
Benjamin Petersondfd79492008-06-13 19:13:39 +00001917 x = Value('i', 7, lock=lock)
Georg Brandlbd564c32010-02-06 23:33:33 +00001918 y = Value(c_double, 1.0/3.0, lock=lock)
Benjamin Petersondfd79492008-06-13 19:13:39 +00001919 foo = Value(_Foo, 3, 2, lock=lock)
Georg Brandlbd564c32010-02-06 23:33:33 +00001920 arr = self.Array('d', range(10), lock=lock)
1921 string = self.Array('c', 20, lock=lock)
Brian Curtina06e9b82010-10-07 02:27:41 +00001922 string.value = latin('hello')
Benjamin Petersondfd79492008-06-13 19:13:39 +00001923
1924 p = self.Process(target=self._double, args=(x, y, foo, arr, string))
Jesus Cea6f6016b2011-09-09 20:26:57 +02001925 p.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +00001926 p.start()
1927 p.join()
1928
1929 self.assertEqual(x.value, 14)
1930 self.assertAlmostEqual(y.value, 2.0/3.0)
1931 self.assertEqual(foo.x, 6)
1932 self.assertAlmostEqual(foo.y, 4.0)
1933 for i in range(10):
1934 self.assertAlmostEqual(arr[i], i*2)
1935 self.assertEqual(string.value, latin('hellohello'))
1936
1937 def test_synchronize(self):
1938 self.test_sharedctypes(lock=True)
1939
1940 def test_copy(self):
Benjamin Petersondfd79492008-06-13 19:13:39 +00001941 foo = _Foo(2, 5.0)
Brian Curtina06e9b82010-10-07 02:27:41 +00001942 bar = copy(foo)
Benjamin Petersondfd79492008-06-13 19:13:39 +00001943 foo.x = 0
1944 foo.y = 0
1945 self.assertEqual(bar.x, 2)
1946 self.assertAlmostEqual(bar.y, 5.0)
1947
1948#
1949#
1950#
1951
1952class _TestFinalize(BaseTestCase):
1953
1954 ALLOWED_TYPES = ('processes',)
1955
Antoine Pitrou4eb2b282010-11-02 23:51:30 +00001956 @classmethod
1957 def _test_finalize(cls, conn):
Benjamin Petersondfd79492008-06-13 19:13:39 +00001958 class Foo(object):
1959 pass
1960
1961 a = Foo()
1962 util.Finalize(a, conn.send, args=('a',))
1963 del a # triggers callback for a
1964
1965 b = Foo()
1966 close_b = util.Finalize(b, conn.send, args=('b',))
1967 close_b() # triggers callback for b
1968 close_b() # does nothing because callback has already been called
1969 del b # does nothing because callback has already been called
1970
1971 c = Foo()
1972 util.Finalize(c, conn.send, args=('c',))
1973
1974 d10 = Foo()
1975 util.Finalize(d10, conn.send, args=('d10',), exitpriority=1)
1976
1977 d01 = Foo()
1978 util.Finalize(d01, conn.send, args=('d01',), exitpriority=0)
1979 d02 = Foo()
1980 util.Finalize(d02, conn.send, args=('d02',), exitpriority=0)
1981 d03 = Foo()
1982 util.Finalize(d03, conn.send, args=('d03',), exitpriority=0)
1983
1984 util.Finalize(None, conn.send, args=('e',), exitpriority=-10)
1985
1986 util.Finalize(None, conn.send, args=('STOP',), exitpriority=-100)
1987
Ezio Melottic2077b02011-03-16 12:34:31 +02001988 # call multiprocessing's cleanup function then exit process without
Benjamin Petersondfd79492008-06-13 19:13:39 +00001989 # garbage collecting locals
1990 util._exit_function()
1991 conn.close()
1992 os._exit(0)
1993
1994 def test_finalize(self):
1995 conn, child_conn = self.Pipe()
1996
1997 p = self.Process(target=self._test_finalize, args=(child_conn,))
Jesus Cea6f6016b2011-09-09 20:26:57 +02001998 p.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +00001999 p.start()
2000 p.join()
2001
2002 result = [obj for obj in iter(conn.recv, 'STOP')]
2003 self.assertEqual(result, ['a', 'b', 'd10', 'd03', 'd02', 'd01', 'e'])
2004
2005#
2006# Test that from ... import * works for each module
2007#
2008
2009class _TestImportStar(BaseTestCase):
2010
2011 ALLOWED_TYPES = ('processes',)
2012
2013 def test_import(self):
Florent Xicluna36b9fbb2010-03-24 19:33:25 +00002014 modules = [
Benjamin Petersondfd79492008-06-13 19:13:39 +00002015 'multiprocessing', 'multiprocessing.connection',
2016 'multiprocessing.heap', 'multiprocessing.managers',
2017 'multiprocessing.pool', 'multiprocessing.process',
Benjamin Petersondfd79492008-06-13 19:13:39 +00002018 'multiprocessing.synchronize', 'multiprocessing.util'
Florent Xicluna36b9fbb2010-03-24 19:33:25 +00002019 ]
2020
Charles-François Natalif8413b22011-09-21 18:44:49 +02002021 if HAS_REDUCTION:
2022 modules.append('multiprocessing.reduction')
2023
Florent Xicluna36b9fbb2010-03-24 19:33:25 +00002024 if c_int is not None:
2025 # This module requires _ctypes
2026 modules.append('multiprocessing.sharedctypes')
Benjamin Petersondfd79492008-06-13 19:13:39 +00002027
2028 for name in modules:
2029 __import__(name)
2030 mod = sys.modules[name]
2031
2032 for attr in getattr(mod, '__all__', ()):
2033 self.assertTrue(
2034 hasattr(mod, attr),
2035 '%r does not have attribute %r' % (mod, attr)
2036 )
2037
2038#
2039# Quick test that logging works -- does not test logging output
2040#
2041
2042class _TestLogging(BaseTestCase):
2043
2044 ALLOWED_TYPES = ('processes',)
2045
2046 def test_enable_logging(self):
2047 logger = multiprocessing.get_logger()
2048 logger.setLevel(util.SUBWARNING)
2049 self.assertTrue(logger is not None)
2050 logger.debug('this will not be printed')
2051 logger.info('nor will this')
2052 logger.setLevel(LOG_LEVEL)
2053
Antoine Pitrou4eb2b282010-11-02 23:51:30 +00002054 @classmethod
2055 def _test_level(cls, conn):
Benjamin Petersondfd79492008-06-13 19:13:39 +00002056 logger = multiprocessing.get_logger()
2057 conn.send(logger.getEffectiveLevel())
2058
2059 def test_level(self):
2060 LEVEL1 = 32
2061 LEVEL2 = 37
2062
2063 logger = multiprocessing.get_logger()
2064 root_logger = logging.getLogger()
2065 root_level = root_logger.level
2066
2067 reader, writer = multiprocessing.Pipe(duplex=False)
2068
2069 logger.setLevel(LEVEL1)
Jesus Cea6f6016b2011-09-09 20:26:57 +02002070 p = self.Process(target=self._test_level, args=(writer,))
2071 p.daemon = True
2072 p.start()
Benjamin Petersondfd79492008-06-13 19:13:39 +00002073 self.assertEqual(LEVEL1, reader.recv())
2074
2075 logger.setLevel(logging.NOTSET)
2076 root_logger.setLevel(LEVEL2)
Jesus Cea6f6016b2011-09-09 20:26:57 +02002077 p = self.Process(target=self._test_level, args=(writer,))
2078 p.daemon = True
2079 p.start()
Benjamin Petersondfd79492008-06-13 19:13:39 +00002080 self.assertEqual(LEVEL2, reader.recv())
2081
2082 root_logger.setLevel(root_level)
2083 logger.setLevel(level=LOG_LEVEL)
2084
Jesse Noller814d02d2009-11-21 14:38:23 +00002085
Jesse Noller9a03f2f2009-11-24 14:17:29 +00002086# class _TestLoggingProcessName(BaseTestCase):
2087#
2088# def handle(self, record):
2089# assert record.processName == multiprocessing.current_process().name
2090# self.__handled = True
2091#
2092# def test_logging(self):
2093# handler = logging.Handler()
2094# handler.handle = self.handle
2095# self.__handled = False
2096# # Bypass getLogger() and side-effects
2097# logger = logging.getLoggerClass()(
2098# 'multiprocessing.test.TestLoggingProcessName')
2099# logger.addHandler(handler)
2100# logger.propagate = False
2101#
2102# logger.warn('foo')
2103# assert self.__handled
Jesse Noller814d02d2009-11-21 14:38:23 +00002104
Benjamin Petersondfd79492008-06-13 19:13:39 +00002105#
Jesse Noller9a5b2ad2009-01-19 15:12:22 +00002106# Test to verify handle verification, see issue 3321
2107#
2108
2109class TestInvalidHandle(unittest.TestCase):
2110
Florent Xicluna36b9fbb2010-03-24 19:33:25 +00002111 @unittest.skipIf(WIN32, "skipped on Windows")
Jesse Noller9a5b2ad2009-01-19 15:12:22 +00002112 def test_invalid_handles(self):
Jesse Noller9a5b2ad2009-01-19 15:12:22 +00002113 conn = _multiprocessing.Connection(44977608)
2114 self.assertRaises(IOError, conn.poll)
2115 self.assertRaises(IOError, _multiprocessing.Connection, -1)
Florent Xicluna36b9fbb2010-03-24 19:33:25 +00002116
Jesse Noller9a5b2ad2009-01-19 15:12:22 +00002117#
Benjamin Petersondfd79492008-06-13 19:13:39 +00002118# Functions used to create test cases from the base ones in this module
2119#
2120
2121def get_attributes(Source, names):
2122 d = {}
2123 for name in names:
2124 obj = getattr(Source, name)
2125 if type(obj) == type(get_attributes):
2126 obj = staticmethod(obj)
2127 d[name] = obj
2128 return d
2129
2130def create_test_cases(Mixin, type):
2131 result = {}
2132 glob = globals()
Florent Xicluna36b9fbb2010-03-24 19:33:25 +00002133 Type = type.capitalize()
Benjamin Petersondfd79492008-06-13 19:13:39 +00002134
2135 for name in glob.keys():
2136 if name.startswith('_Test'):
2137 base = glob[name]
2138 if type in base.ALLOWED_TYPES:
2139 newname = 'With' + Type + name[1:]
2140 class Temp(base, unittest.TestCase, Mixin):
2141 pass
2142 result[newname] = Temp
2143 Temp.__name__ = newname
2144 Temp.__module__ = Mixin.__module__
2145 return result
2146
2147#
2148# Create test cases
2149#
2150
2151class ProcessesMixin(object):
2152 TYPE = 'processes'
2153 Process = multiprocessing.Process
2154 locals().update(get_attributes(multiprocessing, (
2155 'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
2156 'Condition', 'Event', 'Value', 'Array', 'RawValue',
2157 'RawArray', 'current_process', 'active_children', 'Pipe',
Richard Oudkerkd44a4a22012-06-06 17:52:18 +01002158 'connection', 'JoinableQueue', 'Pool'
Benjamin Petersondfd79492008-06-13 19:13:39 +00002159 )))
2160
2161testcases_processes = create_test_cases(ProcessesMixin, type='processes')
2162globals().update(testcases_processes)
2163
2164
2165class ManagerMixin(object):
2166 TYPE = 'manager'
2167 Process = multiprocessing.Process
2168 manager = object.__new__(multiprocessing.managers.SyncManager)
2169 locals().update(get_attributes(manager, (
2170 'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
2171 'Condition', 'Event', 'Value', 'Array', 'list', 'dict',
Richard Oudkerkd44a4a22012-06-06 17:52:18 +01002172 'Namespace', 'JoinableQueue', 'Pool'
Benjamin Petersondfd79492008-06-13 19:13:39 +00002173 )))
2174
2175testcases_manager = create_test_cases(ManagerMixin, type='manager')
2176globals().update(testcases_manager)
2177
2178
2179class ThreadsMixin(object):
2180 TYPE = 'threads'
2181 Process = multiprocessing.dummy.Process
2182 locals().update(get_attributes(multiprocessing.dummy, (
2183 'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
2184 'Condition', 'Event', 'Value', 'Array', 'current_process',
2185 'active_children', 'Pipe', 'connection', 'dict', 'list',
Richard Oudkerkd44a4a22012-06-06 17:52:18 +01002186 'Namespace', 'JoinableQueue', 'Pool'
Benjamin Petersondfd79492008-06-13 19:13:39 +00002187 )))
2188
2189testcases_threads = create_test_cases(ThreadsMixin, type='threads')
2190globals().update(testcases_threads)
2191
Neal Norwitz0c519b32008-08-25 01:50:24 +00002192class OtherTest(unittest.TestCase):
2193 # TODO: add more tests for deliver/answer challenge.
2194 def test_deliver_challenge_auth_failure(self):
2195 class _FakeConnection(object):
2196 def recv_bytes(self, size):
Neal Norwitz2a7767a2008-08-25 03:03:25 +00002197 return b'something bogus'
Neal Norwitz0c519b32008-08-25 01:50:24 +00002198 def send_bytes(self, data):
2199 pass
2200 self.assertRaises(multiprocessing.AuthenticationError,
2201 multiprocessing.connection.deliver_challenge,
2202 _FakeConnection(), b'abc')
2203
2204 def test_answer_challenge_auth_failure(self):
2205 class _FakeConnection(object):
2206 def __init__(self):
2207 self.count = 0
2208 def recv_bytes(self, size):
2209 self.count += 1
2210 if self.count == 1:
2211 return multiprocessing.connection.CHALLENGE
2212 elif self.count == 2:
Neal Norwitz2a7767a2008-08-25 03:03:25 +00002213 return b'something bogus'
2214 return b''
Neal Norwitz0c519b32008-08-25 01:50:24 +00002215 def send_bytes(self, data):
2216 pass
2217 self.assertRaises(multiprocessing.AuthenticationError,
2218 multiprocessing.connection.answer_challenge,
2219 _FakeConnection(), b'abc')
2220
Jesse Noller7152f6d2009-04-02 05:17:26 +00002221#
2222# Test Manager.start()/Pool.__init__() initializer feature - see issue 5585
2223#
2224
2225def initializer(ns):
2226 ns.test += 1
2227
2228class TestInitializers(unittest.TestCase):
2229 def setUp(self):
2230 self.mgr = multiprocessing.Manager()
2231 self.ns = self.mgr.Namespace()
2232 self.ns.test = 0
2233
2234 def tearDown(self):
2235 self.mgr.shutdown()
2236
2237 def test_manager_initializer(self):
2238 m = multiprocessing.managers.SyncManager()
2239 self.assertRaises(TypeError, m.start, 1)
2240 m.start(initializer, (self.ns,))
2241 self.assertEqual(self.ns.test, 1)
2242 m.shutdown()
2243
2244 def test_pool_initializer(self):
2245 self.assertRaises(TypeError, multiprocessing.Pool, initializer=1)
2246 p = multiprocessing.Pool(1, initializer, (self.ns,))
2247 p.close()
2248 p.join()
2249 self.assertEqual(self.ns.test, 1)
2250
Jesse Noller1b90efb2009-06-30 17:11:52 +00002251#
2252# Issue 5155, 5313, 5331: Test process in processes
2253# Verifies os.close(sys.stdin.fileno) vs. sys.stdin.close() behavior
2254#
2255
2256def _ThisSubProcess(q):
2257 try:
2258 item = q.get(block=False)
2259 except Queue.Empty:
2260 pass
2261
2262def _TestProcess(q):
2263 queue = multiprocessing.Queue()
2264 subProc = multiprocessing.Process(target=_ThisSubProcess, args=(queue,))
Jesus Cea6f6016b2011-09-09 20:26:57 +02002265 subProc.daemon = True
Jesse Noller1b90efb2009-06-30 17:11:52 +00002266 subProc.start()
2267 subProc.join()
2268
2269def _afunc(x):
2270 return x*x
2271
2272def pool_in_process():
2273 pool = multiprocessing.Pool(processes=4)
2274 x = pool.map(_afunc, [1, 2, 3, 4, 5, 6, 7])
2275
2276class _file_like(object):
2277 def __init__(self, delegate):
2278 self._delegate = delegate
2279 self._pid = None
2280
2281 @property
2282 def cache(self):
2283 pid = os.getpid()
2284 # There are no race conditions since fork keeps only the running thread
2285 if pid != self._pid:
2286 self._pid = pid
2287 self._cache = []
2288 return self._cache
2289
2290 def write(self, data):
2291 self.cache.append(data)
2292
2293 def flush(self):
2294 self._delegate.write(''.join(self.cache))
2295 self._cache = []
2296
2297class TestStdinBadfiledescriptor(unittest.TestCase):
2298
2299 def test_queue_in_process(self):
2300 queue = multiprocessing.Queue()
2301 proc = multiprocessing.Process(target=_TestProcess, args=(queue,))
2302 proc.start()
2303 proc.join()
2304
2305 def test_pool_in_process(self):
2306 p = multiprocessing.Process(target=pool_in_process)
2307 p.start()
2308 p.join()
2309
2310 def test_flushing(self):
2311 sio = StringIO()
2312 flike = _file_like(sio)
2313 flike.write('foo')
2314 proc = multiprocessing.Process(target=lambda: flike.flush())
2315 flike.flush()
2316 assert sio.getvalue() == 'foo'
2317
Richard Oudkerke4b99382012-07-27 14:05:46 +01002318#
2319# Test interaction with socket timeouts - see Issue #6056
2320#
2321
2322class TestTimeouts(unittest.TestCase):
2323 @classmethod
2324 def _test_timeout(cls, child, address):
2325 time.sleep(1)
2326 child.send(123)
2327 child.close()
2328 conn = multiprocessing.connection.Client(address)
2329 conn.send(456)
2330 conn.close()
2331
2332 def test_timeout(self):
2333 old_timeout = socket.getdefaulttimeout()
2334 try:
2335 socket.setdefaulttimeout(0.1)
2336 parent, child = multiprocessing.Pipe(duplex=True)
2337 l = multiprocessing.connection.Listener(family='AF_INET')
2338 p = multiprocessing.Process(target=self._test_timeout,
2339 args=(child, l.address))
2340 p.start()
2341 child.close()
2342 self.assertEqual(parent.recv(), 123)
2343 parent.close()
2344 conn = l.accept()
2345 self.assertEqual(conn.recv(), 456)
2346 conn.close()
2347 l.close()
2348 p.join(10)
2349 finally:
2350 socket.setdefaulttimeout(old_timeout)
2351
Jesse Noller1b90efb2009-06-30 17:11:52 +00002352testcases_other = [OtherTest, TestInvalidHandle, TestInitializers,
Richard Oudkerke4b99382012-07-27 14:05:46 +01002353 TestStdinBadfiledescriptor, TestTimeouts]
Neal Norwitz0c519b32008-08-25 01:50:24 +00002354
Benjamin Petersondfd79492008-06-13 19:13:39 +00002355#
2356#
2357#
2358
2359def test_main(run=None):
Jesse Noller18623822008-06-18 13:29:52 +00002360 if sys.platform.startswith("linux"):
2361 try:
2362 lock = multiprocessing.RLock()
2363 except OSError:
Benjamin Petersonbec087f2009-03-26 21:10:30 +00002364 raise unittest.SkipTest("OSError raises on RLock creation, see issue 3111!")
Benjamin Petersoned77f2e2008-06-17 22:40:44 +00002365
Charles-François Natali6392d7f2011-11-22 18:35:18 +01002366 check_enough_semaphores()
2367
Benjamin Petersondfd79492008-06-13 19:13:39 +00002368 if run is None:
2369 from test.test_support import run_unittest as run
2370
2371 util.get_temp_dir() # creates temp directory for use by all processes
2372
2373 multiprocessing.get_logger().setLevel(LOG_LEVEL)
2374
Jesse Noller146b7ab2008-07-02 16:44:09 +00002375 ProcessesMixin.pool = multiprocessing.Pool(4)
2376 ThreadsMixin.pool = multiprocessing.dummy.Pool(4)
2377 ManagerMixin.manager.__init__()
2378 ManagerMixin.manager.start()
2379 ManagerMixin.pool = ManagerMixin.manager.Pool(4)
Benjamin Petersondfd79492008-06-13 19:13:39 +00002380
2381 testcases = (
Jesse Noller146b7ab2008-07-02 16:44:09 +00002382 sorted(testcases_processes.values(), key=lambda tc:tc.__name__) +
2383 sorted(testcases_threads.values(), key=lambda tc:tc.__name__) +
Neal Norwitz0c519b32008-08-25 01:50:24 +00002384 sorted(testcases_manager.values(), key=lambda tc:tc.__name__) +
2385 testcases_other
Benjamin Petersondfd79492008-06-13 19:13:39 +00002386 )
2387
2388 loadTestsFromTestCase = unittest.defaultTestLoader.loadTestsFromTestCase
2389 suite = unittest.TestSuite(loadTestsFromTestCase(tc) for tc in testcases)
Nick Coghlan13623662010-04-10 14:24:36 +00002390 # (ncoghlan): Whether or not sys.exc_clear is executed by the threading
2391 # module during these tests is at least platform dependent and possibly
Nick Coghlan14459d52010-04-10 15:01:54 +00002392 # non-deterministic on any given platform. So we don't mind if the listed
Nick Coghlan13623662010-04-10 14:24:36 +00002393 # warnings aren't actually raised.
Florent Xicluna07627882010-03-21 01:14:24 +00002394 with test_support.check_py3k_warnings(
Nick Coghlan13623662010-04-10 14:24:36 +00002395 (".+__(get|set)slice__ has been removed", DeprecationWarning),
2396 (r"sys.exc_clear\(\) not supported", DeprecationWarning),
2397 quiet=True):
Florent Xicluna07627882010-03-21 01:14:24 +00002398 run(suite)
Benjamin Petersondfd79492008-06-13 19:13:39 +00002399
Jesse Noller146b7ab2008-07-02 16:44:09 +00002400 ThreadsMixin.pool.terminate()
2401 ProcessesMixin.pool.terminate()
2402 ManagerMixin.pool.terminate()
2403 ManagerMixin.manager.shutdown()
Benjamin Petersondfd79492008-06-13 19:13:39 +00002404
Jesse Noller146b7ab2008-07-02 16:44:09 +00002405 del ProcessesMixin.pool, ThreadsMixin.pool, ManagerMixin.pool
Benjamin Petersondfd79492008-06-13 19:13:39 +00002406
2407def main():
2408 test_main(unittest.TextTestRunner(verbosity=2).run)
2409
2410if __name__ == '__main__':
2411 main()