blob: 956c0367c2e64bc11136ad51bc920e0fe2aae517 [file] [log] [blame]
Jesse Noller76cf55f2008-07-02 16:56:51 +00001#!/usr/bin/env python
2
Benjamin Petersondfd79492008-06-13 19:13:39 +00003#
4# Unit tests for the multiprocessing package
5#
6
7import unittest
Benjamin Petersondfd79492008-06-13 19:13:39 +00008import Queue
9import time
10import sys
11import os
12import gc
13import signal
14import array
Benjamin Petersondfd79492008-06-13 19:13:39 +000015import socket
16import random
17import logging
Antoine Pitroua1a8da82011-08-23 19:54:20 +020018import errno
Richard Oudkerkfaee75c2012-08-14 11:41:19 +010019import test.script_helper
Mark Dickinsonc4920e82009-11-20 19:30:22 +000020from test import test_support
Jesse Noller1b90efb2009-06-30 17:11:52 +000021from StringIO import StringIO
R. David Murray3db8a342009-03-30 23:05:48 +000022_multiprocessing = test_support.import_module('_multiprocessing')
Ezio Melottic2077b02011-03-16 12:34:31 +020023# import threading after _multiprocessing to raise a more relevant error
Victor Stinner613b4cf2010-04-27 21:56:26 +000024# message: "No module named _multiprocessing". _multiprocessing is not compiled
25# without thread support.
26import threading
R. David Murray3db8a342009-03-30 23:05:48 +000027
Jesse Noller37040cd2008-09-30 00:15:45 +000028# Work around broken sem_open implementations
R. David Murray3db8a342009-03-30 23:05:48 +000029test_support.import_module('multiprocessing.synchronize')
Jesse Noller37040cd2008-09-30 00:15:45 +000030
Benjamin Petersondfd79492008-06-13 19:13:39 +000031import multiprocessing.dummy
32import multiprocessing.connection
33import multiprocessing.managers
34import multiprocessing.heap
Benjamin Petersondfd79492008-06-13 19:13:39 +000035import multiprocessing.pool
Benjamin Petersondfd79492008-06-13 19:13:39 +000036
Charles-François Natalif8413b22011-09-21 18:44:49 +020037from multiprocessing import util
38
39try:
40 from multiprocessing import reduction
41 HAS_REDUCTION = True
42except ImportError:
43 HAS_REDUCTION = False
Benjamin Petersondfd79492008-06-13 19:13:39 +000044
Brian Curtina06e9b82010-10-07 02:27:41 +000045try:
46 from multiprocessing.sharedctypes import Value, copy
47 HAS_SHAREDCTYPES = True
48except ImportError:
49 HAS_SHAREDCTYPES = False
50
Antoine Pitroua1a8da82011-08-23 19:54:20 +020051try:
52 import msvcrt
53except ImportError:
54 msvcrt = None
55
Benjamin Petersondfd79492008-06-13 19:13:39 +000056#
57#
58#
59
Benjamin Petersone79edf52008-07-13 18:34:58 +000060latin = str
Benjamin Petersondfd79492008-06-13 19:13:39 +000061
Benjamin Petersondfd79492008-06-13 19:13:39 +000062#
63# Constants
64#
65
66LOG_LEVEL = util.SUBWARNING
Jesse Noller654ade32010-01-27 03:05:57 +000067#LOG_LEVEL = logging.DEBUG
Benjamin Petersondfd79492008-06-13 19:13:39 +000068
69DELTA = 0.1
70CHECK_TIMINGS = False # making true makes tests take a lot longer
71 # and can sometimes cause some non-serious
72 # failures because some calls block a bit
73 # longer than expected
74if CHECK_TIMINGS:
75 TIMEOUT1, TIMEOUT2, TIMEOUT3 = 0.82, 0.35, 1.4
76else:
77 TIMEOUT1, TIMEOUT2, TIMEOUT3 = 0.1, 0.1, 0.1
78
79HAVE_GETVALUE = not getattr(_multiprocessing,
80 'HAVE_BROKEN_SEM_GETVALUE', False)
81
Jesse Noller9a5b2ad2009-01-19 15:12:22 +000082WIN32 = (sys.platform == "win32")
83
Antoine Pitroua1a8da82011-08-23 19:54:20 +020084try:
85 MAXFD = os.sysconf("SC_OPEN_MAX")
86except:
87 MAXFD = 256
88
Benjamin Petersondfd79492008-06-13 19:13:39 +000089#
Florent Xicluna36b9fbb2010-03-24 19:33:25 +000090# Some tests require ctypes
91#
92
93try:
Nick Coghlan13623662010-04-10 14:24:36 +000094 from ctypes import Structure, c_int, c_double
Florent Xicluna36b9fbb2010-03-24 19:33:25 +000095except ImportError:
96 Structure = object
97 c_int = c_double = None
98
Charles-François Natali6392d7f2011-11-22 18:35:18 +010099
100def check_enough_semaphores():
101 """Check that the system supports enough semaphores to run the test."""
102 # minimum number of semaphores available according to POSIX
103 nsems_min = 256
104 try:
105 nsems = os.sysconf("SC_SEM_NSEMS_MAX")
106 except (AttributeError, ValueError):
107 # sysconf not available or setting not available
108 return
109 if nsems == -1 or nsems >= nsems_min:
110 return
111 raise unittest.SkipTest("The OS doesn't support enough semaphores "
112 "to run the test (required: %d)." % nsems_min)
113
114
Florent Xicluna36b9fbb2010-03-24 19:33:25 +0000115#
Benjamin Petersondfd79492008-06-13 19:13:39 +0000116# Creates a wrapper for a function which records the time it takes to finish
117#
118
119class TimingWrapper(object):
120
121 def __init__(self, func):
122 self.func = func
123 self.elapsed = None
124
125 def __call__(self, *args, **kwds):
126 t = time.time()
127 try:
128 return self.func(*args, **kwds)
129 finally:
130 self.elapsed = time.time() - t
131
132#
133# Base class for test cases
134#
135
136class BaseTestCase(object):
137
138 ALLOWED_TYPES = ('processes', 'manager', 'threads')
139
140 def assertTimingAlmostEqual(self, a, b):
141 if CHECK_TIMINGS:
142 self.assertAlmostEqual(a, b, 1)
143
144 def assertReturnsIfImplemented(self, value, func, *args):
145 try:
146 res = func(*args)
147 except NotImplementedError:
148 pass
149 else:
150 return self.assertEqual(value, res)
151
Antoine Pitrou4eb2b282010-11-02 23:51:30 +0000152 # For the sanity of Windows users, rather than crashing or freezing in
153 # multiple ways.
154 def __reduce__(self, *args):
155 raise NotImplementedError("shouldn't try to pickle a test case")
156
157 __reduce_ex__ = __reduce__
158
Benjamin Petersondfd79492008-06-13 19:13:39 +0000159#
160# Return the value of a semaphore
161#
162
163def get_value(self):
164 try:
165 return self.get_value()
166 except AttributeError:
167 try:
168 return self._Semaphore__value
169 except AttributeError:
170 try:
171 return self._value
172 except AttributeError:
173 raise NotImplementedError
174
175#
176# Testcases
177#
178
179class _TestProcess(BaseTestCase):
180
181 ALLOWED_TYPES = ('processes', 'threads')
182
183 def test_current(self):
184 if self.TYPE == 'threads':
Zachary Ware1f702212013-12-10 14:09:20 -0600185 self.skipTest('test not appropriate for {}'.format(self.TYPE))
Benjamin Petersondfd79492008-06-13 19:13:39 +0000186
187 current = self.current_process()
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000188 authkey = current.authkey
Benjamin Petersondfd79492008-06-13 19:13:39 +0000189
190 self.assertTrue(current.is_alive())
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000191 self.assertTrue(not current.daemon)
Ezio Melottib0f5adc2010-01-24 16:58:36 +0000192 self.assertIsInstance(authkey, bytes)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000193 self.assertTrue(len(authkey) > 0)
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000194 self.assertEqual(current.ident, os.getpid())
195 self.assertEqual(current.exitcode, None)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000196
Antoine Pitrou4eb2b282010-11-02 23:51:30 +0000197 @classmethod
198 def _test(cls, q, *args, **kwds):
199 current = cls.current_process()
Benjamin Petersondfd79492008-06-13 19:13:39 +0000200 q.put(args)
201 q.put(kwds)
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000202 q.put(current.name)
Antoine Pitrou4eb2b282010-11-02 23:51:30 +0000203 if cls.TYPE != 'threads':
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000204 q.put(bytes(current.authkey))
Benjamin Petersondfd79492008-06-13 19:13:39 +0000205 q.put(current.pid)
206
207 def test_process(self):
208 q = self.Queue(1)
209 e = self.Event()
210 args = (q, 1, 2)
211 kwargs = {'hello':23, 'bye':2.54}
212 name = 'SomeProcess'
213 p = self.Process(
214 target=self._test, args=args, kwargs=kwargs, name=name
215 )
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000216 p.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +0000217 current = self.current_process()
218
219 if self.TYPE != 'threads':
Ezio Melotti2623a372010-11-21 13:34:58 +0000220 self.assertEqual(p.authkey, current.authkey)
221 self.assertEqual(p.is_alive(), False)
222 self.assertEqual(p.daemon, True)
Ezio Melottiaa980582010-01-23 23:04:36 +0000223 self.assertNotIn(p, self.active_children())
Benjamin Petersondfd79492008-06-13 19:13:39 +0000224 self.assertTrue(type(self.active_children()) is list)
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000225 self.assertEqual(p.exitcode, None)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000226
227 p.start()
228
Ezio Melotti2623a372010-11-21 13:34:58 +0000229 self.assertEqual(p.exitcode, None)
230 self.assertEqual(p.is_alive(), True)
Ezio Melottiaa980582010-01-23 23:04:36 +0000231 self.assertIn(p, self.active_children())
Benjamin Petersondfd79492008-06-13 19:13:39 +0000232
Ezio Melotti2623a372010-11-21 13:34:58 +0000233 self.assertEqual(q.get(), args[1:])
234 self.assertEqual(q.get(), kwargs)
235 self.assertEqual(q.get(), p.name)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000236 if self.TYPE != 'threads':
Ezio Melotti2623a372010-11-21 13:34:58 +0000237 self.assertEqual(q.get(), current.authkey)
238 self.assertEqual(q.get(), p.pid)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000239
240 p.join()
241
Ezio Melotti2623a372010-11-21 13:34:58 +0000242 self.assertEqual(p.exitcode, 0)
243 self.assertEqual(p.is_alive(), False)
Ezio Melottiaa980582010-01-23 23:04:36 +0000244 self.assertNotIn(p, self.active_children())
Benjamin Petersondfd79492008-06-13 19:13:39 +0000245
Antoine Pitrou4eb2b282010-11-02 23:51:30 +0000246 @classmethod
247 def _test_terminate(cls):
Benjamin Petersondfd79492008-06-13 19:13:39 +0000248 time.sleep(1000)
249
250 def test_terminate(self):
251 if self.TYPE == 'threads':
Zachary Ware1f702212013-12-10 14:09:20 -0600252 self.skipTest('test not appropriate for {}'.format(self.TYPE))
Benjamin Petersondfd79492008-06-13 19:13:39 +0000253
254 p = self.Process(target=self._test_terminate)
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000255 p.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +0000256 p.start()
257
258 self.assertEqual(p.is_alive(), True)
Ezio Melottiaa980582010-01-23 23:04:36 +0000259 self.assertIn(p, self.active_children())
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000260 self.assertEqual(p.exitcode, None)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000261
262 p.terminate()
263
264 join = TimingWrapper(p.join)
265 self.assertEqual(join(), None)
266 self.assertTimingAlmostEqual(join.elapsed, 0.0)
267
268 self.assertEqual(p.is_alive(), False)
Ezio Melottiaa980582010-01-23 23:04:36 +0000269 self.assertNotIn(p, self.active_children())
Benjamin Petersondfd79492008-06-13 19:13:39 +0000270
271 p.join()
272
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000273 # XXX sometimes get p.exitcode == 0 on Windows ...
274 #self.assertEqual(p.exitcode, -signal.SIGTERM)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000275
276 def test_cpu_count(self):
277 try:
278 cpus = multiprocessing.cpu_count()
279 except NotImplementedError:
280 cpus = 1
281 self.assertTrue(type(cpus) is int)
282 self.assertTrue(cpus >= 1)
283
284 def test_active_children(self):
285 self.assertEqual(type(self.active_children()), list)
286
287 p = self.Process(target=time.sleep, args=(DELTA,))
Ezio Melottiaa980582010-01-23 23:04:36 +0000288 self.assertNotIn(p, self.active_children())
Benjamin Petersondfd79492008-06-13 19:13:39 +0000289
Jesus Cea6f6016b2011-09-09 20:26:57 +0200290 p.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +0000291 p.start()
Ezio Melottiaa980582010-01-23 23:04:36 +0000292 self.assertIn(p, self.active_children())
Benjamin Petersondfd79492008-06-13 19:13:39 +0000293
294 p.join()
Ezio Melottiaa980582010-01-23 23:04:36 +0000295 self.assertNotIn(p, self.active_children())
Benjamin Petersondfd79492008-06-13 19:13:39 +0000296
Antoine Pitrou4eb2b282010-11-02 23:51:30 +0000297 @classmethod
298 def _test_recursion(cls, wconn, id):
Benjamin Petersondfd79492008-06-13 19:13:39 +0000299 from multiprocessing import forking
300 wconn.send(id)
301 if len(id) < 2:
302 for i in range(2):
Antoine Pitrou4eb2b282010-11-02 23:51:30 +0000303 p = cls.Process(
304 target=cls._test_recursion, args=(wconn, id+[i])
Benjamin Petersondfd79492008-06-13 19:13:39 +0000305 )
306 p.start()
307 p.join()
308
309 def test_recursion(self):
310 rconn, wconn = self.Pipe(duplex=False)
311 self._test_recursion(wconn, [])
312
313 time.sleep(DELTA)
314 result = []
315 while rconn.poll():
316 result.append(rconn.recv())
317
318 expected = [
319 [],
320 [0],
321 [0, 0],
322 [0, 1],
323 [1],
324 [1, 0],
325 [1, 1]
326 ]
327 self.assertEqual(result, expected)
328
Richard Oudkerk2182e052012-06-06 19:01:14 +0100329 @classmethod
330 def _test_sys_exit(cls, reason, testfn):
331 sys.stderr = open(testfn, 'w')
332 sys.exit(reason)
333
334 def test_sys_exit(self):
335 # See Issue 13854
336 if self.TYPE == 'threads':
Zachary Ware1f702212013-12-10 14:09:20 -0600337 self.skipTest('test not appropriate for {}'.format(self.TYPE))
Richard Oudkerk2182e052012-06-06 19:01:14 +0100338
339 testfn = test_support.TESTFN
340 self.addCleanup(test_support.unlink, testfn)
341
Richard Oudkerk3f8376e2013-11-17 17:24:11 +0000342 for reason, code in (([1, 2, 3], 1), ('ignore this', 1)):
Richard Oudkerk2182e052012-06-06 19:01:14 +0100343 p = self.Process(target=self._test_sys_exit, args=(reason, testfn))
344 p.daemon = True
345 p.start()
346 p.join(5)
347 self.assertEqual(p.exitcode, code)
348
349 with open(testfn, 'r') as f:
350 self.assertEqual(f.read().rstrip(), str(reason))
351
352 for reason in (True, False, 8):
353 p = self.Process(target=sys.exit, args=(reason,))
354 p.daemon = True
355 p.start()
356 p.join(5)
357 self.assertEqual(p.exitcode, reason)
358
Benjamin Petersondfd79492008-06-13 19:13:39 +0000359#
360#
361#
362
363class _UpperCaser(multiprocessing.Process):
364
365 def __init__(self):
366 multiprocessing.Process.__init__(self)
367 self.child_conn, self.parent_conn = multiprocessing.Pipe()
368
369 def run(self):
370 self.parent_conn.close()
371 for s in iter(self.child_conn.recv, None):
372 self.child_conn.send(s.upper())
373 self.child_conn.close()
374
375 def submit(self, s):
376 assert type(s) is str
377 self.parent_conn.send(s)
378 return self.parent_conn.recv()
379
380 def stop(self):
381 self.parent_conn.send(None)
382 self.parent_conn.close()
383 self.child_conn.close()
384
385class _TestSubclassingProcess(BaseTestCase):
386
387 ALLOWED_TYPES = ('processes',)
388
389 def test_subclassing(self):
390 uppercaser = _UpperCaser()
Jesus Cea6f6016b2011-09-09 20:26:57 +0200391 uppercaser.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +0000392 uppercaser.start()
393 self.assertEqual(uppercaser.submit('hello'), 'HELLO')
394 self.assertEqual(uppercaser.submit('world'), 'WORLD')
395 uppercaser.stop()
396 uppercaser.join()
397
398#
399#
400#
401
402def queue_empty(q):
403 if hasattr(q, 'empty'):
404 return q.empty()
405 else:
406 return q.qsize() == 0
407
408def queue_full(q, maxsize):
409 if hasattr(q, 'full'):
410 return q.full()
411 else:
412 return q.qsize() == maxsize
413
414
415class _TestQueue(BaseTestCase):
416
417
Antoine Pitrou4eb2b282010-11-02 23:51:30 +0000418 @classmethod
419 def _test_put(cls, queue, child_can_start, parent_can_continue):
Benjamin Petersondfd79492008-06-13 19:13:39 +0000420 child_can_start.wait()
421 for i in range(6):
422 queue.get()
423 parent_can_continue.set()
424
425 def test_put(self):
426 MAXSIZE = 6
427 queue = self.Queue(maxsize=MAXSIZE)
428 child_can_start = self.Event()
429 parent_can_continue = self.Event()
430
431 proc = self.Process(
432 target=self._test_put,
433 args=(queue, child_can_start, parent_can_continue)
434 )
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000435 proc.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +0000436 proc.start()
437
438 self.assertEqual(queue_empty(queue), True)
439 self.assertEqual(queue_full(queue, MAXSIZE), False)
440
441 queue.put(1)
442 queue.put(2, True)
443 queue.put(3, True, None)
444 queue.put(4, False)
445 queue.put(5, False, None)
446 queue.put_nowait(6)
447
448 # the values may be in buffer but not yet in pipe so sleep a bit
449 time.sleep(DELTA)
450
451 self.assertEqual(queue_empty(queue), False)
452 self.assertEqual(queue_full(queue, MAXSIZE), True)
453
454 put = TimingWrapper(queue.put)
455 put_nowait = TimingWrapper(queue.put_nowait)
456
457 self.assertRaises(Queue.Full, put, 7, False)
458 self.assertTimingAlmostEqual(put.elapsed, 0)
459
460 self.assertRaises(Queue.Full, put, 7, False, None)
461 self.assertTimingAlmostEqual(put.elapsed, 0)
462
463 self.assertRaises(Queue.Full, put_nowait, 7)
464 self.assertTimingAlmostEqual(put_nowait.elapsed, 0)
465
466 self.assertRaises(Queue.Full, put, 7, True, TIMEOUT1)
467 self.assertTimingAlmostEqual(put.elapsed, TIMEOUT1)
468
469 self.assertRaises(Queue.Full, put, 7, False, TIMEOUT2)
470 self.assertTimingAlmostEqual(put.elapsed, 0)
471
472 self.assertRaises(Queue.Full, put, 7, True, timeout=TIMEOUT3)
473 self.assertTimingAlmostEqual(put.elapsed, TIMEOUT3)
474
475 child_can_start.set()
476 parent_can_continue.wait()
477
478 self.assertEqual(queue_empty(queue), True)
479 self.assertEqual(queue_full(queue, MAXSIZE), False)
480
481 proc.join()
482
Antoine Pitrou4eb2b282010-11-02 23:51:30 +0000483 @classmethod
484 def _test_get(cls, queue, child_can_start, parent_can_continue):
Benjamin Petersondfd79492008-06-13 19:13:39 +0000485 child_can_start.wait()
Benjamin Petersonda3a1b12008-06-16 20:52:48 +0000486 #queue.put(1)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000487 queue.put(2)
488 queue.put(3)
489 queue.put(4)
490 queue.put(5)
491 parent_can_continue.set()
492
493 def test_get(self):
494 queue = self.Queue()
495 child_can_start = self.Event()
496 parent_can_continue = self.Event()
497
498 proc = self.Process(
499 target=self._test_get,
500 args=(queue, child_can_start, parent_can_continue)
501 )
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000502 proc.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +0000503 proc.start()
504
505 self.assertEqual(queue_empty(queue), True)
506
507 child_can_start.set()
508 parent_can_continue.wait()
509
510 time.sleep(DELTA)
511 self.assertEqual(queue_empty(queue), False)
512
Benjamin Petersonda3a1b12008-06-16 20:52:48 +0000513 # Hangs unexpectedly, remove for now
514 #self.assertEqual(queue.get(), 1)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000515 self.assertEqual(queue.get(True, None), 2)
516 self.assertEqual(queue.get(True), 3)
517 self.assertEqual(queue.get(timeout=1), 4)
518 self.assertEqual(queue.get_nowait(), 5)
519
520 self.assertEqual(queue_empty(queue), True)
521
522 get = TimingWrapper(queue.get)
523 get_nowait = TimingWrapper(queue.get_nowait)
524
525 self.assertRaises(Queue.Empty, get, False)
526 self.assertTimingAlmostEqual(get.elapsed, 0)
527
528 self.assertRaises(Queue.Empty, get, False, None)
529 self.assertTimingAlmostEqual(get.elapsed, 0)
530
531 self.assertRaises(Queue.Empty, get_nowait)
532 self.assertTimingAlmostEqual(get_nowait.elapsed, 0)
533
534 self.assertRaises(Queue.Empty, get, True, TIMEOUT1)
535 self.assertTimingAlmostEqual(get.elapsed, TIMEOUT1)
536
537 self.assertRaises(Queue.Empty, get, False, TIMEOUT2)
538 self.assertTimingAlmostEqual(get.elapsed, 0)
539
540 self.assertRaises(Queue.Empty, get, timeout=TIMEOUT3)
541 self.assertTimingAlmostEqual(get.elapsed, TIMEOUT3)
542
543 proc.join()
544
Antoine Pitrou4eb2b282010-11-02 23:51:30 +0000545 @classmethod
546 def _test_fork(cls, queue):
Benjamin Petersondfd79492008-06-13 19:13:39 +0000547 for i in range(10, 20):
548 queue.put(i)
549 # note that at this point the items may only be buffered, so the
550 # process cannot shutdown until the feeder thread has finished
551 # pushing items onto the pipe.
552
553 def test_fork(self):
554 # Old versions of Queue would fail to create a new feeder
555 # thread for a forked process if the original process had its
556 # own feeder thread. This test checks that this no longer
557 # happens.
558
559 queue = self.Queue()
560
561 # put items on queue so that main process starts a feeder thread
562 for i in range(10):
563 queue.put(i)
564
565 # wait to make sure thread starts before we fork a new process
566 time.sleep(DELTA)
567
568 # fork process
569 p = self.Process(target=self._test_fork, args=(queue,))
Jesus Cea6f6016b2011-09-09 20:26:57 +0200570 p.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +0000571 p.start()
572
573 # check that all expected items are in the queue
574 for i in range(20):
575 self.assertEqual(queue.get(), i)
576 self.assertRaises(Queue.Empty, queue.get, False)
577
578 p.join()
579
580 def test_qsize(self):
581 q = self.Queue()
582 try:
583 self.assertEqual(q.qsize(), 0)
584 except NotImplementedError:
Zachary Ware1f702212013-12-10 14:09:20 -0600585 self.skipTest('qsize method not implemented')
Benjamin Petersondfd79492008-06-13 19:13:39 +0000586 q.put(1)
587 self.assertEqual(q.qsize(), 1)
588 q.put(5)
589 self.assertEqual(q.qsize(), 2)
590 q.get()
591 self.assertEqual(q.qsize(), 1)
592 q.get()
593 self.assertEqual(q.qsize(), 0)
594
Antoine Pitrou4eb2b282010-11-02 23:51:30 +0000595 @classmethod
596 def _test_task_done(cls, q):
Benjamin Petersondfd79492008-06-13 19:13:39 +0000597 for obj in iter(q.get, None):
598 time.sleep(DELTA)
599 q.task_done()
600
601 def test_task_done(self):
602 queue = self.JoinableQueue()
603
604 if sys.version_info < (2, 5) and not hasattr(queue, 'task_done'):
Florent Xicluna36b9fbb2010-03-24 19:33:25 +0000605 self.skipTest("requires 'queue.task_done()' method")
Benjamin Petersondfd79492008-06-13 19:13:39 +0000606
607 workers = [self.Process(target=self._test_task_done, args=(queue,))
608 for i in xrange(4)]
609
610 for p in workers:
Jesus Cea6f6016b2011-09-09 20:26:57 +0200611 p.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +0000612 p.start()
613
614 for i in xrange(10):
615 queue.put(i)
616
617 queue.join()
618
619 for p in workers:
620 queue.put(None)
621
622 for p in workers:
623 p.join()
624
625#
626#
627#
628
629class _TestLock(BaseTestCase):
630
631 def test_lock(self):
632 lock = self.Lock()
633 self.assertEqual(lock.acquire(), True)
634 self.assertEqual(lock.acquire(False), False)
635 self.assertEqual(lock.release(), None)
636 self.assertRaises((ValueError, threading.ThreadError), lock.release)
637
638 def test_rlock(self):
639 lock = self.RLock()
640 self.assertEqual(lock.acquire(), True)
641 self.assertEqual(lock.acquire(), True)
642 self.assertEqual(lock.acquire(), True)
643 self.assertEqual(lock.release(), None)
644 self.assertEqual(lock.release(), None)
645 self.assertEqual(lock.release(), None)
646 self.assertRaises((AssertionError, RuntimeError), lock.release)
647
Jesse Noller82eb5902009-03-30 23:29:31 +0000648 def test_lock_context(self):
649 with self.Lock():
650 pass
651
Benjamin Petersondfd79492008-06-13 19:13:39 +0000652
653class _TestSemaphore(BaseTestCase):
654
655 def _test_semaphore(self, sem):
656 self.assertReturnsIfImplemented(2, get_value, sem)
657 self.assertEqual(sem.acquire(), True)
658 self.assertReturnsIfImplemented(1, get_value, sem)
659 self.assertEqual(sem.acquire(), True)
660 self.assertReturnsIfImplemented(0, get_value, sem)
661 self.assertEqual(sem.acquire(False), False)
662 self.assertReturnsIfImplemented(0, get_value, sem)
663 self.assertEqual(sem.release(), None)
664 self.assertReturnsIfImplemented(1, get_value, sem)
665 self.assertEqual(sem.release(), None)
666 self.assertReturnsIfImplemented(2, get_value, sem)
667
668 def test_semaphore(self):
669 sem = self.Semaphore(2)
670 self._test_semaphore(sem)
671 self.assertEqual(sem.release(), None)
672 self.assertReturnsIfImplemented(3, get_value, sem)
673 self.assertEqual(sem.release(), None)
674 self.assertReturnsIfImplemented(4, get_value, sem)
675
676 def test_bounded_semaphore(self):
677 sem = self.BoundedSemaphore(2)
678 self._test_semaphore(sem)
679 # Currently fails on OS/X
680 #if HAVE_GETVALUE:
681 # self.assertRaises(ValueError, sem.release)
682 # self.assertReturnsIfImplemented(2, get_value, sem)
683
684 def test_timeout(self):
685 if self.TYPE != 'processes':
Zachary Ware1f702212013-12-10 14:09:20 -0600686 self.skipTest('test not appropriate for {}'.format(self.TYPE))
Benjamin Petersondfd79492008-06-13 19:13:39 +0000687
688 sem = self.Semaphore(0)
689 acquire = TimingWrapper(sem.acquire)
690
691 self.assertEqual(acquire(False), False)
692 self.assertTimingAlmostEqual(acquire.elapsed, 0.0)
693
694 self.assertEqual(acquire(False, None), False)
695 self.assertTimingAlmostEqual(acquire.elapsed, 0.0)
696
697 self.assertEqual(acquire(False, TIMEOUT1), False)
698 self.assertTimingAlmostEqual(acquire.elapsed, 0)
699
700 self.assertEqual(acquire(True, TIMEOUT2), False)
701 self.assertTimingAlmostEqual(acquire.elapsed, TIMEOUT2)
702
703 self.assertEqual(acquire(timeout=TIMEOUT3), False)
704 self.assertTimingAlmostEqual(acquire.elapsed, TIMEOUT3)
705
706
707class _TestCondition(BaseTestCase):
708
Antoine Pitrou4eb2b282010-11-02 23:51:30 +0000709 @classmethod
710 def f(cls, cond, sleeping, woken, timeout=None):
Benjamin Petersondfd79492008-06-13 19:13:39 +0000711 cond.acquire()
712 sleeping.release()
713 cond.wait(timeout)
714 woken.release()
715 cond.release()
716
717 def check_invariant(self, cond):
718 # this is only supposed to succeed when there are no sleepers
719 if self.TYPE == 'processes':
720 try:
721 sleepers = (cond._sleeping_count.get_value() -
722 cond._woken_count.get_value())
723 self.assertEqual(sleepers, 0)
724 self.assertEqual(cond._wait_semaphore.get_value(), 0)
725 except NotImplementedError:
726 pass
727
728 def test_notify(self):
729 cond = self.Condition()
730 sleeping = self.Semaphore(0)
731 woken = self.Semaphore(0)
732
733 p = self.Process(target=self.f, args=(cond, sleeping, woken))
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000734 p.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +0000735 p.start()
736
737 p = threading.Thread(target=self.f, args=(cond, sleeping, woken))
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000738 p.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +0000739 p.start()
740
741 # wait for both children to start sleeping
742 sleeping.acquire()
743 sleeping.acquire()
744
745 # check no process/thread has woken up
746 time.sleep(DELTA)
747 self.assertReturnsIfImplemented(0, get_value, woken)
748
749 # wake up one process/thread
750 cond.acquire()
751 cond.notify()
752 cond.release()
753
754 # check one process/thread has woken up
755 time.sleep(DELTA)
756 self.assertReturnsIfImplemented(1, get_value, woken)
757
758 # wake up another
759 cond.acquire()
760 cond.notify()
761 cond.release()
762
763 # check other has woken up
764 time.sleep(DELTA)
765 self.assertReturnsIfImplemented(2, get_value, woken)
766
767 # check state is not mucked up
768 self.check_invariant(cond)
769 p.join()
770
771 def test_notify_all(self):
772 cond = self.Condition()
773 sleeping = self.Semaphore(0)
774 woken = self.Semaphore(0)
775
776 # start some threads/processes which will timeout
777 for i in range(3):
778 p = self.Process(target=self.f,
779 args=(cond, sleeping, woken, TIMEOUT1))
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000780 p.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +0000781 p.start()
782
783 t = threading.Thread(target=self.f,
784 args=(cond, sleeping, woken, TIMEOUT1))
Benjamin Petersona9b22222008-08-18 18:01:43 +0000785 t.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +0000786 t.start()
787
788 # wait for them all to sleep
789 for i in xrange(6):
790 sleeping.acquire()
791
792 # check they have all timed out
793 for i in xrange(6):
794 woken.acquire()
795 self.assertReturnsIfImplemented(0, get_value, woken)
796
797 # check state is not mucked up
798 self.check_invariant(cond)
799
800 # start some more threads/processes
801 for i in range(3):
802 p = self.Process(target=self.f, args=(cond, sleeping, woken))
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000803 p.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +0000804 p.start()
805
806 t = threading.Thread(target=self.f, args=(cond, sleeping, woken))
Benjamin Petersona9b22222008-08-18 18:01:43 +0000807 t.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +0000808 t.start()
809
810 # wait for them to all sleep
811 for i in xrange(6):
812 sleeping.acquire()
813
814 # check no process/thread has woken up
815 time.sleep(DELTA)
816 self.assertReturnsIfImplemented(0, get_value, woken)
817
818 # wake them all up
819 cond.acquire()
820 cond.notify_all()
821 cond.release()
822
823 # check they have all woken
824 time.sleep(DELTA)
825 self.assertReturnsIfImplemented(6, get_value, woken)
826
827 # check state is not mucked up
828 self.check_invariant(cond)
829
830 def test_timeout(self):
831 cond = self.Condition()
832 wait = TimingWrapper(cond.wait)
833 cond.acquire()
834 res = wait(TIMEOUT1)
835 cond.release()
836 self.assertEqual(res, None)
837 self.assertTimingAlmostEqual(wait.elapsed, TIMEOUT1)
838
839
840class _TestEvent(BaseTestCase):
841
Antoine Pitrou4eb2b282010-11-02 23:51:30 +0000842 @classmethod
843 def _test_event(cls, event):
Benjamin Petersondfd79492008-06-13 19:13:39 +0000844 time.sleep(TIMEOUT2)
845 event.set()
846
847 def test_event(self):
848 event = self.Event()
849 wait = TimingWrapper(event.wait)
850
Ezio Melottic2077b02011-03-16 12:34:31 +0200851 # Removed temporarily, due to API shear, this does not
Benjamin Petersondfd79492008-06-13 19:13:39 +0000852 # work with threading._Event objects. is_set == isSet
Jesse Noller02cb0eb2009-04-01 03:45:50 +0000853 self.assertEqual(event.is_set(), False)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000854
Jesse Noller02cb0eb2009-04-01 03:45:50 +0000855 # Removed, threading.Event.wait() will return the value of the __flag
856 # instead of None. API Shear with the semaphore backed mp.Event
857 self.assertEqual(wait(0.0), False)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000858 self.assertTimingAlmostEqual(wait.elapsed, 0.0)
Jesse Noller02cb0eb2009-04-01 03:45:50 +0000859 self.assertEqual(wait(TIMEOUT1), False)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000860 self.assertTimingAlmostEqual(wait.elapsed, TIMEOUT1)
861
862 event.set()
863
864 # See note above on the API differences
Jesse Noller02cb0eb2009-04-01 03:45:50 +0000865 self.assertEqual(event.is_set(), True)
866 self.assertEqual(wait(), True)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000867 self.assertTimingAlmostEqual(wait.elapsed, 0.0)
Jesse Noller02cb0eb2009-04-01 03:45:50 +0000868 self.assertEqual(wait(TIMEOUT1), True)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000869 self.assertTimingAlmostEqual(wait.elapsed, 0.0)
870 # self.assertEqual(event.is_set(), True)
871
872 event.clear()
873
874 #self.assertEqual(event.is_set(), False)
875
Jesus Cea6f6016b2011-09-09 20:26:57 +0200876 p = self.Process(target=self._test_event, args=(event,))
877 p.daemon = True
878 p.start()
Jesse Noller02cb0eb2009-04-01 03:45:50 +0000879 self.assertEqual(wait(), True)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000880
881#
882#
883#
884
885class _TestValue(BaseTestCase):
886
Florent Xicluna36b9fbb2010-03-24 19:33:25 +0000887 ALLOWED_TYPES = ('processes',)
888
Benjamin Petersondfd79492008-06-13 19:13:39 +0000889 codes_values = [
890 ('i', 4343, 24234),
891 ('d', 3.625, -4.25),
892 ('h', -232, 234),
893 ('c', latin('x'), latin('y'))
894 ]
895
Antoine Pitrou55d935a2010-11-22 16:35:57 +0000896 def setUp(self):
897 if not HAS_SHAREDCTYPES:
898 self.skipTest("requires multiprocessing.sharedctypes")
899
Antoine Pitrou4eb2b282010-11-02 23:51:30 +0000900 @classmethod
901 def _test(cls, values):
902 for sv, cv in zip(values, cls.codes_values):
Benjamin Petersondfd79492008-06-13 19:13:39 +0000903 sv.value = cv[2]
904
905
906 def test_value(self, raw=False):
Benjamin Petersondfd79492008-06-13 19:13:39 +0000907 if raw:
908 values = [self.RawValue(code, value)
909 for code, value, _ in self.codes_values]
910 else:
911 values = [self.Value(code, value)
912 for code, value, _ in self.codes_values]
913
914 for sv, cv in zip(values, self.codes_values):
915 self.assertEqual(sv.value, cv[1])
916
917 proc = self.Process(target=self._test, args=(values,))
Jesus Cea6f6016b2011-09-09 20:26:57 +0200918 proc.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +0000919 proc.start()
920 proc.join()
921
922 for sv, cv in zip(values, self.codes_values):
923 self.assertEqual(sv.value, cv[2])
924
925 def test_rawvalue(self):
926 self.test_value(raw=True)
927
928 def test_getobj_getlock(self):
Benjamin Petersondfd79492008-06-13 19:13:39 +0000929 val1 = self.Value('i', 5)
930 lock1 = val1.get_lock()
931 obj1 = val1.get_obj()
932
933 val2 = self.Value('i', 5, lock=None)
934 lock2 = val2.get_lock()
935 obj2 = val2.get_obj()
936
937 lock = self.Lock()
938 val3 = self.Value('i', 5, lock=lock)
939 lock3 = val3.get_lock()
940 obj3 = val3.get_obj()
941 self.assertEqual(lock, lock3)
942
Jesse Noller6ab22152009-01-18 02:45:38 +0000943 arr4 = self.Value('i', 5, lock=False)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000944 self.assertFalse(hasattr(arr4, 'get_lock'))
945 self.assertFalse(hasattr(arr4, 'get_obj'))
946
Jesse Noller6ab22152009-01-18 02:45:38 +0000947 self.assertRaises(AttributeError, self.Value, 'i', 5, lock='navalue')
948
949 arr5 = self.RawValue('i', 5)
950 self.assertFalse(hasattr(arr5, 'get_lock'))
951 self.assertFalse(hasattr(arr5, 'get_obj'))
952
Benjamin Petersondfd79492008-06-13 19:13:39 +0000953
954class _TestArray(BaseTestCase):
955
Florent Xicluna36b9fbb2010-03-24 19:33:25 +0000956 ALLOWED_TYPES = ('processes',)
957
Antoine Pitrou4eb2b282010-11-02 23:51:30 +0000958 @classmethod
959 def f(cls, seq):
Benjamin Petersondfd79492008-06-13 19:13:39 +0000960 for i in range(1, len(seq)):
961 seq[i] += seq[i-1]
962
Florent Xicluna36b9fbb2010-03-24 19:33:25 +0000963 @unittest.skipIf(c_int is None, "requires _ctypes")
Benjamin Petersondfd79492008-06-13 19:13:39 +0000964 def test_array(self, raw=False):
Benjamin Petersondfd79492008-06-13 19:13:39 +0000965 seq = [680, 626, 934, 821, 150, 233, 548, 982, 714, 831]
966 if raw:
967 arr = self.RawArray('i', seq)
968 else:
969 arr = self.Array('i', seq)
970
971 self.assertEqual(len(arr), len(seq))
972 self.assertEqual(arr[3], seq[3])
973 self.assertEqual(list(arr[2:7]), list(seq[2:7]))
974
975 arr[4:8] = seq[4:8] = array.array('i', [1, 2, 3, 4])
976
977 self.assertEqual(list(arr[:]), seq)
978
979 self.f(seq)
980
981 p = self.Process(target=self.f, args=(arr,))
Jesus Cea6f6016b2011-09-09 20:26:57 +0200982 p.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +0000983 p.start()
984 p.join()
985
986 self.assertEqual(list(arr[:]), seq)
987
Florent Xicluna36b9fbb2010-03-24 19:33:25 +0000988 @unittest.skipIf(c_int is None, "requires _ctypes")
Mark Dickinsond3cb2f62011-03-26 10:02:37 +0000989 def test_array_from_size(self):
990 size = 10
991 # Test for zeroing (see issue #11675).
992 # The repetition below strengthens the test by increasing the chances
993 # of previously allocated non-zero memory being used for the new array
994 # on the 2nd and 3rd loops.
995 for _ in range(3):
996 arr = self.Array('i', size)
997 self.assertEqual(len(arr), size)
998 self.assertEqual(list(arr), [0] * size)
999 arr[:] = range(10)
1000 self.assertEqual(list(arr), range(10))
1001 del arr
1002
1003 @unittest.skipIf(c_int is None, "requires _ctypes")
Benjamin Petersondfd79492008-06-13 19:13:39 +00001004 def test_rawarray(self):
1005 self.test_array(raw=True)
1006
Florent Xicluna36b9fbb2010-03-24 19:33:25 +00001007 @unittest.skipIf(c_int is None, "requires _ctypes")
Mark Dickinsonf9e9a6f2011-03-25 22:01:06 +00001008 def test_array_accepts_long(self):
1009 arr = self.Array('i', 10L)
1010 self.assertEqual(len(arr), 10)
1011 raw_arr = self.RawArray('i', 10L)
1012 self.assertEqual(len(raw_arr), 10)
1013
1014 @unittest.skipIf(c_int is None, "requires _ctypes")
Benjamin Petersondfd79492008-06-13 19:13:39 +00001015 def test_getobj_getlock_obj(self):
Benjamin Petersondfd79492008-06-13 19:13:39 +00001016 arr1 = self.Array('i', range(10))
1017 lock1 = arr1.get_lock()
1018 obj1 = arr1.get_obj()
1019
1020 arr2 = self.Array('i', range(10), lock=None)
1021 lock2 = arr2.get_lock()
1022 obj2 = arr2.get_obj()
1023
1024 lock = self.Lock()
1025 arr3 = self.Array('i', range(10), lock=lock)
1026 lock3 = arr3.get_lock()
1027 obj3 = arr3.get_obj()
1028 self.assertEqual(lock, lock3)
1029
Jesse Noller6ab22152009-01-18 02:45:38 +00001030 arr4 = self.Array('i', range(10), lock=False)
Benjamin Petersondfd79492008-06-13 19:13:39 +00001031 self.assertFalse(hasattr(arr4, 'get_lock'))
1032 self.assertFalse(hasattr(arr4, 'get_obj'))
Jesse Noller6ab22152009-01-18 02:45:38 +00001033 self.assertRaises(AttributeError,
1034 self.Array, 'i', range(10), lock='notalock')
1035
1036 arr5 = self.RawArray('i', range(10))
1037 self.assertFalse(hasattr(arr5, 'get_lock'))
1038 self.assertFalse(hasattr(arr5, 'get_obj'))
Benjamin Petersondfd79492008-06-13 19:13:39 +00001039
1040#
1041#
1042#
1043
1044class _TestContainers(BaseTestCase):
1045
1046 ALLOWED_TYPES = ('manager',)
1047
1048 def test_list(self):
1049 a = self.list(range(10))
1050 self.assertEqual(a[:], range(10))
1051
1052 b = self.list()
1053 self.assertEqual(b[:], [])
1054
1055 b.extend(range(5))
1056 self.assertEqual(b[:], range(5))
1057
1058 self.assertEqual(b[2], 2)
1059 self.assertEqual(b[2:10], [2,3,4])
1060
1061 b *= 2
1062 self.assertEqual(b[:], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4])
1063
1064 self.assertEqual(b + [5, 6], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4, 5, 6])
1065
1066 self.assertEqual(a[:], range(10))
1067
1068 d = [a, b]
1069 e = self.list(d)
1070 self.assertEqual(
1071 e[:],
1072 [[0, 1, 2, 3, 4, 5, 6, 7, 8, 9], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4]]
1073 )
1074
1075 f = self.list([a])
1076 a.append('hello')
1077 self.assertEqual(f[:], [[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 'hello']])
1078
1079 def test_dict(self):
1080 d = self.dict()
1081 indices = range(65, 70)
1082 for i in indices:
1083 d[i] = chr(i)
1084 self.assertEqual(d.copy(), dict((i, chr(i)) for i in indices))
1085 self.assertEqual(sorted(d.keys()), indices)
1086 self.assertEqual(sorted(d.values()), [chr(i) for i in indices])
1087 self.assertEqual(sorted(d.items()), [(i, chr(i)) for i in indices])
1088
1089 def test_namespace(self):
1090 n = self.Namespace()
1091 n.name = 'Bob'
1092 n.job = 'Builder'
1093 n._hidden = 'hidden'
1094 self.assertEqual((n.name, n.job), ('Bob', 'Builder'))
1095 del n.job
1096 self.assertEqual(str(n), "Namespace(name='Bob')")
1097 self.assertTrue(hasattr(n, 'name'))
1098 self.assertTrue(not hasattr(n, 'job'))
1099
1100#
1101#
1102#
1103
1104def sqr(x, wait=0.0):
1105 time.sleep(wait)
1106 return x*x
Benjamin Petersondfd79492008-06-13 19:13:39 +00001107class _TestPool(BaseTestCase):
1108
1109 def test_apply(self):
1110 papply = self.pool.apply
1111 self.assertEqual(papply(sqr, (5,)), sqr(5))
1112 self.assertEqual(papply(sqr, (), {'x':3}), sqr(x=3))
1113
1114 def test_map(self):
1115 pmap = self.pool.map
1116 self.assertEqual(pmap(sqr, range(10)), map(sqr, range(10)))
1117 self.assertEqual(pmap(sqr, range(100), chunksize=20),
1118 map(sqr, range(100)))
1119
Richard Oudkerk21aad972013-10-28 23:02:22 +00001120 def test_map_unplicklable(self):
1121 # Issue #19425 -- failure to pickle should not cause a hang
1122 if self.TYPE == 'threads':
Zachary Ware1f702212013-12-10 14:09:20 -06001123 self.skipTest('test not appropriate for {}'.format(self.TYPE))
Richard Oudkerk21aad972013-10-28 23:02:22 +00001124 class A(object):
1125 def __reduce__(self):
1126 raise RuntimeError('cannot pickle')
1127 with self.assertRaises(RuntimeError):
1128 self.pool.map(sqr, [A()]*10)
1129
Jesse Noller7530e472009-07-16 14:23:04 +00001130 def test_map_chunksize(self):
1131 try:
1132 self.pool.map_async(sqr, [], chunksize=1).get(timeout=TIMEOUT1)
1133 except multiprocessing.TimeoutError:
1134 self.fail("pool.map_async with chunksize stalled on null list")
1135
Benjamin Petersondfd79492008-06-13 19:13:39 +00001136 def test_async(self):
1137 res = self.pool.apply_async(sqr, (7, TIMEOUT1,))
1138 get = TimingWrapper(res.get)
1139 self.assertEqual(get(), 49)
1140 self.assertTimingAlmostEqual(get.elapsed, TIMEOUT1)
1141
1142 def test_async_timeout(self):
Richard Oudkerk65162a72013-11-17 17:45:16 +00001143 res = self.pool.apply_async(sqr, (6, TIMEOUT2 + 1.0))
Benjamin Petersondfd79492008-06-13 19:13:39 +00001144 get = TimingWrapper(res.get)
1145 self.assertRaises(multiprocessing.TimeoutError, get, timeout=TIMEOUT2)
1146 self.assertTimingAlmostEqual(get.elapsed, TIMEOUT2)
1147
1148 def test_imap(self):
1149 it = self.pool.imap(sqr, range(10))
1150 self.assertEqual(list(it), map(sqr, range(10)))
1151
1152 it = self.pool.imap(sqr, range(10))
1153 for i in range(10):
1154 self.assertEqual(it.next(), i*i)
1155 self.assertRaises(StopIteration, it.next)
1156
1157 it = self.pool.imap(sqr, range(1000), chunksize=100)
1158 for i in range(1000):
1159 self.assertEqual(it.next(), i*i)
1160 self.assertRaises(StopIteration, it.next)
1161
1162 def test_imap_unordered(self):
1163 it = self.pool.imap_unordered(sqr, range(1000))
1164 self.assertEqual(sorted(it), map(sqr, range(1000)))
1165
1166 it = self.pool.imap_unordered(sqr, range(1000), chunksize=53)
1167 self.assertEqual(sorted(it), map(sqr, range(1000)))
1168
1169 def test_make_pool(self):
Victor Stinnerf64a0cf2011-06-20 17:54:33 +02001170 self.assertRaises(ValueError, multiprocessing.Pool, -1)
1171 self.assertRaises(ValueError, multiprocessing.Pool, 0)
1172
Benjamin Petersondfd79492008-06-13 19:13:39 +00001173 p = multiprocessing.Pool(3)
1174 self.assertEqual(3, len(p._pool))
1175 p.close()
1176 p.join()
1177
1178 def test_terminate(self):
Richard Oudkerk6d24a6e2013-11-21 16:35:12 +00001179 p = self.Pool(4)
1180 result = p.map_async(
Benjamin Petersondfd79492008-06-13 19:13:39 +00001181 time.sleep, [0.1 for i in range(10000)], chunksize=1
1182 )
Richard Oudkerk6d24a6e2013-11-21 16:35:12 +00001183 p.terminate()
1184 join = TimingWrapper(p.join)
Benjamin Petersondfd79492008-06-13 19:13:39 +00001185 join()
1186 self.assertTrue(join.elapsed < 0.2)
Jesse Noller654ade32010-01-27 03:05:57 +00001187
Richard Oudkerkd44a4a22012-06-06 17:52:18 +01001188 def test_empty_iterable(self):
1189 # See Issue 12157
1190 p = self.Pool(1)
1191
1192 self.assertEqual(p.map(sqr, []), [])
1193 self.assertEqual(list(p.imap(sqr, [])), [])
1194 self.assertEqual(list(p.imap_unordered(sqr, [])), [])
1195 self.assertEqual(p.map_async(sqr, []).get(), [])
1196
1197 p.close()
1198 p.join()
1199
Richard Oudkerk0c200c22012-05-02 16:36:26 +01001200def unpickleable_result():
1201 return lambda: 42
1202
1203class _TestPoolWorkerErrors(BaseTestCase):
1204 ALLOWED_TYPES = ('processes', )
1205
1206 def test_unpickleable_result(self):
1207 from multiprocessing.pool import MaybeEncodingError
1208 p = multiprocessing.Pool(2)
1209
1210 # Make sure we don't lose pool processes because of encoding errors.
1211 for iteration in range(20):
1212 res = p.apply_async(unpickleable_result)
1213 self.assertRaises(MaybeEncodingError, res.get)
1214
1215 p.close()
1216 p.join()
1217
Jesse Noller654ade32010-01-27 03:05:57 +00001218class _TestPoolWorkerLifetime(BaseTestCase):
1219
1220 ALLOWED_TYPES = ('processes', )
1221 def test_pool_worker_lifetime(self):
1222 p = multiprocessing.Pool(3, maxtasksperchild=10)
1223 self.assertEqual(3, len(p._pool))
1224 origworkerpids = [w.pid for w in p._pool]
1225 # Run many tasks so each worker gets replaced (hopefully)
1226 results = []
1227 for i in range(100):
1228 results.append(p.apply_async(sqr, (i, )))
1229 # Fetch the results and verify we got the right answers,
1230 # also ensuring all the tasks have completed.
1231 for (j, res) in enumerate(results):
1232 self.assertEqual(res.get(), sqr(j))
1233 # Refill the pool
1234 p._repopulate_pool()
Florent Xicluna3bc5cb72010-03-04 15:58:54 +00001235 # Wait until all workers are alive
Antoine Pitrouc2b0d762011-04-06 22:54:14 +02001236 # (countdown * DELTA = 5 seconds max startup process time)
1237 countdown = 50
Florent Xicluna3bc5cb72010-03-04 15:58:54 +00001238 while countdown and not all(w.is_alive() for w in p._pool):
1239 countdown -= 1
1240 time.sleep(DELTA)
Jesse Noller654ade32010-01-27 03:05:57 +00001241 finalworkerpids = [w.pid for w in p._pool]
Florent Xicluna3bc5cb72010-03-04 15:58:54 +00001242 # All pids should be assigned. See issue #7805.
1243 self.assertNotIn(None, origworkerpids)
1244 self.assertNotIn(None, finalworkerpids)
1245 # Finally, check that the worker pids have changed
Jesse Noller654ade32010-01-27 03:05:57 +00001246 self.assertNotEqual(sorted(origworkerpids), sorted(finalworkerpids))
1247 p.close()
1248 p.join()
1249
Charles-François Natali46f990e2011-10-24 18:43:51 +02001250 def test_pool_worker_lifetime_early_close(self):
1251 # Issue #10332: closing a pool whose workers have limited lifetimes
1252 # before all the tasks completed would make join() hang.
1253 p = multiprocessing.Pool(3, maxtasksperchild=1)
1254 results = []
1255 for i in range(6):
1256 results.append(p.apply_async(sqr, (i, 0.3)))
1257 p.close()
1258 p.join()
1259 # check the results
1260 for (j, res) in enumerate(results):
1261 self.assertEqual(res.get(), sqr(j))
1262
1263
Benjamin Petersondfd79492008-06-13 19:13:39 +00001264#
1265# Test that manager has expected number of shared objects left
1266#
1267
1268class _TestZZZNumberOfObjects(BaseTestCase):
1269 # Because test cases are sorted alphabetically, this one will get
1270 # run after all the other tests for the manager. It tests that
1271 # there have been no "reference leaks" for the manager's shared
1272 # objects. Note the comment in _TestPool.test_terminate().
1273 ALLOWED_TYPES = ('manager',)
1274
1275 def test_number_of_objects(self):
1276 EXPECTED_NUMBER = 1 # the pool object is still alive
1277 multiprocessing.active_children() # discard dead process objs
1278 gc.collect() # do garbage collection
1279 refs = self.manager._number_of_objects()
Jesse Noller7314b382009-01-21 02:08:17 +00001280 debug_info = self.manager._debug_info()
Benjamin Petersondfd79492008-06-13 19:13:39 +00001281 if refs != EXPECTED_NUMBER:
Jesse Noller7fb96402008-07-17 21:01:05 +00001282 print self.manager._debug_info()
Jesse Noller7314b382009-01-21 02:08:17 +00001283 print debug_info
Benjamin Petersondfd79492008-06-13 19:13:39 +00001284
1285 self.assertEqual(refs, EXPECTED_NUMBER)
1286
1287#
1288# Test of creating a customized manager class
1289#
1290
1291from multiprocessing.managers import BaseManager, BaseProxy, RemoteError
1292
1293class FooBar(object):
1294 def f(self):
1295 return 'f()'
1296 def g(self):
1297 raise ValueError
1298 def _h(self):
1299 return '_h()'
1300
1301def baz():
1302 for i in xrange(10):
1303 yield i*i
1304
1305class IteratorProxy(BaseProxy):
1306 _exposed_ = ('next', '__next__')
1307 def __iter__(self):
1308 return self
1309 def next(self):
1310 return self._callmethod('next')
1311 def __next__(self):
1312 return self._callmethod('__next__')
1313
1314class MyManager(BaseManager):
1315 pass
1316
1317MyManager.register('Foo', callable=FooBar)
1318MyManager.register('Bar', callable=FooBar, exposed=('f', '_h'))
1319MyManager.register('baz', callable=baz, proxytype=IteratorProxy)
1320
1321
1322class _TestMyManager(BaseTestCase):
1323
1324 ALLOWED_TYPES = ('manager',)
1325
1326 def test_mymanager(self):
1327 manager = MyManager()
1328 manager.start()
1329
1330 foo = manager.Foo()
1331 bar = manager.Bar()
1332 baz = manager.baz()
1333
1334 foo_methods = [name for name in ('f', 'g', '_h') if hasattr(foo, name)]
1335 bar_methods = [name for name in ('f', 'g', '_h') if hasattr(bar, name)]
1336
1337 self.assertEqual(foo_methods, ['f', 'g'])
1338 self.assertEqual(bar_methods, ['f', '_h'])
1339
1340 self.assertEqual(foo.f(), 'f()')
1341 self.assertRaises(ValueError, foo.g)
1342 self.assertEqual(foo._callmethod('f'), 'f()')
1343 self.assertRaises(RemoteError, foo._callmethod, '_h')
1344
1345 self.assertEqual(bar.f(), 'f()')
1346 self.assertEqual(bar._h(), '_h()')
1347 self.assertEqual(bar._callmethod('f'), 'f()')
1348 self.assertEqual(bar._callmethod('_h'), '_h()')
1349
1350 self.assertEqual(list(baz), [i*i for i in range(10)])
1351
1352 manager.shutdown()
1353
1354#
1355# Test of connecting to a remote server and using xmlrpclib for serialization
1356#
1357
1358_queue = Queue.Queue()
1359def get_queue():
1360 return _queue
1361
1362class QueueManager(BaseManager):
1363 '''manager class used by server process'''
1364QueueManager.register('get_queue', callable=get_queue)
1365
1366class QueueManager2(BaseManager):
1367 '''manager class which specifies the same interface as QueueManager'''
1368QueueManager2.register('get_queue')
1369
1370
1371SERIALIZER = 'xmlrpclib'
1372
1373class _TestRemoteManager(BaseTestCase):
1374
1375 ALLOWED_TYPES = ('manager',)
1376
Antoine Pitrou4eb2b282010-11-02 23:51:30 +00001377 @classmethod
1378 def _putter(cls, address, authkey):
Benjamin Petersondfd79492008-06-13 19:13:39 +00001379 manager = QueueManager2(
1380 address=address, authkey=authkey, serializer=SERIALIZER
1381 )
1382 manager.connect()
1383 queue = manager.get_queue()
1384 queue.put(('hello world', None, True, 2.25))
1385
1386 def test_remote(self):
1387 authkey = os.urandom(32)
1388
1389 manager = QueueManager(
Antoine Pitrou78254dc2013-08-22 00:39:46 +02001390 address=(test.test_support.HOST, 0), authkey=authkey, serializer=SERIALIZER
Benjamin Petersondfd79492008-06-13 19:13:39 +00001391 )
1392 manager.start()
1393
1394 p = self.Process(target=self._putter, args=(manager.address, authkey))
Jesus Cea6f6016b2011-09-09 20:26:57 +02001395 p.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +00001396 p.start()
1397
1398 manager2 = QueueManager2(
1399 address=manager.address, authkey=authkey, serializer=SERIALIZER
1400 )
1401 manager2.connect()
1402 queue = manager2.get_queue()
1403
1404 # Note that xmlrpclib will deserialize object as a list not a tuple
1405 self.assertEqual(queue.get(), ['hello world', None, True, 2.25])
1406
1407 # Because we are using xmlrpclib for serialization instead of
1408 # pickle this will cause a serialization error.
1409 self.assertRaises(Exception, queue.put, time.sleep)
1410
1411 # Make queue finalizer run before the server is stopped
1412 del queue
1413 manager.shutdown()
1414
Jesse Noller459a6482009-03-30 15:50:42 +00001415class _TestManagerRestart(BaseTestCase):
1416
Antoine Pitrou4eb2b282010-11-02 23:51:30 +00001417 @classmethod
1418 def _putter(cls, address, authkey):
Jesse Noller459a6482009-03-30 15:50:42 +00001419 manager = QueueManager(
1420 address=address, authkey=authkey, serializer=SERIALIZER)
1421 manager.connect()
1422 queue = manager.get_queue()
1423 queue.put('hello world')
1424
1425 def test_rapid_restart(self):
1426 authkey = os.urandom(32)
1427 manager = QueueManager(
Antoine Pitrou78254dc2013-08-22 00:39:46 +02001428 address=(test.test_support.HOST, 0), authkey=authkey, serializer=SERIALIZER)
Brian Curtin87d86e02010-11-01 05:15:55 +00001429 srvr = manager.get_server()
1430 addr = srvr.address
1431 # Close the connection.Listener socket which gets opened as a part
1432 # of manager.get_server(). It's not needed for the test.
1433 srvr.listener.close()
Jesse Noller459a6482009-03-30 15:50:42 +00001434 manager.start()
1435
1436 p = self.Process(target=self._putter, args=(manager.address, authkey))
Jesus Cea6f6016b2011-09-09 20:26:57 +02001437 p.daemon = True
Jesse Noller459a6482009-03-30 15:50:42 +00001438 p.start()
1439 queue = manager.get_queue()
1440 self.assertEqual(queue.get(), 'hello world')
Jesse Noller019ce772009-03-30 21:53:29 +00001441 del queue
Jesse Noller459a6482009-03-30 15:50:42 +00001442 manager.shutdown()
1443 manager = QueueManager(
Antoine Pitrou54f9f832010-04-30 23:08:48 +00001444 address=addr, authkey=authkey, serializer=SERIALIZER)
Jesse Noller459a6482009-03-30 15:50:42 +00001445 manager.start()
Jesse Noller019ce772009-03-30 21:53:29 +00001446 manager.shutdown()
Jesse Noller459a6482009-03-30 15:50:42 +00001447
Benjamin Petersondfd79492008-06-13 19:13:39 +00001448#
1449#
1450#
1451
1452SENTINEL = latin('')
1453
1454class _TestConnection(BaseTestCase):
1455
1456 ALLOWED_TYPES = ('processes', 'threads')
1457
Antoine Pitrou4eb2b282010-11-02 23:51:30 +00001458 @classmethod
1459 def _echo(cls, conn):
Benjamin Petersondfd79492008-06-13 19:13:39 +00001460 for msg in iter(conn.recv_bytes, SENTINEL):
1461 conn.send_bytes(msg)
1462 conn.close()
1463
1464 def test_connection(self):
1465 conn, child_conn = self.Pipe()
1466
1467 p = self.Process(target=self._echo, args=(child_conn,))
Jesse Noller5bc9f4c2008-08-19 19:06:19 +00001468 p.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +00001469 p.start()
1470
1471 seq = [1, 2.25, None]
1472 msg = latin('hello world')
1473 longmsg = msg * 10
1474 arr = array.array('i', range(4))
1475
1476 if self.TYPE == 'processes':
1477 self.assertEqual(type(conn.fileno()), int)
1478
1479 self.assertEqual(conn.send(seq), None)
1480 self.assertEqual(conn.recv(), seq)
1481
1482 self.assertEqual(conn.send_bytes(msg), None)
1483 self.assertEqual(conn.recv_bytes(), msg)
1484
1485 if self.TYPE == 'processes':
1486 buffer = array.array('i', [0]*10)
1487 expected = list(arr) + [0] * (10 - len(arr))
1488 self.assertEqual(conn.send_bytes(arr), None)
1489 self.assertEqual(conn.recv_bytes_into(buffer),
1490 len(arr) * buffer.itemsize)
1491 self.assertEqual(list(buffer), expected)
1492
1493 buffer = array.array('i', [0]*10)
1494 expected = [0] * 3 + list(arr) + [0] * (10 - 3 - len(arr))
1495 self.assertEqual(conn.send_bytes(arr), None)
1496 self.assertEqual(conn.recv_bytes_into(buffer, 3 * buffer.itemsize),
1497 len(arr) * buffer.itemsize)
1498 self.assertEqual(list(buffer), expected)
1499
1500 buffer = bytearray(latin(' ' * 40))
1501 self.assertEqual(conn.send_bytes(longmsg), None)
1502 try:
1503 res = conn.recv_bytes_into(buffer)
1504 except multiprocessing.BufferTooShort, e:
1505 self.assertEqual(e.args, (longmsg,))
1506 else:
1507 self.fail('expected BufferTooShort, got %s' % res)
1508
1509 poll = TimingWrapper(conn.poll)
1510
1511 self.assertEqual(poll(), False)
1512 self.assertTimingAlmostEqual(poll.elapsed, 0)
1513
1514 self.assertEqual(poll(TIMEOUT1), False)
1515 self.assertTimingAlmostEqual(poll.elapsed, TIMEOUT1)
1516
1517 conn.send(None)
Giampaolo Rodola'cef20062012-12-31 17:23:09 +01001518 time.sleep(.1)
Benjamin Petersondfd79492008-06-13 19:13:39 +00001519
1520 self.assertEqual(poll(TIMEOUT1), True)
1521 self.assertTimingAlmostEqual(poll.elapsed, 0)
1522
1523 self.assertEqual(conn.recv(), None)
1524
1525 really_big_msg = latin('X') * (1024 * 1024 * 16) # 16Mb
1526 conn.send_bytes(really_big_msg)
1527 self.assertEqual(conn.recv_bytes(), really_big_msg)
1528
1529 conn.send_bytes(SENTINEL) # tell child to quit
1530 child_conn.close()
1531
1532 if self.TYPE == 'processes':
1533 self.assertEqual(conn.readable, True)
1534 self.assertEqual(conn.writable, True)
1535 self.assertRaises(EOFError, conn.recv)
1536 self.assertRaises(EOFError, conn.recv_bytes)
1537
1538 p.join()
1539
1540 def test_duplex_false(self):
1541 reader, writer = self.Pipe(duplex=False)
1542 self.assertEqual(writer.send(1), None)
1543 self.assertEqual(reader.recv(), 1)
1544 if self.TYPE == 'processes':
1545 self.assertEqual(reader.readable, True)
1546 self.assertEqual(reader.writable, False)
1547 self.assertEqual(writer.readable, False)
1548 self.assertEqual(writer.writable, True)
1549 self.assertRaises(IOError, reader.send, 2)
1550 self.assertRaises(IOError, writer.recv)
1551 self.assertRaises(IOError, writer.poll)
1552
1553 def test_spawn_close(self):
1554 # We test that a pipe connection can be closed by parent
1555 # process immediately after child is spawned. On Windows this
1556 # would have sometimes failed on old versions because
1557 # child_conn would be closed before the child got a chance to
1558 # duplicate it.
1559 conn, child_conn = self.Pipe()
1560
1561 p = self.Process(target=self._echo, args=(child_conn,))
Jesus Cea6f6016b2011-09-09 20:26:57 +02001562 p.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +00001563 p.start()
1564 child_conn.close() # this might complete before child initializes
1565
1566 msg = latin('hello')
1567 conn.send_bytes(msg)
1568 self.assertEqual(conn.recv_bytes(), msg)
1569
1570 conn.send_bytes(SENTINEL)
1571 conn.close()
1572 p.join()
1573
1574 def test_sendbytes(self):
1575 if self.TYPE != 'processes':
Zachary Ware1f702212013-12-10 14:09:20 -06001576 self.skipTest('test not appropriate for {}'.format(self.TYPE))
Benjamin Petersondfd79492008-06-13 19:13:39 +00001577
1578 msg = latin('abcdefghijklmnopqrstuvwxyz')
1579 a, b = self.Pipe()
1580
1581 a.send_bytes(msg)
1582 self.assertEqual(b.recv_bytes(), msg)
1583
1584 a.send_bytes(msg, 5)
1585 self.assertEqual(b.recv_bytes(), msg[5:])
1586
1587 a.send_bytes(msg, 7, 8)
1588 self.assertEqual(b.recv_bytes(), msg[7:7+8])
1589
1590 a.send_bytes(msg, 26)
1591 self.assertEqual(b.recv_bytes(), latin(''))
1592
1593 a.send_bytes(msg, 26, 0)
1594 self.assertEqual(b.recv_bytes(), latin(''))
1595
1596 self.assertRaises(ValueError, a.send_bytes, msg, 27)
1597
1598 self.assertRaises(ValueError, a.send_bytes, msg, 22, 5)
1599
1600 self.assertRaises(ValueError, a.send_bytes, msg, 26, 1)
1601
1602 self.assertRaises(ValueError, a.send_bytes, msg, -1)
1603
1604 self.assertRaises(ValueError, a.send_bytes, msg, 4, -1)
1605
Antoine Pitroua1a8da82011-08-23 19:54:20 +02001606 @classmethod
1607 def _is_fd_assigned(cls, fd):
1608 try:
1609 os.fstat(fd)
1610 except OSError as e:
1611 if e.errno == errno.EBADF:
1612 return False
1613 raise
1614 else:
1615 return True
1616
1617 @classmethod
1618 def _writefd(cls, conn, data, create_dummy_fds=False):
1619 if create_dummy_fds:
1620 for i in range(0, 256):
1621 if not cls._is_fd_assigned(i):
1622 os.dup2(conn.fileno(), i)
1623 fd = reduction.recv_handle(conn)
1624 if msvcrt:
1625 fd = msvcrt.open_osfhandle(fd, os.O_WRONLY)
1626 os.write(fd, data)
1627 os.close(fd)
1628
Charles-François Natalif8413b22011-09-21 18:44:49 +02001629 @unittest.skipUnless(HAS_REDUCTION, "test needs multiprocessing.reduction")
Antoine Pitroua1a8da82011-08-23 19:54:20 +02001630 def test_fd_transfer(self):
1631 if self.TYPE != 'processes':
1632 self.skipTest("only makes sense with processes")
1633 conn, child_conn = self.Pipe(duplex=True)
1634
1635 p = self.Process(target=self._writefd, args=(child_conn, b"foo"))
Jesus Cea6f6016b2011-09-09 20:26:57 +02001636 p.daemon = True
Antoine Pitroua1a8da82011-08-23 19:54:20 +02001637 p.start()
1638 with open(test_support.TESTFN, "wb") as f:
1639 fd = f.fileno()
1640 if msvcrt:
1641 fd = msvcrt.get_osfhandle(fd)
1642 reduction.send_handle(conn, fd, p.pid)
1643 p.join()
1644 with open(test_support.TESTFN, "rb") as f:
1645 self.assertEqual(f.read(), b"foo")
1646
Charles-François Natalif8413b22011-09-21 18:44:49 +02001647 @unittest.skipUnless(HAS_REDUCTION, "test needs multiprocessing.reduction")
Antoine Pitroua1a8da82011-08-23 19:54:20 +02001648 @unittest.skipIf(sys.platform == "win32",
1649 "test semantics don't make sense on Windows")
1650 @unittest.skipIf(MAXFD <= 256,
1651 "largest assignable fd number is too small")
1652 @unittest.skipUnless(hasattr(os, "dup2"),
1653 "test needs os.dup2()")
1654 def test_large_fd_transfer(self):
1655 # With fd > 256 (issue #11657)
1656 if self.TYPE != 'processes':
1657 self.skipTest("only makes sense with processes")
1658 conn, child_conn = self.Pipe(duplex=True)
1659
1660 p = self.Process(target=self._writefd, args=(child_conn, b"bar", True))
Jesus Cea6f6016b2011-09-09 20:26:57 +02001661 p.daemon = True
Antoine Pitroua1a8da82011-08-23 19:54:20 +02001662 p.start()
1663 with open(test_support.TESTFN, "wb") as f:
1664 fd = f.fileno()
1665 for newfd in range(256, MAXFD):
1666 if not self._is_fd_assigned(newfd):
1667 break
1668 else:
1669 self.fail("could not find an unassigned large file descriptor")
1670 os.dup2(fd, newfd)
1671 try:
1672 reduction.send_handle(conn, newfd, p.pid)
1673 finally:
1674 os.close(newfd)
1675 p.join()
1676 with open(test_support.TESTFN, "rb") as f:
1677 self.assertEqual(f.read(), b"bar")
1678
Jesus Ceac23484b2011-09-21 03:47:39 +02001679 @classmethod
1680 def _send_data_without_fd(self, conn):
1681 os.write(conn.fileno(), b"\0")
1682
Charles-François Natalif8413b22011-09-21 18:44:49 +02001683 @unittest.skipUnless(HAS_REDUCTION, "test needs multiprocessing.reduction")
Jesus Ceac23484b2011-09-21 03:47:39 +02001684 @unittest.skipIf(sys.platform == "win32", "doesn't make sense on Windows")
1685 def test_missing_fd_transfer(self):
1686 # Check that exception is raised when received data is not
1687 # accompanied by a file descriptor in ancillary data.
1688 if self.TYPE != 'processes':
1689 self.skipTest("only makes sense with processes")
1690 conn, child_conn = self.Pipe(duplex=True)
1691
1692 p = self.Process(target=self._send_data_without_fd, args=(child_conn,))
1693 p.daemon = True
1694 p.start()
1695 self.assertRaises(RuntimeError, reduction.recv_handle, conn)
1696 p.join()
Antoine Pitroua1a8da82011-08-23 19:54:20 +02001697
Benjamin Petersondfd79492008-06-13 19:13:39 +00001698class _TestListenerClient(BaseTestCase):
1699
1700 ALLOWED_TYPES = ('processes', 'threads')
1701
Antoine Pitrou4eb2b282010-11-02 23:51:30 +00001702 @classmethod
1703 def _test(cls, address):
1704 conn = cls.connection.Client(address)
Benjamin Petersondfd79492008-06-13 19:13:39 +00001705 conn.send('hello')
1706 conn.close()
1707
1708 def test_listener_client(self):
1709 for family in self.connection.families:
1710 l = self.connection.Listener(family=family)
1711 p = self.Process(target=self._test, args=(l.address,))
Jesse Noller5bc9f4c2008-08-19 19:06:19 +00001712 p.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +00001713 p.start()
1714 conn = l.accept()
1715 self.assertEqual(conn.recv(), 'hello')
1716 p.join()
1717 l.close()
Richard Oudkerk9a16fa62012-05-05 20:41:08 +01001718
1719 def test_issue14725(self):
1720 l = self.connection.Listener()
1721 p = self.Process(target=self._test, args=(l.address,))
1722 p.daemon = True
1723 p.start()
1724 time.sleep(1)
1725 # On Windows the client process should by now have connected,
1726 # written data and closed the pipe handle by now. This causes
1727 # ConnectNamdedPipe() to fail with ERROR_NO_DATA. See Issue
1728 # 14725.
1729 conn = l.accept()
1730 self.assertEqual(conn.recv(), 'hello')
1731 conn.close()
1732 p.join()
1733 l.close()
1734
Benjamin Petersondfd79492008-06-13 19:13:39 +00001735#
1736# Test of sending connection and socket objects between processes
1737#
Benjamin Petersonda3a1b12008-06-16 20:52:48 +00001738"""
Benjamin Petersondfd79492008-06-13 19:13:39 +00001739class _TestPicklingConnections(BaseTestCase):
1740
1741 ALLOWED_TYPES = ('processes',)
1742
1743 def _listener(self, conn, families):
1744 for fam in families:
1745 l = self.connection.Listener(family=fam)
1746 conn.send(l.address)
1747 new_conn = l.accept()
1748 conn.send(new_conn)
1749
1750 if self.TYPE == 'processes':
1751 l = socket.socket()
1752 l.bind(('localhost', 0))
1753 conn.send(l.getsockname())
1754 l.listen(1)
1755 new_conn, addr = l.accept()
1756 conn.send(new_conn)
1757
1758 conn.recv()
1759
1760 def _remote(self, conn):
1761 for (address, msg) in iter(conn.recv, None):
1762 client = self.connection.Client(address)
1763 client.send(msg.upper())
1764 client.close()
1765
1766 if self.TYPE == 'processes':
1767 address, msg = conn.recv()
1768 client = socket.socket()
1769 client.connect(address)
1770 client.sendall(msg.upper())
1771 client.close()
1772
1773 conn.close()
1774
1775 def test_pickling(self):
1776 try:
1777 multiprocessing.allow_connection_pickling()
1778 except ImportError:
1779 return
1780
1781 families = self.connection.families
1782
1783 lconn, lconn0 = self.Pipe()
1784 lp = self.Process(target=self._listener, args=(lconn0, families))
Jesus Cea6f6016b2011-09-09 20:26:57 +02001785 lp.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +00001786 lp.start()
1787 lconn0.close()
1788
1789 rconn, rconn0 = self.Pipe()
1790 rp = self.Process(target=self._remote, args=(rconn0,))
Jesus Cea6f6016b2011-09-09 20:26:57 +02001791 rp.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +00001792 rp.start()
1793 rconn0.close()
1794
1795 for fam in families:
1796 msg = ('This connection uses family %s' % fam).encode('ascii')
1797 address = lconn.recv()
1798 rconn.send((address, msg))
1799 new_conn = lconn.recv()
1800 self.assertEqual(new_conn.recv(), msg.upper())
1801
1802 rconn.send(None)
1803
1804 if self.TYPE == 'processes':
1805 msg = latin('This connection uses a normal socket')
1806 address = lconn.recv()
1807 rconn.send((address, msg))
1808 if hasattr(socket, 'fromfd'):
1809 new_conn = lconn.recv()
1810 self.assertEqual(new_conn.recv(100), msg.upper())
1811 else:
1812 # XXX On Windows with Py2.6 need to backport fromfd()
1813 discard = lconn.recv_bytes()
1814
1815 lconn.send(None)
1816
1817 rconn.close()
1818 lconn.close()
1819
1820 lp.join()
1821 rp.join()
Benjamin Petersonda3a1b12008-06-16 20:52:48 +00001822"""
Benjamin Petersondfd79492008-06-13 19:13:39 +00001823#
1824#
1825#
1826
1827class _TestHeap(BaseTestCase):
1828
1829 ALLOWED_TYPES = ('processes',)
1830
1831 def test_heap(self):
1832 iterations = 5000
1833 maxblocks = 50
1834 blocks = []
1835
1836 # create and destroy lots of blocks of different sizes
1837 for i in xrange(iterations):
1838 size = int(random.lognormvariate(0, 1) * 1000)
1839 b = multiprocessing.heap.BufferWrapper(size)
1840 blocks.append(b)
1841 if len(blocks) > maxblocks:
1842 i = random.randrange(maxblocks)
1843 del blocks[i]
1844
1845 # get the heap object
1846 heap = multiprocessing.heap.BufferWrapper._heap
1847
1848 # verify the state of the heap
1849 all = []
1850 occupied = 0
Charles-François Natali414d0fa2011-07-02 13:56:19 +02001851 heap._lock.acquire()
1852 self.addCleanup(heap._lock.release)
Benjamin Petersondfd79492008-06-13 19:13:39 +00001853 for L in heap._len_to_seq.values():
1854 for arena, start, stop in L:
1855 all.append((heap._arenas.index(arena), start, stop,
1856 stop-start, 'free'))
1857 for arena, start, stop in heap._allocated_blocks:
1858 all.append((heap._arenas.index(arena), start, stop,
1859 stop-start, 'occupied'))
1860 occupied += (stop-start)
1861
1862 all.sort()
1863
1864 for i in range(len(all)-1):
1865 (arena, start, stop) = all[i][:3]
1866 (narena, nstart, nstop) = all[i+1][:3]
1867 self.assertTrue((arena != narena and nstart == 0) or
1868 (stop == nstart))
1869
Charles-François Natali414d0fa2011-07-02 13:56:19 +02001870 def test_free_from_gc(self):
1871 # Check that freeing of blocks by the garbage collector doesn't deadlock
1872 # (issue #12352).
1873 # Make sure the GC is enabled, and set lower collection thresholds to
1874 # make collections more frequent (and increase the probability of
1875 # deadlock).
Charles-François Natali7c20ad32011-07-02 14:08:27 +02001876 if not gc.isenabled():
Charles-François Natali414d0fa2011-07-02 13:56:19 +02001877 gc.enable()
1878 self.addCleanup(gc.disable)
Charles-François Natali7c20ad32011-07-02 14:08:27 +02001879 thresholds = gc.get_threshold()
1880 self.addCleanup(gc.set_threshold, *thresholds)
Charles-François Natali414d0fa2011-07-02 13:56:19 +02001881 gc.set_threshold(10)
1882
1883 # perform numerous block allocations, with cyclic references to make
1884 # sure objects are collected asynchronously by the gc
1885 for i in range(5000):
1886 a = multiprocessing.heap.BufferWrapper(1)
1887 b = multiprocessing.heap.BufferWrapper(1)
1888 # circular references
1889 a.buddy = b
1890 b.buddy = a
1891
Benjamin Petersondfd79492008-06-13 19:13:39 +00001892#
1893#
1894#
1895
Benjamin Petersondfd79492008-06-13 19:13:39 +00001896class _Foo(Structure):
1897 _fields_ = [
1898 ('x', c_int),
1899 ('y', c_double)
1900 ]
1901
1902class _TestSharedCTypes(BaseTestCase):
1903
1904 ALLOWED_TYPES = ('processes',)
1905
Antoine Pitrou55d935a2010-11-22 16:35:57 +00001906 def setUp(self):
1907 if not HAS_SHAREDCTYPES:
1908 self.skipTest("requires multiprocessing.sharedctypes")
1909
Antoine Pitrou4eb2b282010-11-02 23:51:30 +00001910 @classmethod
1911 def _double(cls, x, y, foo, arr, string):
Benjamin Petersondfd79492008-06-13 19:13:39 +00001912 x.value *= 2
1913 y.value *= 2
1914 foo.x *= 2
1915 foo.y *= 2
1916 string.value *= 2
1917 for i in range(len(arr)):
1918 arr[i] *= 2
1919
1920 def test_sharedctypes(self, lock=False):
Benjamin Petersondfd79492008-06-13 19:13:39 +00001921 x = Value('i', 7, lock=lock)
Georg Brandlbd564c32010-02-06 23:33:33 +00001922 y = Value(c_double, 1.0/3.0, lock=lock)
Benjamin Petersondfd79492008-06-13 19:13:39 +00001923 foo = Value(_Foo, 3, 2, lock=lock)
Georg Brandlbd564c32010-02-06 23:33:33 +00001924 arr = self.Array('d', range(10), lock=lock)
1925 string = self.Array('c', 20, lock=lock)
Brian Curtina06e9b82010-10-07 02:27:41 +00001926 string.value = latin('hello')
Benjamin Petersondfd79492008-06-13 19:13:39 +00001927
1928 p = self.Process(target=self._double, args=(x, y, foo, arr, string))
Jesus Cea6f6016b2011-09-09 20:26:57 +02001929 p.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +00001930 p.start()
1931 p.join()
1932
1933 self.assertEqual(x.value, 14)
1934 self.assertAlmostEqual(y.value, 2.0/3.0)
1935 self.assertEqual(foo.x, 6)
1936 self.assertAlmostEqual(foo.y, 4.0)
1937 for i in range(10):
1938 self.assertAlmostEqual(arr[i], i*2)
1939 self.assertEqual(string.value, latin('hellohello'))
1940
1941 def test_synchronize(self):
1942 self.test_sharedctypes(lock=True)
1943
1944 def test_copy(self):
Benjamin Petersondfd79492008-06-13 19:13:39 +00001945 foo = _Foo(2, 5.0)
Brian Curtina06e9b82010-10-07 02:27:41 +00001946 bar = copy(foo)
Benjamin Petersondfd79492008-06-13 19:13:39 +00001947 foo.x = 0
1948 foo.y = 0
1949 self.assertEqual(bar.x, 2)
1950 self.assertAlmostEqual(bar.y, 5.0)
1951
1952#
1953#
1954#
1955
1956class _TestFinalize(BaseTestCase):
1957
1958 ALLOWED_TYPES = ('processes',)
1959
Antoine Pitrou4eb2b282010-11-02 23:51:30 +00001960 @classmethod
1961 def _test_finalize(cls, conn):
Benjamin Petersondfd79492008-06-13 19:13:39 +00001962 class Foo(object):
1963 pass
1964
1965 a = Foo()
1966 util.Finalize(a, conn.send, args=('a',))
1967 del a # triggers callback for a
1968
1969 b = Foo()
1970 close_b = util.Finalize(b, conn.send, args=('b',))
1971 close_b() # triggers callback for b
1972 close_b() # does nothing because callback has already been called
1973 del b # does nothing because callback has already been called
1974
1975 c = Foo()
1976 util.Finalize(c, conn.send, args=('c',))
1977
1978 d10 = Foo()
1979 util.Finalize(d10, conn.send, args=('d10',), exitpriority=1)
1980
1981 d01 = Foo()
1982 util.Finalize(d01, conn.send, args=('d01',), exitpriority=0)
1983 d02 = Foo()
1984 util.Finalize(d02, conn.send, args=('d02',), exitpriority=0)
1985 d03 = Foo()
1986 util.Finalize(d03, conn.send, args=('d03',), exitpriority=0)
1987
1988 util.Finalize(None, conn.send, args=('e',), exitpriority=-10)
1989
1990 util.Finalize(None, conn.send, args=('STOP',), exitpriority=-100)
1991
Ezio Melottic2077b02011-03-16 12:34:31 +02001992 # call multiprocessing's cleanup function then exit process without
Benjamin Petersondfd79492008-06-13 19:13:39 +00001993 # garbage collecting locals
1994 util._exit_function()
1995 conn.close()
1996 os._exit(0)
1997
1998 def test_finalize(self):
1999 conn, child_conn = self.Pipe()
2000
2001 p = self.Process(target=self._test_finalize, args=(child_conn,))
Jesus Cea6f6016b2011-09-09 20:26:57 +02002002 p.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +00002003 p.start()
2004 p.join()
2005
2006 result = [obj for obj in iter(conn.recv, 'STOP')]
2007 self.assertEqual(result, ['a', 'b', 'd10', 'd03', 'd02', 'd01', 'e'])
2008
2009#
2010# Test that from ... import * works for each module
2011#
2012
2013class _TestImportStar(BaseTestCase):
2014
2015 ALLOWED_TYPES = ('processes',)
2016
2017 def test_import(self):
Florent Xicluna36b9fbb2010-03-24 19:33:25 +00002018 modules = [
Benjamin Petersondfd79492008-06-13 19:13:39 +00002019 'multiprocessing', 'multiprocessing.connection',
2020 'multiprocessing.heap', 'multiprocessing.managers',
2021 'multiprocessing.pool', 'multiprocessing.process',
Benjamin Petersondfd79492008-06-13 19:13:39 +00002022 'multiprocessing.synchronize', 'multiprocessing.util'
Florent Xicluna36b9fbb2010-03-24 19:33:25 +00002023 ]
2024
Charles-François Natalif8413b22011-09-21 18:44:49 +02002025 if HAS_REDUCTION:
2026 modules.append('multiprocessing.reduction')
2027
Florent Xicluna36b9fbb2010-03-24 19:33:25 +00002028 if c_int is not None:
2029 # This module requires _ctypes
2030 modules.append('multiprocessing.sharedctypes')
Benjamin Petersondfd79492008-06-13 19:13:39 +00002031
2032 for name in modules:
2033 __import__(name)
2034 mod = sys.modules[name]
2035
2036 for attr in getattr(mod, '__all__', ()):
2037 self.assertTrue(
2038 hasattr(mod, attr),
2039 '%r does not have attribute %r' % (mod, attr)
2040 )
2041
2042#
2043# Quick test that logging works -- does not test logging output
2044#
2045
2046class _TestLogging(BaseTestCase):
2047
2048 ALLOWED_TYPES = ('processes',)
2049
2050 def test_enable_logging(self):
2051 logger = multiprocessing.get_logger()
2052 logger.setLevel(util.SUBWARNING)
2053 self.assertTrue(logger is not None)
2054 logger.debug('this will not be printed')
2055 logger.info('nor will this')
2056 logger.setLevel(LOG_LEVEL)
2057
Antoine Pitrou4eb2b282010-11-02 23:51:30 +00002058 @classmethod
2059 def _test_level(cls, conn):
Benjamin Petersondfd79492008-06-13 19:13:39 +00002060 logger = multiprocessing.get_logger()
2061 conn.send(logger.getEffectiveLevel())
2062
2063 def test_level(self):
2064 LEVEL1 = 32
2065 LEVEL2 = 37
2066
2067 logger = multiprocessing.get_logger()
2068 root_logger = logging.getLogger()
2069 root_level = root_logger.level
2070
2071 reader, writer = multiprocessing.Pipe(duplex=False)
2072
2073 logger.setLevel(LEVEL1)
Jesus Cea6f6016b2011-09-09 20:26:57 +02002074 p = self.Process(target=self._test_level, args=(writer,))
2075 p.daemon = True
2076 p.start()
Benjamin Petersondfd79492008-06-13 19:13:39 +00002077 self.assertEqual(LEVEL1, reader.recv())
2078
2079 logger.setLevel(logging.NOTSET)
2080 root_logger.setLevel(LEVEL2)
Jesus Cea6f6016b2011-09-09 20:26:57 +02002081 p = self.Process(target=self._test_level, args=(writer,))
2082 p.daemon = True
2083 p.start()
Benjamin Petersondfd79492008-06-13 19:13:39 +00002084 self.assertEqual(LEVEL2, reader.recv())
2085
2086 root_logger.setLevel(root_level)
2087 logger.setLevel(level=LOG_LEVEL)
2088
Jesse Noller814d02d2009-11-21 14:38:23 +00002089
Jesse Noller9a03f2f2009-11-24 14:17:29 +00002090# class _TestLoggingProcessName(BaseTestCase):
2091#
2092# def handle(self, record):
2093# assert record.processName == multiprocessing.current_process().name
2094# self.__handled = True
2095#
2096# def test_logging(self):
2097# handler = logging.Handler()
2098# handler.handle = self.handle
2099# self.__handled = False
2100# # Bypass getLogger() and side-effects
2101# logger = logging.getLoggerClass()(
2102# 'multiprocessing.test.TestLoggingProcessName')
2103# logger.addHandler(handler)
2104# logger.propagate = False
2105#
2106# logger.warn('foo')
2107# assert self.__handled
Jesse Noller814d02d2009-11-21 14:38:23 +00002108
Benjamin Petersondfd79492008-06-13 19:13:39 +00002109#
Richard Oudkerkba482642013-02-26 12:37:07 +00002110# Check that Process.join() retries if os.waitpid() fails with EINTR
2111#
2112
2113class _TestPollEintr(BaseTestCase):
2114
2115 ALLOWED_TYPES = ('processes',)
2116
2117 @classmethod
2118 def _killer(cls, pid):
2119 time.sleep(0.5)
2120 os.kill(pid, signal.SIGUSR1)
2121
2122 @unittest.skipUnless(hasattr(signal, 'SIGUSR1'), 'requires SIGUSR1')
2123 def test_poll_eintr(self):
2124 got_signal = [False]
2125 def record(*args):
2126 got_signal[0] = True
2127 pid = os.getpid()
2128 oldhandler = signal.signal(signal.SIGUSR1, record)
2129 try:
2130 killer = self.Process(target=self._killer, args=(pid,))
2131 killer.start()
2132 p = self.Process(target=time.sleep, args=(1,))
2133 p.start()
2134 p.join()
2135 self.assertTrue(got_signal[0])
2136 self.assertEqual(p.exitcode, 0)
2137 killer.join()
2138 finally:
2139 signal.signal(signal.SIGUSR1, oldhandler)
2140
2141#
Jesse Noller9a5b2ad2009-01-19 15:12:22 +00002142# Test to verify handle verification, see issue 3321
2143#
2144
2145class TestInvalidHandle(unittest.TestCase):
2146
Florent Xicluna36b9fbb2010-03-24 19:33:25 +00002147 @unittest.skipIf(WIN32, "skipped on Windows")
Jesse Noller9a5b2ad2009-01-19 15:12:22 +00002148 def test_invalid_handles(self):
Jesse Noller9a5b2ad2009-01-19 15:12:22 +00002149 conn = _multiprocessing.Connection(44977608)
2150 self.assertRaises(IOError, conn.poll)
2151 self.assertRaises(IOError, _multiprocessing.Connection, -1)
Florent Xicluna36b9fbb2010-03-24 19:33:25 +00002152
Jesse Noller9a5b2ad2009-01-19 15:12:22 +00002153#
Benjamin Petersondfd79492008-06-13 19:13:39 +00002154# Functions used to create test cases from the base ones in this module
2155#
2156
2157def get_attributes(Source, names):
2158 d = {}
2159 for name in names:
2160 obj = getattr(Source, name)
2161 if type(obj) == type(get_attributes):
2162 obj = staticmethod(obj)
2163 d[name] = obj
2164 return d
2165
2166def create_test_cases(Mixin, type):
2167 result = {}
2168 glob = globals()
Florent Xicluna36b9fbb2010-03-24 19:33:25 +00002169 Type = type.capitalize()
Benjamin Petersondfd79492008-06-13 19:13:39 +00002170
2171 for name in glob.keys():
2172 if name.startswith('_Test'):
2173 base = glob[name]
2174 if type in base.ALLOWED_TYPES:
2175 newname = 'With' + Type + name[1:]
2176 class Temp(base, unittest.TestCase, Mixin):
2177 pass
2178 result[newname] = Temp
2179 Temp.__name__ = newname
2180 Temp.__module__ = Mixin.__module__
2181 return result
2182
2183#
2184# Create test cases
2185#
2186
2187class ProcessesMixin(object):
2188 TYPE = 'processes'
2189 Process = multiprocessing.Process
2190 locals().update(get_attributes(multiprocessing, (
2191 'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
2192 'Condition', 'Event', 'Value', 'Array', 'RawValue',
2193 'RawArray', 'current_process', 'active_children', 'Pipe',
Richard Oudkerkd44a4a22012-06-06 17:52:18 +01002194 'connection', 'JoinableQueue', 'Pool'
Benjamin Petersondfd79492008-06-13 19:13:39 +00002195 )))
2196
2197testcases_processes = create_test_cases(ProcessesMixin, type='processes')
2198globals().update(testcases_processes)
2199
2200
2201class ManagerMixin(object):
2202 TYPE = 'manager'
2203 Process = multiprocessing.Process
2204 manager = object.__new__(multiprocessing.managers.SyncManager)
2205 locals().update(get_attributes(manager, (
2206 'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
2207 'Condition', 'Event', 'Value', 'Array', 'list', 'dict',
Richard Oudkerkd44a4a22012-06-06 17:52:18 +01002208 'Namespace', 'JoinableQueue', 'Pool'
Benjamin Petersondfd79492008-06-13 19:13:39 +00002209 )))
2210
2211testcases_manager = create_test_cases(ManagerMixin, type='manager')
2212globals().update(testcases_manager)
2213
2214
2215class ThreadsMixin(object):
2216 TYPE = 'threads'
2217 Process = multiprocessing.dummy.Process
2218 locals().update(get_attributes(multiprocessing.dummy, (
2219 'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
2220 'Condition', 'Event', 'Value', 'Array', 'current_process',
2221 'active_children', 'Pipe', 'connection', 'dict', 'list',
Richard Oudkerkd44a4a22012-06-06 17:52:18 +01002222 'Namespace', 'JoinableQueue', 'Pool'
Benjamin Petersondfd79492008-06-13 19:13:39 +00002223 )))
2224
2225testcases_threads = create_test_cases(ThreadsMixin, type='threads')
2226globals().update(testcases_threads)
2227
Neal Norwitz0c519b32008-08-25 01:50:24 +00002228class OtherTest(unittest.TestCase):
2229 # TODO: add more tests for deliver/answer challenge.
2230 def test_deliver_challenge_auth_failure(self):
2231 class _FakeConnection(object):
2232 def recv_bytes(self, size):
Neal Norwitz2a7767a2008-08-25 03:03:25 +00002233 return b'something bogus'
Neal Norwitz0c519b32008-08-25 01:50:24 +00002234 def send_bytes(self, data):
2235 pass
2236 self.assertRaises(multiprocessing.AuthenticationError,
2237 multiprocessing.connection.deliver_challenge,
2238 _FakeConnection(), b'abc')
2239
2240 def test_answer_challenge_auth_failure(self):
2241 class _FakeConnection(object):
2242 def __init__(self):
2243 self.count = 0
2244 def recv_bytes(self, size):
2245 self.count += 1
2246 if self.count == 1:
2247 return multiprocessing.connection.CHALLENGE
2248 elif self.count == 2:
Neal Norwitz2a7767a2008-08-25 03:03:25 +00002249 return b'something bogus'
2250 return b''
Neal Norwitz0c519b32008-08-25 01:50:24 +00002251 def send_bytes(self, data):
2252 pass
2253 self.assertRaises(multiprocessing.AuthenticationError,
2254 multiprocessing.connection.answer_challenge,
2255 _FakeConnection(), b'abc')
2256
Jesse Noller7152f6d2009-04-02 05:17:26 +00002257#
2258# Test Manager.start()/Pool.__init__() initializer feature - see issue 5585
2259#
2260
2261def initializer(ns):
2262 ns.test += 1
2263
2264class TestInitializers(unittest.TestCase):
2265 def setUp(self):
2266 self.mgr = multiprocessing.Manager()
2267 self.ns = self.mgr.Namespace()
2268 self.ns.test = 0
2269
2270 def tearDown(self):
2271 self.mgr.shutdown()
2272
2273 def test_manager_initializer(self):
2274 m = multiprocessing.managers.SyncManager()
2275 self.assertRaises(TypeError, m.start, 1)
2276 m.start(initializer, (self.ns,))
2277 self.assertEqual(self.ns.test, 1)
2278 m.shutdown()
2279
2280 def test_pool_initializer(self):
2281 self.assertRaises(TypeError, multiprocessing.Pool, initializer=1)
2282 p = multiprocessing.Pool(1, initializer, (self.ns,))
2283 p.close()
2284 p.join()
2285 self.assertEqual(self.ns.test, 1)
2286
Jesse Noller1b90efb2009-06-30 17:11:52 +00002287#
2288# Issue 5155, 5313, 5331: Test process in processes
2289# Verifies os.close(sys.stdin.fileno) vs. sys.stdin.close() behavior
2290#
2291
Richard Oudkerkc5496072013-09-29 17:10:40 +01002292def _this_sub_process(q):
Jesse Noller1b90efb2009-06-30 17:11:52 +00002293 try:
2294 item = q.get(block=False)
2295 except Queue.Empty:
2296 pass
2297
Richard Oudkerkc5496072013-09-29 17:10:40 +01002298def _test_process(q):
2299 queue = multiprocessing.Queue()
2300 subProc = multiprocessing.Process(target=_this_sub_process, args=(queue,))
2301 subProc.daemon = True
2302 subProc.start()
2303 subProc.join()
2304
Jesse Noller1b90efb2009-06-30 17:11:52 +00002305def _afunc(x):
2306 return x*x
2307
2308def pool_in_process():
2309 pool = multiprocessing.Pool(processes=4)
2310 x = pool.map(_afunc, [1, 2, 3, 4, 5, 6, 7])
2311
2312class _file_like(object):
2313 def __init__(self, delegate):
2314 self._delegate = delegate
2315 self._pid = None
2316
2317 @property
2318 def cache(self):
2319 pid = os.getpid()
2320 # There are no race conditions since fork keeps only the running thread
2321 if pid != self._pid:
2322 self._pid = pid
2323 self._cache = []
2324 return self._cache
2325
2326 def write(self, data):
2327 self.cache.append(data)
2328
2329 def flush(self):
2330 self._delegate.write(''.join(self.cache))
2331 self._cache = []
2332
2333class TestStdinBadfiledescriptor(unittest.TestCase):
2334
2335 def test_queue_in_process(self):
2336 queue = multiprocessing.Queue()
Richard Oudkerkc5496072013-09-29 17:10:40 +01002337 proc = multiprocessing.Process(target=_test_process, args=(queue,))
Jesse Noller1b90efb2009-06-30 17:11:52 +00002338 proc.start()
2339 proc.join()
2340
2341 def test_pool_in_process(self):
2342 p = multiprocessing.Process(target=pool_in_process)
2343 p.start()
2344 p.join()
2345
2346 def test_flushing(self):
2347 sio = StringIO()
2348 flike = _file_like(sio)
2349 flike.write('foo')
2350 proc = multiprocessing.Process(target=lambda: flike.flush())
2351 flike.flush()
2352 assert sio.getvalue() == 'foo'
2353
Richard Oudkerke4b99382012-07-27 14:05:46 +01002354#
2355# Test interaction with socket timeouts - see Issue #6056
2356#
2357
2358class TestTimeouts(unittest.TestCase):
2359 @classmethod
2360 def _test_timeout(cls, child, address):
2361 time.sleep(1)
2362 child.send(123)
2363 child.close()
2364 conn = multiprocessing.connection.Client(address)
2365 conn.send(456)
2366 conn.close()
2367
2368 def test_timeout(self):
2369 old_timeout = socket.getdefaulttimeout()
2370 try:
2371 socket.setdefaulttimeout(0.1)
2372 parent, child = multiprocessing.Pipe(duplex=True)
2373 l = multiprocessing.connection.Listener(family='AF_INET')
2374 p = multiprocessing.Process(target=self._test_timeout,
2375 args=(child, l.address))
2376 p.start()
2377 child.close()
2378 self.assertEqual(parent.recv(), 123)
2379 parent.close()
2380 conn = l.accept()
2381 self.assertEqual(conn.recv(), 456)
2382 conn.close()
2383 l.close()
2384 p.join(10)
2385 finally:
2386 socket.setdefaulttimeout(old_timeout)
2387
Richard Oudkerkfaee75c2012-08-14 11:41:19 +01002388#
2389# Test what happens with no "if __name__ == '__main__'"
2390#
2391
2392class TestNoForkBomb(unittest.TestCase):
2393 def test_noforkbomb(self):
2394 name = os.path.join(os.path.dirname(__file__), 'mp_fork_bomb.py')
2395 if WIN32:
2396 rc, out, err = test.script_helper.assert_python_failure(name)
2397 self.assertEqual('', out.decode('ascii'))
2398 self.assertIn('RuntimeError', err.decode('ascii'))
2399 else:
2400 rc, out, err = test.script_helper.assert_python_ok(name)
2401 self.assertEqual('123', out.decode('ascii').rstrip())
2402 self.assertEqual('', err.decode('ascii'))
2403
2404#
Kristján Valur Jónsson8927e8f2013-03-19 15:07:35 -07002405# Issue 12098: check sys.flags of child matches that for parent
2406#
2407
2408class TestFlags(unittest.TestCase):
2409 @classmethod
2410 def run_in_grandchild(cls, conn):
2411 conn.send(tuple(sys.flags))
2412
2413 @classmethod
2414 def run_in_child(cls):
2415 import json
2416 r, w = multiprocessing.Pipe(duplex=False)
2417 p = multiprocessing.Process(target=cls.run_in_grandchild, args=(w,))
2418 p.start()
2419 grandchild_flags = r.recv()
2420 p.join()
2421 r.close()
2422 w.close()
2423 flags = (tuple(sys.flags), grandchild_flags)
2424 print(json.dumps(flags))
2425
2426 def test_flags(self):
2427 import json, subprocess
2428 # start child process using unusual flags
2429 prog = ('from test.test_multiprocessing import TestFlags; ' +
2430 'TestFlags.run_in_child()')
2431 data = subprocess.check_output(
Benjamin Peterson625af8e2013-03-20 12:47:57 -05002432 [sys.executable, '-E', '-B', '-O', '-c', prog])
Kristján Valur Jónsson8927e8f2013-03-19 15:07:35 -07002433 child_flags, grandchild_flags = json.loads(data.decode('ascii'))
2434 self.assertEqual(child_flags, grandchild_flags)
Richard Oudkerk7bdd93c2013-04-17 19:15:52 +01002435
2436#
2437# Issue #17555: ForkAwareThreadLock
2438#
2439
2440class TestForkAwareThreadLock(unittest.TestCase):
2441 # We recurisvely start processes. Issue #17555 meant that the
2442 # after fork registry would get duplicate entries for the same
2443 # lock. The size of the registry at generation n was ~2**n.
2444
2445 @classmethod
2446 def child(cls, n, conn):
2447 if n > 1:
2448 p = multiprocessing.Process(target=cls.child, args=(n-1, conn))
2449 p.start()
2450 p.join()
2451 else:
2452 conn.send(len(util._afterfork_registry))
2453 conn.close()
2454
2455 def test_lock(self):
2456 r, w = multiprocessing.Pipe(False)
2457 l = util.ForkAwareThreadLock()
2458 old_size = len(util._afterfork_registry)
2459 p = multiprocessing.Process(target=self.child, args=(5, w))
2460 p.start()
2461 new_size = r.recv()
2462 p.join()
2463 self.assertLessEqual(new_size, old_size)
2464
Kristján Valur Jónsson8927e8f2013-03-19 15:07:35 -07002465#
Richard Oudkerk41072db2013-07-01 18:45:28 +01002466# Issue #17097: EINTR should be ignored by recv(), send(), accept() etc
2467#
2468
2469class TestIgnoreEINTR(unittest.TestCase):
2470
2471 @classmethod
2472 def _test_ignore(cls, conn):
2473 def handler(signum, frame):
2474 pass
2475 signal.signal(signal.SIGUSR1, handler)
2476 conn.send('ready')
2477 x = conn.recv()
2478 conn.send(x)
2479 conn.send_bytes(b'x'*(1024*1024)) # sending 1 MB should block
2480
2481 @unittest.skipUnless(hasattr(signal, 'SIGUSR1'), 'requires SIGUSR1')
2482 def test_ignore(self):
2483 conn, child_conn = multiprocessing.Pipe()
2484 try:
2485 p = multiprocessing.Process(target=self._test_ignore,
2486 args=(child_conn,))
2487 p.daemon = True
2488 p.start()
2489 child_conn.close()
2490 self.assertEqual(conn.recv(), 'ready')
2491 time.sleep(0.1)
2492 os.kill(p.pid, signal.SIGUSR1)
2493 time.sleep(0.1)
2494 conn.send(1234)
2495 self.assertEqual(conn.recv(), 1234)
2496 time.sleep(0.1)
2497 os.kill(p.pid, signal.SIGUSR1)
2498 self.assertEqual(conn.recv_bytes(), b'x'*(1024*1024))
2499 time.sleep(0.1)
2500 p.join()
2501 finally:
2502 conn.close()
2503
2504 @classmethod
2505 def _test_ignore_listener(cls, conn):
2506 def handler(signum, frame):
2507 pass
2508 signal.signal(signal.SIGUSR1, handler)
2509 l = multiprocessing.connection.Listener()
2510 conn.send(l.address)
2511 a = l.accept()
2512 a.send('welcome')
2513
2514 @unittest.skipUnless(hasattr(signal, 'SIGUSR1'), 'requires SIGUSR1')
2515 def test_ignore_listener(self):
2516 conn, child_conn = multiprocessing.Pipe()
2517 try:
2518 p = multiprocessing.Process(target=self._test_ignore_listener,
2519 args=(child_conn,))
2520 p.daemon = True
2521 p.start()
2522 child_conn.close()
2523 address = conn.recv()
2524 time.sleep(0.1)
2525 os.kill(p.pid, signal.SIGUSR1)
2526 time.sleep(0.1)
2527 client = multiprocessing.connection.Client(address)
2528 self.assertEqual(client.recv(), 'welcome')
2529 p.join()
2530 finally:
2531 conn.close()
2532
2533#
Richard Oudkerkfaee75c2012-08-14 11:41:19 +01002534#
2535#
2536
Jesse Noller1b90efb2009-06-30 17:11:52 +00002537testcases_other = [OtherTest, TestInvalidHandle, TestInitializers,
Kristján Valur Jónsson8927e8f2013-03-19 15:07:35 -07002538 TestStdinBadfiledescriptor, TestTimeouts, TestNoForkBomb,
Richard Oudkerk41072db2013-07-01 18:45:28 +01002539 TestFlags, TestForkAwareThreadLock, TestIgnoreEINTR]
Neal Norwitz0c519b32008-08-25 01:50:24 +00002540
Benjamin Petersondfd79492008-06-13 19:13:39 +00002541#
2542#
2543#
2544
2545def test_main(run=None):
Jesse Noller18623822008-06-18 13:29:52 +00002546 if sys.platform.startswith("linux"):
2547 try:
2548 lock = multiprocessing.RLock()
2549 except OSError:
Benjamin Petersonbec087f2009-03-26 21:10:30 +00002550 raise unittest.SkipTest("OSError raises on RLock creation, see issue 3111!")
Benjamin Petersoned77f2e2008-06-17 22:40:44 +00002551
Charles-François Natali6392d7f2011-11-22 18:35:18 +01002552 check_enough_semaphores()
2553
Benjamin Petersondfd79492008-06-13 19:13:39 +00002554 if run is None:
2555 from test.test_support import run_unittest as run
2556
2557 util.get_temp_dir() # creates temp directory for use by all processes
2558
2559 multiprocessing.get_logger().setLevel(LOG_LEVEL)
2560
Jesse Noller146b7ab2008-07-02 16:44:09 +00002561 ProcessesMixin.pool = multiprocessing.Pool(4)
2562 ThreadsMixin.pool = multiprocessing.dummy.Pool(4)
2563 ManagerMixin.manager.__init__()
2564 ManagerMixin.manager.start()
2565 ManagerMixin.pool = ManagerMixin.manager.Pool(4)
Benjamin Petersondfd79492008-06-13 19:13:39 +00002566
2567 testcases = (
Jesse Noller146b7ab2008-07-02 16:44:09 +00002568 sorted(testcases_processes.values(), key=lambda tc:tc.__name__) +
2569 sorted(testcases_threads.values(), key=lambda tc:tc.__name__) +
Neal Norwitz0c519b32008-08-25 01:50:24 +00002570 sorted(testcases_manager.values(), key=lambda tc:tc.__name__) +
2571 testcases_other
Benjamin Petersondfd79492008-06-13 19:13:39 +00002572 )
2573
2574 loadTestsFromTestCase = unittest.defaultTestLoader.loadTestsFromTestCase
2575 suite = unittest.TestSuite(loadTestsFromTestCase(tc) for tc in testcases)
Nick Coghlan13623662010-04-10 14:24:36 +00002576 # (ncoghlan): Whether or not sys.exc_clear is executed by the threading
2577 # module during these tests is at least platform dependent and possibly
Nick Coghlan14459d52010-04-10 15:01:54 +00002578 # non-deterministic on any given platform. So we don't mind if the listed
Nick Coghlan13623662010-04-10 14:24:36 +00002579 # warnings aren't actually raised.
Florent Xicluna07627882010-03-21 01:14:24 +00002580 with test_support.check_py3k_warnings(
Nick Coghlan13623662010-04-10 14:24:36 +00002581 (".+__(get|set)slice__ has been removed", DeprecationWarning),
2582 (r"sys.exc_clear\(\) not supported", DeprecationWarning),
2583 quiet=True):
Florent Xicluna07627882010-03-21 01:14:24 +00002584 run(suite)
Benjamin Petersondfd79492008-06-13 19:13:39 +00002585
Jesse Noller146b7ab2008-07-02 16:44:09 +00002586 ThreadsMixin.pool.terminate()
2587 ProcessesMixin.pool.terminate()
2588 ManagerMixin.pool.terminate()
2589 ManagerMixin.manager.shutdown()
Benjamin Petersondfd79492008-06-13 19:13:39 +00002590
Jesse Noller146b7ab2008-07-02 16:44:09 +00002591 del ProcessesMixin.pool, ThreadsMixin.pool, ManagerMixin.pool
Benjamin Petersondfd79492008-06-13 19:13:39 +00002592
2593def main():
2594 test_main(unittest.TextTestRunner(verbosity=2).run)
2595
2596if __name__ == '__main__':
2597 main()