blob: 3a43753d01201e353255c5ce31df08edb68f1195 [file] [log] [blame]
Jesse Noller76cf55f2008-07-02 16:56:51 +00001#!/usr/bin/env python
2
Benjamin Petersondfd79492008-06-13 19:13:39 +00003#
4# Unit tests for the multiprocessing package
5#
6
7import unittest
Benjamin Petersondfd79492008-06-13 19:13:39 +00008import Queue
9import time
10import sys
11import os
12import gc
13import signal
14import array
Benjamin Petersondfd79492008-06-13 19:13:39 +000015import socket
16import random
17import logging
Antoine Pitroua1a8da82011-08-23 19:54:20 +020018import errno
Richard Oudkerkfaee75c2012-08-14 11:41:19 +010019import test.script_helper
Mark Dickinsonc4920e82009-11-20 19:30:22 +000020from test import test_support
Jesse Noller1b90efb2009-06-30 17:11:52 +000021from StringIO import StringIO
R. David Murray3db8a342009-03-30 23:05:48 +000022_multiprocessing = test_support.import_module('_multiprocessing')
Ezio Melottic2077b02011-03-16 12:34:31 +020023# import threading after _multiprocessing to raise a more relevant error
Victor Stinner613b4cf2010-04-27 21:56:26 +000024# message: "No module named _multiprocessing". _multiprocessing is not compiled
25# without thread support.
26import threading
R. David Murray3db8a342009-03-30 23:05:48 +000027
Jesse Noller37040cd2008-09-30 00:15:45 +000028# Work around broken sem_open implementations
R. David Murray3db8a342009-03-30 23:05:48 +000029test_support.import_module('multiprocessing.synchronize')
Jesse Noller37040cd2008-09-30 00:15:45 +000030
Benjamin Petersondfd79492008-06-13 19:13:39 +000031import multiprocessing.dummy
32import multiprocessing.connection
33import multiprocessing.managers
34import multiprocessing.heap
Benjamin Petersondfd79492008-06-13 19:13:39 +000035import multiprocessing.pool
Benjamin Petersondfd79492008-06-13 19:13:39 +000036
Charles-François Natalif8413b22011-09-21 18:44:49 +020037from multiprocessing import util
38
39try:
40 from multiprocessing import reduction
41 HAS_REDUCTION = True
42except ImportError:
43 HAS_REDUCTION = False
Benjamin Petersondfd79492008-06-13 19:13:39 +000044
Brian Curtina06e9b82010-10-07 02:27:41 +000045try:
46 from multiprocessing.sharedctypes import Value, copy
47 HAS_SHAREDCTYPES = True
48except ImportError:
49 HAS_SHAREDCTYPES = False
50
Antoine Pitroua1a8da82011-08-23 19:54:20 +020051try:
52 import msvcrt
53except ImportError:
54 msvcrt = None
55
Benjamin Petersondfd79492008-06-13 19:13:39 +000056#
57#
58#
59
Benjamin Petersone79edf52008-07-13 18:34:58 +000060latin = str
Benjamin Petersondfd79492008-06-13 19:13:39 +000061
Benjamin Petersondfd79492008-06-13 19:13:39 +000062#
63# Constants
64#
65
66LOG_LEVEL = util.SUBWARNING
Jesse Noller654ade32010-01-27 03:05:57 +000067#LOG_LEVEL = logging.DEBUG
Benjamin Petersondfd79492008-06-13 19:13:39 +000068
69DELTA = 0.1
70CHECK_TIMINGS = False # making true makes tests take a lot longer
71 # and can sometimes cause some non-serious
72 # failures because some calls block a bit
73 # longer than expected
74if CHECK_TIMINGS:
75 TIMEOUT1, TIMEOUT2, TIMEOUT3 = 0.82, 0.35, 1.4
76else:
77 TIMEOUT1, TIMEOUT2, TIMEOUT3 = 0.1, 0.1, 0.1
78
79HAVE_GETVALUE = not getattr(_multiprocessing,
80 'HAVE_BROKEN_SEM_GETVALUE', False)
81
Jesse Noller9a5b2ad2009-01-19 15:12:22 +000082WIN32 = (sys.platform == "win32")
83
Antoine Pitroua1a8da82011-08-23 19:54:20 +020084try:
85 MAXFD = os.sysconf("SC_OPEN_MAX")
86except:
87 MAXFD = 256
88
Benjamin Petersondfd79492008-06-13 19:13:39 +000089#
Florent Xicluna36b9fbb2010-03-24 19:33:25 +000090# Some tests require ctypes
91#
92
93try:
Nick Coghlan13623662010-04-10 14:24:36 +000094 from ctypes import Structure, c_int, c_double
Florent Xicluna36b9fbb2010-03-24 19:33:25 +000095except ImportError:
96 Structure = object
97 c_int = c_double = None
98
Charles-François Natali6392d7f2011-11-22 18:35:18 +010099
100def check_enough_semaphores():
101 """Check that the system supports enough semaphores to run the test."""
102 # minimum number of semaphores available according to POSIX
103 nsems_min = 256
104 try:
105 nsems = os.sysconf("SC_SEM_NSEMS_MAX")
106 except (AttributeError, ValueError):
107 # sysconf not available or setting not available
108 return
109 if nsems == -1 or nsems >= nsems_min:
110 return
111 raise unittest.SkipTest("The OS doesn't support enough semaphores "
112 "to run the test (required: %d)." % nsems_min)
113
114
Florent Xicluna36b9fbb2010-03-24 19:33:25 +0000115#
Benjamin Petersondfd79492008-06-13 19:13:39 +0000116# Creates a wrapper for a function which records the time it takes to finish
117#
118
119class TimingWrapper(object):
120
121 def __init__(self, func):
122 self.func = func
123 self.elapsed = None
124
125 def __call__(self, *args, **kwds):
126 t = time.time()
127 try:
128 return self.func(*args, **kwds)
129 finally:
130 self.elapsed = time.time() - t
131
132#
133# Base class for test cases
134#
135
136class BaseTestCase(object):
137
138 ALLOWED_TYPES = ('processes', 'manager', 'threads')
139
140 def assertTimingAlmostEqual(self, a, b):
141 if CHECK_TIMINGS:
142 self.assertAlmostEqual(a, b, 1)
143
144 def assertReturnsIfImplemented(self, value, func, *args):
145 try:
146 res = func(*args)
147 except NotImplementedError:
148 pass
149 else:
150 return self.assertEqual(value, res)
151
Antoine Pitrou4eb2b282010-11-02 23:51:30 +0000152 # For the sanity of Windows users, rather than crashing or freezing in
153 # multiple ways.
154 def __reduce__(self, *args):
155 raise NotImplementedError("shouldn't try to pickle a test case")
156
157 __reduce_ex__ = __reduce__
158
Benjamin Petersondfd79492008-06-13 19:13:39 +0000159#
160# Return the value of a semaphore
161#
162
163def get_value(self):
164 try:
165 return self.get_value()
166 except AttributeError:
167 try:
168 return self._Semaphore__value
169 except AttributeError:
170 try:
171 return self._value
172 except AttributeError:
173 raise NotImplementedError
174
175#
176# Testcases
177#
178
179class _TestProcess(BaseTestCase):
180
181 ALLOWED_TYPES = ('processes', 'threads')
182
183 def test_current(self):
184 if self.TYPE == 'threads':
185 return
186
187 current = self.current_process()
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000188 authkey = current.authkey
Benjamin Petersondfd79492008-06-13 19:13:39 +0000189
190 self.assertTrue(current.is_alive())
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000191 self.assertTrue(not current.daemon)
Ezio Melottib0f5adc2010-01-24 16:58:36 +0000192 self.assertIsInstance(authkey, bytes)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000193 self.assertTrue(len(authkey) > 0)
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000194 self.assertEqual(current.ident, os.getpid())
195 self.assertEqual(current.exitcode, None)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000196
Antoine Pitrou4eb2b282010-11-02 23:51:30 +0000197 @classmethod
198 def _test(cls, q, *args, **kwds):
199 current = cls.current_process()
Benjamin Petersondfd79492008-06-13 19:13:39 +0000200 q.put(args)
201 q.put(kwds)
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000202 q.put(current.name)
Antoine Pitrou4eb2b282010-11-02 23:51:30 +0000203 if cls.TYPE != 'threads':
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000204 q.put(bytes(current.authkey))
Benjamin Petersondfd79492008-06-13 19:13:39 +0000205 q.put(current.pid)
206
207 def test_process(self):
208 q = self.Queue(1)
209 e = self.Event()
210 args = (q, 1, 2)
211 kwargs = {'hello':23, 'bye':2.54}
212 name = 'SomeProcess'
213 p = self.Process(
214 target=self._test, args=args, kwargs=kwargs, name=name
215 )
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000216 p.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +0000217 current = self.current_process()
218
219 if self.TYPE != 'threads':
Ezio Melotti2623a372010-11-21 13:34:58 +0000220 self.assertEqual(p.authkey, current.authkey)
221 self.assertEqual(p.is_alive(), False)
222 self.assertEqual(p.daemon, True)
Ezio Melottiaa980582010-01-23 23:04:36 +0000223 self.assertNotIn(p, self.active_children())
Benjamin Petersondfd79492008-06-13 19:13:39 +0000224 self.assertTrue(type(self.active_children()) is list)
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000225 self.assertEqual(p.exitcode, None)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000226
227 p.start()
228
Ezio Melotti2623a372010-11-21 13:34:58 +0000229 self.assertEqual(p.exitcode, None)
230 self.assertEqual(p.is_alive(), True)
Ezio Melottiaa980582010-01-23 23:04:36 +0000231 self.assertIn(p, self.active_children())
Benjamin Petersondfd79492008-06-13 19:13:39 +0000232
Ezio Melotti2623a372010-11-21 13:34:58 +0000233 self.assertEqual(q.get(), args[1:])
234 self.assertEqual(q.get(), kwargs)
235 self.assertEqual(q.get(), p.name)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000236 if self.TYPE != 'threads':
Ezio Melotti2623a372010-11-21 13:34:58 +0000237 self.assertEqual(q.get(), current.authkey)
238 self.assertEqual(q.get(), p.pid)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000239
240 p.join()
241
Ezio Melotti2623a372010-11-21 13:34:58 +0000242 self.assertEqual(p.exitcode, 0)
243 self.assertEqual(p.is_alive(), False)
Ezio Melottiaa980582010-01-23 23:04:36 +0000244 self.assertNotIn(p, self.active_children())
Benjamin Petersondfd79492008-06-13 19:13:39 +0000245
Antoine Pitrou4eb2b282010-11-02 23:51:30 +0000246 @classmethod
247 def _test_terminate(cls):
Benjamin Petersondfd79492008-06-13 19:13:39 +0000248 time.sleep(1000)
249
250 def test_terminate(self):
251 if self.TYPE == 'threads':
252 return
253
254 p = self.Process(target=self._test_terminate)
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000255 p.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +0000256 p.start()
257
258 self.assertEqual(p.is_alive(), True)
Ezio Melottiaa980582010-01-23 23:04:36 +0000259 self.assertIn(p, self.active_children())
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000260 self.assertEqual(p.exitcode, None)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000261
262 p.terminate()
263
264 join = TimingWrapper(p.join)
265 self.assertEqual(join(), None)
266 self.assertTimingAlmostEqual(join.elapsed, 0.0)
267
268 self.assertEqual(p.is_alive(), False)
Ezio Melottiaa980582010-01-23 23:04:36 +0000269 self.assertNotIn(p, self.active_children())
Benjamin Petersondfd79492008-06-13 19:13:39 +0000270
271 p.join()
272
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000273 # XXX sometimes get p.exitcode == 0 on Windows ...
274 #self.assertEqual(p.exitcode, -signal.SIGTERM)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000275
276 def test_cpu_count(self):
277 try:
278 cpus = multiprocessing.cpu_count()
279 except NotImplementedError:
280 cpus = 1
281 self.assertTrue(type(cpus) is int)
282 self.assertTrue(cpus >= 1)
283
284 def test_active_children(self):
285 self.assertEqual(type(self.active_children()), list)
286
287 p = self.Process(target=time.sleep, args=(DELTA,))
Ezio Melottiaa980582010-01-23 23:04:36 +0000288 self.assertNotIn(p, self.active_children())
Benjamin Petersondfd79492008-06-13 19:13:39 +0000289
Jesus Cea6f6016b2011-09-09 20:26:57 +0200290 p.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +0000291 p.start()
Ezio Melottiaa980582010-01-23 23:04:36 +0000292 self.assertIn(p, self.active_children())
Benjamin Petersondfd79492008-06-13 19:13:39 +0000293
294 p.join()
Ezio Melottiaa980582010-01-23 23:04:36 +0000295 self.assertNotIn(p, self.active_children())
Benjamin Petersondfd79492008-06-13 19:13:39 +0000296
Antoine Pitrou4eb2b282010-11-02 23:51:30 +0000297 @classmethod
298 def _test_recursion(cls, wconn, id):
Benjamin Petersondfd79492008-06-13 19:13:39 +0000299 from multiprocessing import forking
300 wconn.send(id)
301 if len(id) < 2:
302 for i in range(2):
Antoine Pitrou4eb2b282010-11-02 23:51:30 +0000303 p = cls.Process(
304 target=cls._test_recursion, args=(wconn, id+[i])
Benjamin Petersondfd79492008-06-13 19:13:39 +0000305 )
306 p.start()
307 p.join()
308
309 def test_recursion(self):
310 rconn, wconn = self.Pipe(duplex=False)
311 self._test_recursion(wconn, [])
312
313 time.sleep(DELTA)
314 result = []
315 while rconn.poll():
316 result.append(rconn.recv())
317
318 expected = [
319 [],
320 [0],
321 [0, 0],
322 [0, 1],
323 [1],
324 [1, 0],
325 [1, 1]
326 ]
327 self.assertEqual(result, expected)
328
Richard Oudkerk2182e052012-06-06 19:01:14 +0100329 @classmethod
330 def _test_sys_exit(cls, reason, testfn):
331 sys.stderr = open(testfn, 'w')
332 sys.exit(reason)
333
334 def test_sys_exit(self):
335 # See Issue 13854
336 if self.TYPE == 'threads':
337 return
338
339 testfn = test_support.TESTFN
340 self.addCleanup(test_support.unlink, testfn)
341
Richard Oudkerk3f8376e2013-11-17 17:24:11 +0000342 for reason, code in (([1, 2, 3], 1), ('ignore this', 1)):
Richard Oudkerk2182e052012-06-06 19:01:14 +0100343 p = self.Process(target=self._test_sys_exit, args=(reason, testfn))
344 p.daemon = True
345 p.start()
346 p.join(5)
347 self.assertEqual(p.exitcode, code)
348
349 with open(testfn, 'r') as f:
350 self.assertEqual(f.read().rstrip(), str(reason))
351
352 for reason in (True, False, 8):
353 p = self.Process(target=sys.exit, args=(reason,))
354 p.daemon = True
355 p.start()
356 p.join(5)
357 self.assertEqual(p.exitcode, reason)
358
Benjamin Petersondfd79492008-06-13 19:13:39 +0000359#
360#
361#
362
363class _UpperCaser(multiprocessing.Process):
364
365 def __init__(self):
366 multiprocessing.Process.__init__(self)
367 self.child_conn, self.parent_conn = multiprocessing.Pipe()
368
369 def run(self):
370 self.parent_conn.close()
371 for s in iter(self.child_conn.recv, None):
372 self.child_conn.send(s.upper())
373 self.child_conn.close()
374
375 def submit(self, s):
376 assert type(s) is str
377 self.parent_conn.send(s)
378 return self.parent_conn.recv()
379
380 def stop(self):
381 self.parent_conn.send(None)
382 self.parent_conn.close()
383 self.child_conn.close()
384
385class _TestSubclassingProcess(BaseTestCase):
386
387 ALLOWED_TYPES = ('processes',)
388
389 def test_subclassing(self):
390 uppercaser = _UpperCaser()
Jesus Cea6f6016b2011-09-09 20:26:57 +0200391 uppercaser.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +0000392 uppercaser.start()
393 self.assertEqual(uppercaser.submit('hello'), 'HELLO')
394 self.assertEqual(uppercaser.submit('world'), 'WORLD')
395 uppercaser.stop()
396 uppercaser.join()
397
398#
399#
400#
401
402def queue_empty(q):
403 if hasattr(q, 'empty'):
404 return q.empty()
405 else:
406 return q.qsize() == 0
407
408def queue_full(q, maxsize):
409 if hasattr(q, 'full'):
410 return q.full()
411 else:
412 return q.qsize() == maxsize
413
414
415class _TestQueue(BaseTestCase):
416
417
Antoine Pitrou4eb2b282010-11-02 23:51:30 +0000418 @classmethod
419 def _test_put(cls, queue, child_can_start, parent_can_continue):
Benjamin Petersondfd79492008-06-13 19:13:39 +0000420 child_can_start.wait()
421 for i in range(6):
422 queue.get()
423 parent_can_continue.set()
424
425 def test_put(self):
426 MAXSIZE = 6
427 queue = self.Queue(maxsize=MAXSIZE)
428 child_can_start = self.Event()
429 parent_can_continue = self.Event()
430
431 proc = self.Process(
432 target=self._test_put,
433 args=(queue, child_can_start, parent_can_continue)
434 )
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000435 proc.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +0000436 proc.start()
437
438 self.assertEqual(queue_empty(queue), True)
439 self.assertEqual(queue_full(queue, MAXSIZE), False)
440
441 queue.put(1)
442 queue.put(2, True)
443 queue.put(3, True, None)
444 queue.put(4, False)
445 queue.put(5, False, None)
446 queue.put_nowait(6)
447
448 # the values may be in buffer but not yet in pipe so sleep a bit
449 time.sleep(DELTA)
450
451 self.assertEqual(queue_empty(queue), False)
452 self.assertEqual(queue_full(queue, MAXSIZE), True)
453
454 put = TimingWrapper(queue.put)
455 put_nowait = TimingWrapper(queue.put_nowait)
456
457 self.assertRaises(Queue.Full, put, 7, False)
458 self.assertTimingAlmostEqual(put.elapsed, 0)
459
460 self.assertRaises(Queue.Full, put, 7, False, None)
461 self.assertTimingAlmostEqual(put.elapsed, 0)
462
463 self.assertRaises(Queue.Full, put_nowait, 7)
464 self.assertTimingAlmostEqual(put_nowait.elapsed, 0)
465
466 self.assertRaises(Queue.Full, put, 7, True, TIMEOUT1)
467 self.assertTimingAlmostEqual(put.elapsed, TIMEOUT1)
468
469 self.assertRaises(Queue.Full, put, 7, False, TIMEOUT2)
470 self.assertTimingAlmostEqual(put.elapsed, 0)
471
472 self.assertRaises(Queue.Full, put, 7, True, timeout=TIMEOUT3)
473 self.assertTimingAlmostEqual(put.elapsed, TIMEOUT3)
474
475 child_can_start.set()
476 parent_can_continue.wait()
477
478 self.assertEqual(queue_empty(queue), True)
479 self.assertEqual(queue_full(queue, MAXSIZE), False)
480
481 proc.join()
482
Antoine Pitrou4eb2b282010-11-02 23:51:30 +0000483 @classmethod
484 def _test_get(cls, queue, child_can_start, parent_can_continue):
Benjamin Petersondfd79492008-06-13 19:13:39 +0000485 child_can_start.wait()
Benjamin Petersonda3a1b12008-06-16 20:52:48 +0000486 #queue.put(1)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000487 queue.put(2)
488 queue.put(3)
489 queue.put(4)
490 queue.put(5)
491 parent_can_continue.set()
492
493 def test_get(self):
494 queue = self.Queue()
495 child_can_start = self.Event()
496 parent_can_continue = self.Event()
497
498 proc = self.Process(
499 target=self._test_get,
500 args=(queue, child_can_start, parent_can_continue)
501 )
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000502 proc.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +0000503 proc.start()
504
505 self.assertEqual(queue_empty(queue), True)
506
507 child_can_start.set()
508 parent_can_continue.wait()
509
510 time.sleep(DELTA)
511 self.assertEqual(queue_empty(queue), False)
512
Benjamin Petersonda3a1b12008-06-16 20:52:48 +0000513 # Hangs unexpectedly, remove for now
514 #self.assertEqual(queue.get(), 1)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000515 self.assertEqual(queue.get(True, None), 2)
516 self.assertEqual(queue.get(True), 3)
517 self.assertEqual(queue.get(timeout=1), 4)
518 self.assertEqual(queue.get_nowait(), 5)
519
520 self.assertEqual(queue_empty(queue), True)
521
522 get = TimingWrapper(queue.get)
523 get_nowait = TimingWrapper(queue.get_nowait)
524
525 self.assertRaises(Queue.Empty, get, False)
526 self.assertTimingAlmostEqual(get.elapsed, 0)
527
528 self.assertRaises(Queue.Empty, get, False, None)
529 self.assertTimingAlmostEqual(get.elapsed, 0)
530
531 self.assertRaises(Queue.Empty, get_nowait)
532 self.assertTimingAlmostEqual(get_nowait.elapsed, 0)
533
534 self.assertRaises(Queue.Empty, get, True, TIMEOUT1)
535 self.assertTimingAlmostEqual(get.elapsed, TIMEOUT1)
536
537 self.assertRaises(Queue.Empty, get, False, TIMEOUT2)
538 self.assertTimingAlmostEqual(get.elapsed, 0)
539
540 self.assertRaises(Queue.Empty, get, timeout=TIMEOUT3)
541 self.assertTimingAlmostEqual(get.elapsed, TIMEOUT3)
542
543 proc.join()
544
Antoine Pitrou4eb2b282010-11-02 23:51:30 +0000545 @classmethod
546 def _test_fork(cls, queue):
Benjamin Petersondfd79492008-06-13 19:13:39 +0000547 for i in range(10, 20):
548 queue.put(i)
549 # note that at this point the items may only be buffered, so the
550 # process cannot shutdown until the feeder thread has finished
551 # pushing items onto the pipe.
552
553 def test_fork(self):
554 # Old versions of Queue would fail to create a new feeder
555 # thread for a forked process if the original process had its
556 # own feeder thread. This test checks that this no longer
557 # happens.
558
559 queue = self.Queue()
560
561 # put items on queue so that main process starts a feeder thread
562 for i in range(10):
563 queue.put(i)
564
565 # wait to make sure thread starts before we fork a new process
566 time.sleep(DELTA)
567
568 # fork process
569 p = self.Process(target=self._test_fork, args=(queue,))
Jesus Cea6f6016b2011-09-09 20:26:57 +0200570 p.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +0000571 p.start()
572
573 # check that all expected items are in the queue
574 for i in range(20):
575 self.assertEqual(queue.get(), i)
576 self.assertRaises(Queue.Empty, queue.get, False)
577
578 p.join()
579
580 def test_qsize(self):
581 q = self.Queue()
582 try:
583 self.assertEqual(q.qsize(), 0)
584 except NotImplementedError:
585 return
586 q.put(1)
587 self.assertEqual(q.qsize(), 1)
588 q.put(5)
589 self.assertEqual(q.qsize(), 2)
590 q.get()
591 self.assertEqual(q.qsize(), 1)
592 q.get()
593 self.assertEqual(q.qsize(), 0)
594
Antoine Pitrou4eb2b282010-11-02 23:51:30 +0000595 @classmethod
596 def _test_task_done(cls, q):
Benjamin Petersondfd79492008-06-13 19:13:39 +0000597 for obj in iter(q.get, None):
598 time.sleep(DELTA)
599 q.task_done()
600
601 def test_task_done(self):
602 queue = self.JoinableQueue()
603
604 if sys.version_info < (2, 5) and not hasattr(queue, 'task_done'):
Florent Xicluna36b9fbb2010-03-24 19:33:25 +0000605 self.skipTest("requires 'queue.task_done()' method")
Benjamin Petersondfd79492008-06-13 19:13:39 +0000606
607 workers = [self.Process(target=self._test_task_done, args=(queue,))
608 for i in xrange(4)]
609
610 for p in workers:
Jesus Cea6f6016b2011-09-09 20:26:57 +0200611 p.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +0000612 p.start()
613
614 for i in xrange(10):
615 queue.put(i)
616
617 queue.join()
618
619 for p in workers:
620 queue.put(None)
621
622 for p in workers:
623 p.join()
624
625#
626#
627#
628
629class _TestLock(BaseTestCase):
630
631 def test_lock(self):
632 lock = self.Lock()
633 self.assertEqual(lock.acquire(), True)
634 self.assertEqual(lock.acquire(False), False)
635 self.assertEqual(lock.release(), None)
636 self.assertRaises((ValueError, threading.ThreadError), lock.release)
637
638 def test_rlock(self):
639 lock = self.RLock()
640 self.assertEqual(lock.acquire(), True)
641 self.assertEqual(lock.acquire(), True)
642 self.assertEqual(lock.acquire(), True)
643 self.assertEqual(lock.release(), None)
644 self.assertEqual(lock.release(), None)
645 self.assertEqual(lock.release(), None)
646 self.assertRaises((AssertionError, RuntimeError), lock.release)
647
Jesse Noller82eb5902009-03-30 23:29:31 +0000648 def test_lock_context(self):
649 with self.Lock():
650 pass
651
Benjamin Petersondfd79492008-06-13 19:13:39 +0000652
653class _TestSemaphore(BaseTestCase):
654
655 def _test_semaphore(self, sem):
656 self.assertReturnsIfImplemented(2, get_value, sem)
657 self.assertEqual(sem.acquire(), True)
658 self.assertReturnsIfImplemented(1, get_value, sem)
659 self.assertEqual(sem.acquire(), True)
660 self.assertReturnsIfImplemented(0, get_value, sem)
661 self.assertEqual(sem.acquire(False), False)
662 self.assertReturnsIfImplemented(0, get_value, sem)
663 self.assertEqual(sem.release(), None)
664 self.assertReturnsIfImplemented(1, get_value, sem)
665 self.assertEqual(sem.release(), None)
666 self.assertReturnsIfImplemented(2, get_value, sem)
667
668 def test_semaphore(self):
669 sem = self.Semaphore(2)
670 self._test_semaphore(sem)
671 self.assertEqual(sem.release(), None)
672 self.assertReturnsIfImplemented(3, get_value, sem)
673 self.assertEqual(sem.release(), None)
674 self.assertReturnsIfImplemented(4, get_value, sem)
675
676 def test_bounded_semaphore(self):
677 sem = self.BoundedSemaphore(2)
678 self._test_semaphore(sem)
679 # Currently fails on OS/X
680 #if HAVE_GETVALUE:
681 # self.assertRaises(ValueError, sem.release)
682 # self.assertReturnsIfImplemented(2, get_value, sem)
683
684 def test_timeout(self):
685 if self.TYPE != 'processes':
686 return
687
688 sem = self.Semaphore(0)
689 acquire = TimingWrapper(sem.acquire)
690
691 self.assertEqual(acquire(False), False)
692 self.assertTimingAlmostEqual(acquire.elapsed, 0.0)
693
694 self.assertEqual(acquire(False, None), False)
695 self.assertTimingAlmostEqual(acquire.elapsed, 0.0)
696
697 self.assertEqual(acquire(False, TIMEOUT1), False)
698 self.assertTimingAlmostEqual(acquire.elapsed, 0)
699
700 self.assertEqual(acquire(True, TIMEOUT2), False)
701 self.assertTimingAlmostEqual(acquire.elapsed, TIMEOUT2)
702
703 self.assertEqual(acquire(timeout=TIMEOUT3), False)
704 self.assertTimingAlmostEqual(acquire.elapsed, TIMEOUT3)
705
706
707class _TestCondition(BaseTestCase):
708
Antoine Pitrou4eb2b282010-11-02 23:51:30 +0000709 @classmethod
710 def f(cls, cond, sleeping, woken, timeout=None):
Benjamin Petersondfd79492008-06-13 19:13:39 +0000711 cond.acquire()
712 sleeping.release()
713 cond.wait(timeout)
714 woken.release()
715 cond.release()
716
717 def check_invariant(self, cond):
718 # this is only supposed to succeed when there are no sleepers
719 if self.TYPE == 'processes':
720 try:
721 sleepers = (cond._sleeping_count.get_value() -
722 cond._woken_count.get_value())
723 self.assertEqual(sleepers, 0)
724 self.assertEqual(cond._wait_semaphore.get_value(), 0)
725 except NotImplementedError:
726 pass
727
728 def test_notify(self):
729 cond = self.Condition()
730 sleeping = self.Semaphore(0)
731 woken = self.Semaphore(0)
732
733 p = self.Process(target=self.f, args=(cond, sleeping, woken))
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000734 p.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +0000735 p.start()
736
737 p = threading.Thread(target=self.f, args=(cond, sleeping, woken))
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000738 p.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +0000739 p.start()
740
741 # wait for both children to start sleeping
742 sleeping.acquire()
743 sleeping.acquire()
744
745 # check no process/thread has woken up
746 time.sleep(DELTA)
747 self.assertReturnsIfImplemented(0, get_value, woken)
748
749 # wake up one process/thread
750 cond.acquire()
751 cond.notify()
752 cond.release()
753
754 # check one process/thread has woken up
755 time.sleep(DELTA)
756 self.assertReturnsIfImplemented(1, get_value, woken)
757
758 # wake up another
759 cond.acquire()
760 cond.notify()
761 cond.release()
762
763 # check other has woken up
764 time.sleep(DELTA)
765 self.assertReturnsIfImplemented(2, get_value, woken)
766
767 # check state is not mucked up
768 self.check_invariant(cond)
769 p.join()
770
771 def test_notify_all(self):
772 cond = self.Condition()
773 sleeping = self.Semaphore(0)
774 woken = self.Semaphore(0)
775
776 # start some threads/processes which will timeout
777 for i in range(3):
778 p = self.Process(target=self.f,
779 args=(cond, sleeping, woken, TIMEOUT1))
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000780 p.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +0000781 p.start()
782
783 t = threading.Thread(target=self.f,
784 args=(cond, sleeping, woken, TIMEOUT1))
Benjamin Petersona9b22222008-08-18 18:01:43 +0000785 t.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +0000786 t.start()
787
788 # wait for them all to sleep
789 for i in xrange(6):
790 sleeping.acquire()
791
792 # check they have all timed out
793 for i in xrange(6):
794 woken.acquire()
795 self.assertReturnsIfImplemented(0, get_value, woken)
796
797 # check state is not mucked up
798 self.check_invariant(cond)
799
800 # start some more threads/processes
801 for i in range(3):
802 p = self.Process(target=self.f, args=(cond, sleeping, woken))
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000803 p.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +0000804 p.start()
805
806 t = threading.Thread(target=self.f, args=(cond, sleeping, woken))
Benjamin Petersona9b22222008-08-18 18:01:43 +0000807 t.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +0000808 t.start()
809
810 # wait for them to all sleep
811 for i in xrange(6):
812 sleeping.acquire()
813
814 # check no process/thread has woken up
815 time.sleep(DELTA)
816 self.assertReturnsIfImplemented(0, get_value, woken)
817
818 # wake them all up
819 cond.acquire()
820 cond.notify_all()
821 cond.release()
822
823 # check they have all woken
824 time.sleep(DELTA)
825 self.assertReturnsIfImplemented(6, get_value, woken)
826
827 # check state is not mucked up
828 self.check_invariant(cond)
829
830 def test_timeout(self):
831 cond = self.Condition()
832 wait = TimingWrapper(cond.wait)
833 cond.acquire()
834 res = wait(TIMEOUT1)
835 cond.release()
836 self.assertEqual(res, None)
837 self.assertTimingAlmostEqual(wait.elapsed, TIMEOUT1)
838
839
840class _TestEvent(BaseTestCase):
841
Antoine Pitrou4eb2b282010-11-02 23:51:30 +0000842 @classmethod
843 def _test_event(cls, event):
Benjamin Petersondfd79492008-06-13 19:13:39 +0000844 time.sleep(TIMEOUT2)
845 event.set()
846
847 def test_event(self):
848 event = self.Event()
849 wait = TimingWrapper(event.wait)
850
Ezio Melottic2077b02011-03-16 12:34:31 +0200851 # Removed temporarily, due to API shear, this does not
Benjamin Petersondfd79492008-06-13 19:13:39 +0000852 # work with threading._Event objects. is_set == isSet
Jesse Noller02cb0eb2009-04-01 03:45:50 +0000853 self.assertEqual(event.is_set(), False)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000854
Jesse Noller02cb0eb2009-04-01 03:45:50 +0000855 # Removed, threading.Event.wait() will return the value of the __flag
856 # instead of None. API Shear with the semaphore backed mp.Event
857 self.assertEqual(wait(0.0), False)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000858 self.assertTimingAlmostEqual(wait.elapsed, 0.0)
Jesse Noller02cb0eb2009-04-01 03:45:50 +0000859 self.assertEqual(wait(TIMEOUT1), False)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000860 self.assertTimingAlmostEqual(wait.elapsed, TIMEOUT1)
861
862 event.set()
863
864 # See note above on the API differences
Jesse Noller02cb0eb2009-04-01 03:45:50 +0000865 self.assertEqual(event.is_set(), True)
866 self.assertEqual(wait(), True)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000867 self.assertTimingAlmostEqual(wait.elapsed, 0.0)
Jesse Noller02cb0eb2009-04-01 03:45:50 +0000868 self.assertEqual(wait(TIMEOUT1), True)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000869 self.assertTimingAlmostEqual(wait.elapsed, 0.0)
870 # self.assertEqual(event.is_set(), True)
871
872 event.clear()
873
874 #self.assertEqual(event.is_set(), False)
875
Jesus Cea6f6016b2011-09-09 20:26:57 +0200876 p = self.Process(target=self._test_event, args=(event,))
877 p.daemon = True
878 p.start()
Jesse Noller02cb0eb2009-04-01 03:45:50 +0000879 self.assertEqual(wait(), True)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000880
881#
882#
883#
884
885class _TestValue(BaseTestCase):
886
Florent Xicluna36b9fbb2010-03-24 19:33:25 +0000887 ALLOWED_TYPES = ('processes',)
888
Benjamin Petersondfd79492008-06-13 19:13:39 +0000889 codes_values = [
890 ('i', 4343, 24234),
891 ('d', 3.625, -4.25),
892 ('h', -232, 234),
893 ('c', latin('x'), latin('y'))
894 ]
895
Antoine Pitrou55d935a2010-11-22 16:35:57 +0000896 def setUp(self):
897 if not HAS_SHAREDCTYPES:
898 self.skipTest("requires multiprocessing.sharedctypes")
899
Antoine Pitrou4eb2b282010-11-02 23:51:30 +0000900 @classmethod
901 def _test(cls, values):
902 for sv, cv in zip(values, cls.codes_values):
Benjamin Petersondfd79492008-06-13 19:13:39 +0000903 sv.value = cv[2]
904
905
906 def test_value(self, raw=False):
Benjamin Petersondfd79492008-06-13 19:13:39 +0000907 if raw:
908 values = [self.RawValue(code, value)
909 for code, value, _ in self.codes_values]
910 else:
911 values = [self.Value(code, value)
912 for code, value, _ in self.codes_values]
913
914 for sv, cv in zip(values, self.codes_values):
915 self.assertEqual(sv.value, cv[1])
916
917 proc = self.Process(target=self._test, args=(values,))
Jesus Cea6f6016b2011-09-09 20:26:57 +0200918 proc.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +0000919 proc.start()
920 proc.join()
921
922 for sv, cv in zip(values, self.codes_values):
923 self.assertEqual(sv.value, cv[2])
924
925 def test_rawvalue(self):
926 self.test_value(raw=True)
927
928 def test_getobj_getlock(self):
Benjamin Petersondfd79492008-06-13 19:13:39 +0000929 val1 = self.Value('i', 5)
930 lock1 = val1.get_lock()
931 obj1 = val1.get_obj()
932
933 val2 = self.Value('i', 5, lock=None)
934 lock2 = val2.get_lock()
935 obj2 = val2.get_obj()
936
937 lock = self.Lock()
938 val3 = self.Value('i', 5, lock=lock)
939 lock3 = val3.get_lock()
940 obj3 = val3.get_obj()
941 self.assertEqual(lock, lock3)
942
Jesse Noller6ab22152009-01-18 02:45:38 +0000943 arr4 = self.Value('i', 5, lock=False)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000944 self.assertFalse(hasattr(arr4, 'get_lock'))
945 self.assertFalse(hasattr(arr4, 'get_obj'))
946
Jesse Noller6ab22152009-01-18 02:45:38 +0000947 self.assertRaises(AttributeError, self.Value, 'i', 5, lock='navalue')
948
949 arr5 = self.RawValue('i', 5)
950 self.assertFalse(hasattr(arr5, 'get_lock'))
951 self.assertFalse(hasattr(arr5, 'get_obj'))
952
Benjamin Petersondfd79492008-06-13 19:13:39 +0000953
954class _TestArray(BaseTestCase):
955
Florent Xicluna36b9fbb2010-03-24 19:33:25 +0000956 ALLOWED_TYPES = ('processes',)
957
Antoine Pitrou4eb2b282010-11-02 23:51:30 +0000958 @classmethod
959 def f(cls, seq):
Benjamin Petersondfd79492008-06-13 19:13:39 +0000960 for i in range(1, len(seq)):
961 seq[i] += seq[i-1]
962
Florent Xicluna36b9fbb2010-03-24 19:33:25 +0000963 @unittest.skipIf(c_int is None, "requires _ctypes")
Benjamin Petersondfd79492008-06-13 19:13:39 +0000964 def test_array(self, raw=False):
Benjamin Petersondfd79492008-06-13 19:13:39 +0000965 seq = [680, 626, 934, 821, 150, 233, 548, 982, 714, 831]
966 if raw:
967 arr = self.RawArray('i', seq)
968 else:
969 arr = self.Array('i', seq)
970
971 self.assertEqual(len(arr), len(seq))
972 self.assertEqual(arr[3], seq[3])
973 self.assertEqual(list(arr[2:7]), list(seq[2:7]))
974
975 arr[4:8] = seq[4:8] = array.array('i', [1, 2, 3, 4])
976
977 self.assertEqual(list(arr[:]), seq)
978
979 self.f(seq)
980
981 p = self.Process(target=self.f, args=(arr,))
Jesus Cea6f6016b2011-09-09 20:26:57 +0200982 p.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +0000983 p.start()
984 p.join()
985
986 self.assertEqual(list(arr[:]), seq)
987
Florent Xicluna36b9fbb2010-03-24 19:33:25 +0000988 @unittest.skipIf(c_int is None, "requires _ctypes")
Mark Dickinsond3cb2f62011-03-26 10:02:37 +0000989 def test_array_from_size(self):
990 size = 10
991 # Test for zeroing (see issue #11675).
992 # The repetition below strengthens the test by increasing the chances
993 # of previously allocated non-zero memory being used for the new array
994 # on the 2nd and 3rd loops.
995 for _ in range(3):
996 arr = self.Array('i', size)
997 self.assertEqual(len(arr), size)
998 self.assertEqual(list(arr), [0] * size)
999 arr[:] = range(10)
1000 self.assertEqual(list(arr), range(10))
1001 del arr
1002
1003 @unittest.skipIf(c_int is None, "requires _ctypes")
Benjamin Petersondfd79492008-06-13 19:13:39 +00001004 def test_rawarray(self):
1005 self.test_array(raw=True)
1006
Florent Xicluna36b9fbb2010-03-24 19:33:25 +00001007 @unittest.skipIf(c_int is None, "requires _ctypes")
Mark Dickinsonf9e9a6f2011-03-25 22:01:06 +00001008 def test_array_accepts_long(self):
1009 arr = self.Array('i', 10L)
1010 self.assertEqual(len(arr), 10)
1011 raw_arr = self.RawArray('i', 10L)
1012 self.assertEqual(len(raw_arr), 10)
1013
1014 @unittest.skipIf(c_int is None, "requires _ctypes")
Benjamin Petersondfd79492008-06-13 19:13:39 +00001015 def test_getobj_getlock_obj(self):
Benjamin Petersondfd79492008-06-13 19:13:39 +00001016 arr1 = self.Array('i', range(10))
1017 lock1 = arr1.get_lock()
1018 obj1 = arr1.get_obj()
1019
1020 arr2 = self.Array('i', range(10), lock=None)
1021 lock2 = arr2.get_lock()
1022 obj2 = arr2.get_obj()
1023
1024 lock = self.Lock()
1025 arr3 = self.Array('i', range(10), lock=lock)
1026 lock3 = arr3.get_lock()
1027 obj3 = arr3.get_obj()
1028 self.assertEqual(lock, lock3)
1029
Jesse Noller6ab22152009-01-18 02:45:38 +00001030 arr4 = self.Array('i', range(10), lock=False)
Benjamin Petersondfd79492008-06-13 19:13:39 +00001031 self.assertFalse(hasattr(arr4, 'get_lock'))
1032 self.assertFalse(hasattr(arr4, 'get_obj'))
Jesse Noller6ab22152009-01-18 02:45:38 +00001033 self.assertRaises(AttributeError,
1034 self.Array, 'i', range(10), lock='notalock')
1035
1036 arr5 = self.RawArray('i', range(10))
1037 self.assertFalse(hasattr(arr5, 'get_lock'))
1038 self.assertFalse(hasattr(arr5, 'get_obj'))
Benjamin Petersondfd79492008-06-13 19:13:39 +00001039
1040#
1041#
1042#
1043
1044class _TestContainers(BaseTestCase):
1045
1046 ALLOWED_TYPES = ('manager',)
1047
1048 def test_list(self):
1049 a = self.list(range(10))
1050 self.assertEqual(a[:], range(10))
1051
1052 b = self.list()
1053 self.assertEqual(b[:], [])
1054
1055 b.extend(range(5))
1056 self.assertEqual(b[:], range(5))
1057
1058 self.assertEqual(b[2], 2)
1059 self.assertEqual(b[2:10], [2,3,4])
1060
1061 b *= 2
1062 self.assertEqual(b[:], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4])
1063
1064 self.assertEqual(b + [5, 6], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4, 5, 6])
1065
1066 self.assertEqual(a[:], range(10))
1067
1068 d = [a, b]
1069 e = self.list(d)
1070 self.assertEqual(
1071 e[:],
1072 [[0, 1, 2, 3, 4, 5, 6, 7, 8, 9], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4]]
1073 )
1074
1075 f = self.list([a])
1076 a.append('hello')
1077 self.assertEqual(f[:], [[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 'hello']])
1078
1079 def test_dict(self):
1080 d = self.dict()
1081 indices = range(65, 70)
1082 for i in indices:
1083 d[i] = chr(i)
1084 self.assertEqual(d.copy(), dict((i, chr(i)) for i in indices))
1085 self.assertEqual(sorted(d.keys()), indices)
1086 self.assertEqual(sorted(d.values()), [chr(i) for i in indices])
1087 self.assertEqual(sorted(d.items()), [(i, chr(i)) for i in indices])
1088
1089 def test_namespace(self):
1090 n = self.Namespace()
1091 n.name = 'Bob'
1092 n.job = 'Builder'
1093 n._hidden = 'hidden'
1094 self.assertEqual((n.name, n.job), ('Bob', 'Builder'))
1095 del n.job
1096 self.assertEqual(str(n), "Namespace(name='Bob')")
1097 self.assertTrue(hasattr(n, 'name'))
1098 self.assertTrue(not hasattr(n, 'job'))
1099
1100#
1101#
1102#
1103
1104def sqr(x, wait=0.0):
1105 time.sleep(wait)
1106 return x*x
Benjamin Petersondfd79492008-06-13 19:13:39 +00001107class _TestPool(BaseTestCase):
1108
1109 def test_apply(self):
1110 papply = self.pool.apply
1111 self.assertEqual(papply(sqr, (5,)), sqr(5))
1112 self.assertEqual(papply(sqr, (), {'x':3}), sqr(x=3))
1113
1114 def test_map(self):
1115 pmap = self.pool.map
1116 self.assertEqual(pmap(sqr, range(10)), map(sqr, range(10)))
1117 self.assertEqual(pmap(sqr, range(100), chunksize=20),
1118 map(sqr, range(100)))
1119
Richard Oudkerk21aad972013-10-28 23:02:22 +00001120 def test_map_unplicklable(self):
1121 # Issue #19425 -- failure to pickle should not cause a hang
1122 if self.TYPE == 'threads':
1123 return
1124 class A(object):
1125 def __reduce__(self):
1126 raise RuntimeError('cannot pickle')
1127 with self.assertRaises(RuntimeError):
1128 self.pool.map(sqr, [A()]*10)
1129
Jesse Noller7530e472009-07-16 14:23:04 +00001130 def test_map_chunksize(self):
1131 try:
1132 self.pool.map_async(sqr, [], chunksize=1).get(timeout=TIMEOUT1)
1133 except multiprocessing.TimeoutError:
1134 self.fail("pool.map_async with chunksize stalled on null list")
1135
Benjamin Petersondfd79492008-06-13 19:13:39 +00001136 def test_async(self):
1137 res = self.pool.apply_async(sqr, (7, TIMEOUT1,))
1138 get = TimingWrapper(res.get)
1139 self.assertEqual(get(), 49)
1140 self.assertTimingAlmostEqual(get.elapsed, TIMEOUT1)
1141
1142 def test_async_timeout(self):
Richard Oudkerk65162a72013-11-17 17:45:16 +00001143 res = self.pool.apply_async(sqr, (6, TIMEOUT2 + 1.0))
Benjamin Petersondfd79492008-06-13 19:13:39 +00001144 get = TimingWrapper(res.get)
1145 self.assertRaises(multiprocessing.TimeoutError, get, timeout=TIMEOUT2)
1146 self.assertTimingAlmostEqual(get.elapsed, TIMEOUT2)
1147
1148 def test_imap(self):
1149 it = self.pool.imap(sqr, range(10))
1150 self.assertEqual(list(it), map(sqr, range(10)))
1151
1152 it = self.pool.imap(sqr, range(10))
1153 for i in range(10):
1154 self.assertEqual(it.next(), i*i)
1155 self.assertRaises(StopIteration, it.next)
1156
1157 it = self.pool.imap(sqr, range(1000), chunksize=100)
1158 for i in range(1000):
1159 self.assertEqual(it.next(), i*i)
1160 self.assertRaises(StopIteration, it.next)
1161
1162 def test_imap_unordered(self):
1163 it = self.pool.imap_unordered(sqr, range(1000))
1164 self.assertEqual(sorted(it), map(sqr, range(1000)))
1165
1166 it = self.pool.imap_unordered(sqr, range(1000), chunksize=53)
1167 self.assertEqual(sorted(it), map(sqr, range(1000)))
1168
1169 def test_make_pool(self):
Victor Stinnerf64a0cf2011-06-20 17:54:33 +02001170 self.assertRaises(ValueError, multiprocessing.Pool, -1)
1171 self.assertRaises(ValueError, multiprocessing.Pool, 0)
1172
Benjamin Petersondfd79492008-06-13 19:13:39 +00001173 p = multiprocessing.Pool(3)
1174 self.assertEqual(3, len(p._pool))
1175 p.close()
1176 p.join()
1177
1178 def test_terminate(self):
1179 if self.TYPE == 'manager':
1180 # On Unix a forked process increfs each shared object to
1181 # which its parent process held a reference. If the
1182 # forked process gets terminated then there is likely to
1183 # be a reference leak. So to prevent
1184 # _TestZZZNumberOfObjects from failing we skip this test
1185 # when using a manager.
1186 return
1187
1188 result = self.pool.map_async(
1189 time.sleep, [0.1 for i in range(10000)], chunksize=1
1190 )
1191 self.pool.terminate()
1192 join = TimingWrapper(self.pool.join)
1193 join()
1194 self.assertTrue(join.elapsed < 0.2)
Jesse Noller654ade32010-01-27 03:05:57 +00001195
Richard Oudkerkd44a4a22012-06-06 17:52:18 +01001196 def test_empty_iterable(self):
1197 # See Issue 12157
1198 p = self.Pool(1)
1199
1200 self.assertEqual(p.map(sqr, []), [])
1201 self.assertEqual(list(p.imap(sqr, [])), [])
1202 self.assertEqual(list(p.imap_unordered(sqr, [])), [])
1203 self.assertEqual(p.map_async(sqr, []).get(), [])
1204
1205 p.close()
1206 p.join()
1207
Richard Oudkerk0c200c22012-05-02 16:36:26 +01001208def unpickleable_result():
1209 return lambda: 42
1210
1211class _TestPoolWorkerErrors(BaseTestCase):
1212 ALLOWED_TYPES = ('processes', )
1213
1214 def test_unpickleable_result(self):
1215 from multiprocessing.pool import MaybeEncodingError
1216 p = multiprocessing.Pool(2)
1217
1218 # Make sure we don't lose pool processes because of encoding errors.
1219 for iteration in range(20):
1220 res = p.apply_async(unpickleable_result)
1221 self.assertRaises(MaybeEncodingError, res.get)
1222
1223 p.close()
1224 p.join()
1225
Jesse Noller654ade32010-01-27 03:05:57 +00001226class _TestPoolWorkerLifetime(BaseTestCase):
1227
1228 ALLOWED_TYPES = ('processes', )
1229 def test_pool_worker_lifetime(self):
1230 p = multiprocessing.Pool(3, maxtasksperchild=10)
1231 self.assertEqual(3, len(p._pool))
1232 origworkerpids = [w.pid for w in p._pool]
1233 # Run many tasks so each worker gets replaced (hopefully)
1234 results = []
1235 for i in range(100):
1236 results.append(p.apply_async(sqr, (i, )))
1237 # Fetch the results and verify we got the right answers,
1238 # also ensuring all the tasks have completed.
1239 for (j, res) in enumerate(results):
1240 self.assertEqual(res.get(), sqr(j))
1241 # Refill the pool
1242 p._repopulate_pool()
Florent Xicluna3bc5cb72010-03-04 15:58:54 +00001243 # Wait until all workers are alive
Antoine Pitrouc2b0d762011-04-06 22:54:14 +02001244 # (countdown * DELTA = 5 seconds max startup process time)
1245 countdown = 50
Florent Xicluna3bc5cb72010-03-04 15:58:54 +00001246 while countdown and not all(w.is_alive() for w in p._pool):
1247 countdown -= 1
1248 time.sleep(DELTA)
Jesse Noller654ade32010-01-27 03:05:57 +00001249 finalworkerpids = [w.pid for w in p._pool]
Florent Xicluna3bc5cb72010-03-04 15:58:54 +00001250 # All pids should be assigned. See issue #7805.
1251 self.assertNotIn(None, origworkerpids)
1252 self.assertNotIn(None, finalworkerpids)
1253 # Finally, check that the worker pids have changed
Jesse Noller654ade32010-01-27 03:05:57 +00001254 self.assertNotEqual(sorted(origworkerpids), sorted(finalworkerpids))
1255 p.close()
1256 p.join()
1257
Charles-François Natali46f990e2011-10-24 18:43:51 +02001258 def test_pool_worker_lifetime_early_close(self):
1259 # Issue #10332: closing a pool whose workers have limited lifetimes
1260 # before all the tasks completed would make join() hang.
1261 p = multiprocessing.Pool(3, maxtasksperchild=1)
1262 results = []
1263 for i in range(6):
1264 results.append(p.apply_async(sqr, (i, 0.3)))
1265 p.close()
1266 p.join()
1267 # check the results
1268 for (j, res) in enumerate(results):
1269 self.assertEqual(res.get(), sqr(j))
1270
1271
Benjamin Petersondfd79492008-06-13 19:13:39 +00001272#
1273# Test that manager has expected number of shared objects left
1274#
1275
1276class _TestZZZNumberOfObjects(BaseTestCase):
1277 # Because test cases are sorted alphabetically, this one will get
1278 # run after all the other tests for the manager. It tests that
1279 # there have been no "reference leaks" for the manager's shared
1280 # objects. Note the comment in _TestPool.test_terminate().
1281 ALLOWED_TYPES = ('manager',)
1282
1283 def test_number_of_objects(self):
1284 EXPECTED_NUMBER = 1 # the pool object is still alive
1285 multiprocessing.active_children() # discard dead process objs
1286 gc.collect() # do garbage collection
1287 refs = self.manager._number_of_objects()
Jesse Noller7314b382009-01-21 02:08:17 +00001288 debug_info = self.manager._debug_info()
Benjamin Petersondfd79492008-06-13 19:13:39 +00001289 if refs != EXPECTED_NUMBER:
Jesse Noller7fb96402008-07-17 21:01:05 +00001290 print self.manager._debug_info()
Jesse Noller7314b382009-01-21 02:08:17 +00001291 print debug_info
Benjamin Petersondfd79492008-06-13 19:13:39 +00001292
1293 self.assertEqual(refs, EXPECTED_NUMBER)
1294
1295#
1296# Test of creating a customized manager class
1297#
1298
1299from multiprocessing.managers import BaseManager, BaseProxy, RemoteError
1300
1301class FooBar(object):
1302 def f(self):
1303 return 'f()'
1304 def g(self):
1305 raise ValueError
1306 def _h(self):
1307 return '_h()'
1308
1309def baz():
1310 for i in xrange(10):
1311 yield i*i
1312
1313class IteratorProxy(BaseProxy):
1314 _exposed_ = ('next', '__next__')
1315 def __iter__(self):
1316 return self
1317 def next(self):
1318 return self._callmethod('next')
1319 def __next__(self):
1320 return self._callmethod('__next__')
1321
1322class MyManager(BaseManager):
1323 pass
1324
1325MyManager.register('Foo', callable=FooBar)
1326MyManager.register('Bar', callable=FooBar, exposed=('f', '_h'))
1327MyManager.register('baz', callable=baz, proxytype=IteratorProxy)
1328
1329
1330class _TestMyManager(BaseTestCase):
1331
1332 ALLOWED_TYPES = ('manager',)
1333
1334 def test_mymanager(self):
1335 manager = MyManager()
1336 manager.start()
1337
1338 foo = manager.Foo()
1339 bar = manager.Bar()
1340 baz = manager.baz()
1341
1342 foo_methods = [name for name in ('f', 'g', '_h') if hasattr(foo, name)]
1343 bar_methods = [name for name in ('f', 'g', '_h') if hasattr(bar, name)]
1344
1345 self.assertEqual(foo_methods, ['f', 'g'])
1346 self.assertEqual(bar_methods, ['f', '_h'])
1347
1348 self.assertEqual(foo.f(), 'f()')
1349 self.assertRaises(ValueError, foo.g)
1350 self.assertEqual(foo._callmethod('f'), 'f()')
1351 self.assertRaises(RemoteError, foo._callmethod, '_h')
1352
1353 self.assertEqual(bar.f(), 'f()')
1354 self.assertEqual(bar._h(), '_h()')
1355 self.assertEqual(bar._callmethod('f'), 'f()')
1356 self.assertEqual(bar._callmethod('_h'), '_h()')
1357
1358 self.assertEqual(list(baz), [i*i for i in range(10)])
1359
1360 manager.shutdown()
1361
1362#
1363# Test of connecting to a remote server and using xmlrpclib for serialization
1364#
1365
1366_queue = Queue.Queue()
1367def get_queue():
1368 return _queue
1369
1370class QueueManager(BaseManager):
1371 '''manager class used by server process'''
1372QueueManager.register('get_queue', callable=get_queue)
1373
1374class QueueManager2(BaseManager):
1375 '''manager class which specifies the same interface as QueueManager'''
1376QueueManager2.register('get_queue')
1377
1378
1379SERIALIZER = 'xmlrpclib'
1380
1381class _TestRemoteManager(BaseTestCase):
1382
1383 ALLOWED_TYPES = ('manager',)
1384
Antoine Pitrou4eb2b282010-11-02 23:51:30 +00001385 @classmethod
1386 def _putter(cls, address, authkey):
Benjamin Petersondfd79492008-06-13 19:13:39 +00001387 manager = QueueManager2(
1388 address=address, authkey=authkey, serializer=SERIALIZER
1389 )
1390 manager.connect()
1391 queue = manager.get_queue()
1392 queue.put(('hello world', None, True, 2.25))
1393
1394 def test_remote(self):
1395 authkey = os.urandom(32)
1396
1397 manager = QueueManager(
Antoine Pitrou78254dc2013-08-22 00:39:46 +02001398 address=(test.test_support.HOST, 0), authkey=authkey, serializer=SERIALIZER
Benjamin Petersondfd79492008-06-13 19:13:39 +00001399 )
1400 manager.start()
1401
1402 p = self.Process(target=self._putter, args=(manager.address, authkey))
Jesus Cea6f6016b2011-09-09 20:26:57 +02001403 p.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +00001404 p.start()
1405
1406 manager2 = QueueManager2(
1407 address=manager.address, authkey=authkey, serializer=SERIALIZER
1408 )
1409 manager2.connect()
1410 queue = manager2.get_queue()
1411
1412 # Note that xmlrpclib will deserialize object as a list not a tuple
1413 self.assertEqual(queue.get(), ['hello world', None, True, 2.25])
1414
1415 # Because we are using xmlrpclib for serialization instead of
1416 # pickle this will cause a serialization error.
1417 self.assertRaises(Exception, queue.put, time.sleep)
1418
1419 # Make queue finalizer run before the server is stopped
1420 del queue
1421 manager.shutdown()
1422
Jesse Noller459a6482009-03-30 15:50:42 +00001423class _TestManagerRestart(BaseTestCase):
1424
Antoine Pitrou4eb2b282010-11-02 23:51:30 +00001425 @classmethod
1426 def _putter(cls, address, authkey):
Jesse Noller459a6482009-03-30 15:50:42 +00001427 manager = QueueManager(
1428 address=address, authkey=authkey, serializer=SERIALIZER)
1429 manager.connect()
1430 queue = manager.get_queue()
1431 queue.put('hello world')
1432
1433 def test_rapid_restart(self):
1434 authkey = os.urandom(32)
1435 manager = QueueManager(
Antoine Pitrou78254dc2013-08-22 00:39:46 +02001436 address=(test.test_support.HOST, 0), authkey=authkey, serializer=SERIALIZER)
Brian Curtin87d86e02010-11-01 05:15:55 +00001437 srvr = manager.get_server()
1438 addr = srvr.address
1439 # Close the connection.Listener socket which gets opened as a part
1440 # of manager.get_server(). It's not needed for the test.
1441 srvr.listener.close()
Jesse Noller459a6482009-03-30 15:50:42 +00001442 manager.start()
1443
1444 p = self.Process(target=self._putter, args=(manager.address, authkey))
Jesus Cea6f6016b2011-09-09 20:26:57 +02001445 p.daemon = True
Jesse Noller459a6482009-03-30 15:50:42 +00001446 p.start()
1447 queue = manager.get_queue()
1448 self.assertEqual(queue.get(), 'hello world')
Jesse Noller019ce772009-03-30 21:53:29 +00001449 del queue
Jesse Noller459a6482009-03-30 15:50:42 +00001450 manager.shutdown()
1451 manager = QueueManager(
Antoine Pitrou54f9f832010-04-30 23:08:48 +00001452 address=addr, authkey=authkey, serializer=SERIALIZER)
Jesse Noller459a6482009-03-30 15:50:42 +00001453 manager.start()
Jesse Noller019ce772009-03-30 21:53:29 +00001454 manager.shutdown()
Jesse Noller459a6482009-03-30 15:50:42 +00001455
Benjamin Petersondfd79492008-06-13 19:13:39 +00001456#
1457#
1458#
1459
1460SENTINEL = latin('')
1461
1462class _TestConnection(BaseTestCase):
1463
1464 ALLOWED_TYPES = ('processes', 'threads')
1465
Antoine Pitrou4eb2b282010-11-02 23:51:30 +00001466 @classmethod
1467 def _echo(cls, conn):
Benjamin Petersondfd79492008-06-13 19:13:39 +00001468 for msg in iter(conn.recv_bytes, SENTINEL):
1469 conn.send_bytes(msg)
1470 conn.close()
1471
1472 def test_connection(self):
1473 conn, child_conn = self.Pipe()
1474
1475 p = self.Process(target=self._echo, args=(child_conn,))
Jesse Noller5bc9f4c2008-08-19 19:06:19 +00001476 p.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +00001477 p.start()
1478
1479 seq = [1, 2.25, None]
1480 msg = latin('hello world')
1481 longmsg = msg * 10
1482 arr = array.array('i', range(4))
1483
1484 if self.TYPE == 'processes':
1485 self.assertEqual(type(conn.fileno()), int)
1486
1487 self.assertEqual(conn.send(seq), None)
1488 self.assertEqual(conn.recv(), seq)
1489
1490 self.assertEqual(conn.send_bytes(msg), None)
1491 self.assertEqual(conn.recv_bytes(), msg)
1492
1493 if self.TYPE == 'processes':
1494 buffer = array.array('i', [0]*10)
1495 expected = list(arr) + [0] * (10 - len(arr))
1496 self.assertEqual(conn.send_bytes(arr), None)
1497 self.assertEqual(conn.recv_bytes_into(buffer),
1498 len(arr) * buffer.itemsize)
1499 self.assertEqual(list(buffer), expected)
1500
1501 buffer = array.array('i', [0]*10)
1502 expected = [0] * 3 + list(arr) + [0] * (10 - 3 - len(arr))
1503 self.assertEqual(conn.send_bytes(arr), None)
1504 self.assertEqual(conn.recv_bytes_into(buffer, 3 * buffer.itemsize),
1505 len(arr) * buffer.itemsize)
1506 self.assertEqual(list(buffer), expected)
1507
1508 buffer = bytearray(latin(' ' * 40))
1509 self.assertEqual(conn.send_bytes(longmsg), None)
1510 try:
1511 res = conn.recv_bytes_into(buffer)
1512 except multiprocessing.BufferTooShort, e:
1513 self.assertEqual(e.args, (longmsg,))
1514 else:
1515 self.fail('expected BufferTooShort, got %s' % res)
1516
1517 poll = TimingWrapper(conn.poll)
1518
1519 self.assertEqual(poll(), False)
1520 self.assertTimingAlmostEqual(poll.elapsed, 0)
1521
1522 self.assertEqual(poll(TIMEOUT1), False)
1523 self.assertTimingAlmostEqual(poll.elapsed, TIMEOUT1)
1524
1525 conn.send(None)
Giampaolo Rodola'cef20062012-12-31 17:23:09 +01001526 time.sleep(.1)
Benjamin Petersondfd79492008-06-13 19:13:39 +00001527
1528 self.assertEqual(poll(TIMEOUT1), True)
1529 self.assertTimingAlmostEqual(poll.elapsed, 0)
1530
1531 self.assertEqual(conn.recv(), None)
1532
1533 really_big_msg = latin('X') * (1024 * 1024 * 16) # 16Mb
1534 conn.send_bytes(really_big_msg)
1535 self.assertEqual(conn.recv_bytes(), really_big_msg)
1536
1537 conn.send_bytes(SENTINEL) # tell child to quit
1538 child_conn.close()
1539
1540 if self.TYPE == 'processes':
1541 self.assertEqual(conn.readable, True)
1542 self.assertEqual(conn.writable, True)
1543 self.assertRaises(EOFError, conn.recv)
1544 self.assertRaises(EOFError, conn.recv_bytes)
1545
1546 p.join()
1547
1548 def test_duplex_false(self):
1549 reader, writer = self.Pipe(duplex=False)
1550 self.assertEqual(writer.send(1), None)
1551 self.assertEqual(reader.recv(), 1)
1552 if self.TYPE == 'processes':
1553 self.assertEqual(reader.readable, True)
1554 self.assertEqual(reader.writable, False)
1555 self.assertEqual(writer.readable, False)
1556 self.assertEqual(writer.writable, True)
1557 self.assertRaises(IOError, reader.send, 2)
1558 self.assertRaises(IOError, writer.recv)
1559 self.assertRaises(IOError, writer.poll)
1560
1561 def test_spawn_close(self):
1562 # We test that a pipe connection can be closed by parent
1563 # process immediately after child is spawned. On Windows this
1564 # would have sometimes failed on old versions because
1565 # child_conn would be closed before the child got a chance to
1566 # duplicate it.
1567 conn, child_conn = self.Pipe()
1568
1569 p = self.Process(target=self._echo, args=(child_conn,))
Jesus Cea6f6016b2011-09-09 20:26:57 +02001570 p.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +00001571 p.start()
1572 child_conn.close() # this might complete before child initializes
1573
1574 msg = latin('hello')
1575 conn.send_bytes(msg)
1576 self.assertEqual(conn.recv_bytes(), msg)
1577
1578 conn.send_bytes(SENTINEL)
1579 conn.close()
1580 p.join()
1581
1582 def test_sendbytes(self):
1583 if self.TYPE != 'processes':
1584 return
1585
1586 msg = latin('abcdefghijklmnopqrstuvwxyz')
1587 a, b = self.Pipe()
1588
1589 a.send_bytes(msg)
1590 self.assertEqual(b.recv_bytes(), msg)
1591
1592 a.send_bytes(msg, 5)
1593 self.assertEqual(b.recv_bytes(), msg[5:])
1594
1595 a.send_bytes(msg, 7, 8)
1596 self.assertEqual(b.recv_bytes(), msg[7:7+8])
1597
1598 a.send_bytes(msg, 26)
1599 self.assertEqual(b.recv_bytes(), latin(''))
1600
1601 a.send_bytes(msg, 26, 0)
1602 self.assertEqual(b.recv_bytes(), latin(''))
1603
1604 self.assertRaises(ValueError, a.send_bytes, msg, 27)
1605
1606 self.assertRaises(ValueError, a.send_bytes, msg, 22, 5)
1607
1608 self.assertRaises(ValueError, a.send_bytes, msg, 26, 1)
1609
1610 self.assertRaises(ValueError, a.send_bytes, msg, -1)
1611
1612 self.assertRaises(ValueError, a.send_bytes, msg, 4, -1)
1613
Antoine Pitroua1a8da82011-08-23 19:54:20 +02001614 @classmethod
1615 def _is_fd_assigned(cls, fd):
1616 try:
1617 os.fstat(fd)
1618 except OSError as e:
1619 if e.errno == errno.EBADF:
1620 return False
1621 raise
1622 else:
1623 return True
1624
1625 @classmethod
1626 def _writefd(cls, conn, data, create_dummy_fds=False):
1627 if create_dummy_fds:
1628 for i in range(0, 256):
1629 if not cls._is_fd_assigned(i):
1630 os.dup2(conn.fileno(), i)
1631 fd = reduction.recv_handle(conn)
1632 if msvcrt:
1633 fd = msvcrt.open_osfhandle(fd, os.O_WRONLY)
1634 os.write(fd, data)
1635 os.close(fd)
1636
Charles-François Natalif8413b22011-09-21 18:44:49 +02001637 @unittest.skipUnless(HAS_REDUCTION, "test needs multiprocessing.reduction")
Antoine Pitroua1a8da82011-08-23 19:54:20 +02001638 def test_fd_transfer(self):
1639 if self.TYPE != 'processes':
1640 self.skipTest("only makes sense with processes")
1641 conn, child_conn = self.Pipe(duplex=True)
1642
1643 p = self.Process(target=self._writefd, args=(child_conn, b"foo"))
Jesus Cea6f6016b2011-09-09 20:26:57 +02001644 p.daemon = True
Antoine Pitroua1a8da82011-08-23 19:54:20 +02001645 p.start()
1646 with open(test_support.TESTFN, "wb") as f:
1647 fd = f.fileno()
1648 if msvcrt:
1649 fd = msvcrt.get_osfhandle(fd)
1650 reduction.send_handle(conn, fd, p.pid)
1651 p.join()
1652 with open(test_support.TESTFN, "rb") as f:
1653 self.assertEqual(f.read(), b"foo")
1654
Charles-François Natalif8413b22011-09-21 18:44:49 +02001655 @unittest.skipUnless(HAS_REDUCTION, "test needs multiprocessing.reduction")
Antoine Pitroua1a8da82011-08-23 19:54:20 +02001656 @unittest.skipIf(sys.platform == "win32",
1657 "test semantics don't make sense on Windows")
1658 @unittest.skipIf(MAXFD <= 256,
1659 "largest assignable fd number is too small")
1660 @unittest.skipUnless(hasattr(os, "dup2"),
1661 "test needs os.dup2()")
1662 def test_large_fd_transfer(self):
1663 # With fd > 256 (issue #11657)
1664 if self.TYPE != 'processes':
1665 self.skipTest("only makes sense with processes")
1666 conn, child_conn = self.Pipe(duplex=True)
1667
1668 p = self.Process(target=self._writefd, args=(child_conn, b"bar", True))
Jesus Cea6f6016b2011-09-09 20:26:57 +02001669 p.daemon = True
Antoine Pitroua1a8da82011-08-23 19:54:20 +02001670 p.start()
1671 with open(test_support.TESTFN, "wb") as f:
1672 fd = f.fileno()
1673 for newfd in range(256, MAXFD):
1674 if not self._is_fd_assigned(newfd):
1675 break
1676 else:
1677 self.fail("could not find an unassigned large file descriptor")
1678 os.dup2(fd, newfd)
1679 try:
1680 reduction.send_handle(conn, newfd, p.pid)
1681 finally:
1682 os.close(newfd)
1683 p.join()
1684 with open(test_support.TESTFN, "rb") as f:
1685 self.assertEqual(f.read(), b"bar")
1686
Jesus Ceac23484b2011-09-21 03:47:39 +02001687 @classmethod
1688 def _send_data_without_fd(self, conn):
1689 os.write(conn.fileno(), b"\0")
1690
Charles-François Natalif8413b22011-09-21 18:44:49 +02001691 @unittest.skipUnless(HAS_REDUCTION, "test needs multiprocessing.reduction")
Jesus Ceac23484b2011-09-21 03:47:39 +02001692 @unittest.skipIf(sys.platform == "win32", "doesn't make sense on Windows")
1693 def test_missing_fd_transfer(self):
1694 # Check that exception is raised when received data is not
1695 # accompanied by a file descriptor in ancillary data.
1696 if self.TYPE != 'processes':
1697 self.skipTest("only makes sense with processes")
1698 conn, child_conn = self.Pipe(duplex=True)
1699
1700 p = self.Process(target=self._send_data_without_fd, args=(child_conn,))
1701 p.daemon = True
1702 p.start()
1703 self.assertRaises(RuntimeError, reduction.recv_handle, conn)
1704 p.join()
Antoine Pitroua1a8da82011-08-23 19:54:20 +02001705
Benjamin Petersondfd79492008-06-13 19:13:39 +00001706class _TestListenerClient(BaseTestCase):
1707
1708 ALLOWED_TYPES = ('processes', 'threads')
1709
Antoine Pitrou4eb2b282010-11-02 23:51:30 +00001710 @classmethod
1711 def _test(cls, address):
1712 conn = cls.connection.Client(address)
Benjamin Petersondfd79492008-06-13 19:13:39 +00001713 conn.send('hello')
1714 conn.close()
1715
1716 def test_listener_client(self):
1717 for family in self.connection.families:
1718 l = self.connection.Listener(family=family)
1719 p = self.Process(target=self._test, args=(l.address,))
Jesse Noller5bc9f4c2008-08-19 19:06:19 +00001720 p.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +00001721 p.start()
1722 conn = l.accept()
1723 self.assertEqual(conn.recv(), 'hello')
1724 p.join()
1725 l.close()
Richard Oudkerk9a16fa62012-05-05 20:41:08 +01001726
1727 def test_issue14725(self):
1728 l = self.connection.Listener()
1729 p = self.Process(target=self._test, args=(l.address,))
1730 p.daemon = True
1731 p.start()
1732 time.sleep(1)
1733 # On Windows the client process should by now have connected,
1734 # written data and closed the pipe handle by now. This causes
1735 # ConnectNamdedPipe() to fail with ERROR_NO_DATA. See Issue
1736 # 14725.
1737 conn = l.accept()
1738 self.assertEqual(conn.recv(), 'hello')
1739 conn.close()
1740 p.join()
1741 l.close()
1742
Benjamin Petersondfd79492008-06-13 19:13:39 +00001743#
1744# Test of sending connection and socket objects between processes
1745#
Benjamin Petersonda3a1b12008-06-16 20:52:48 +00001746"""
Benjamin Petersondfd79492008-06-13 19:13:39 +00001747class _TestPicklingConnections(BaseTestCase):
1748
1749 ALLOWED_TYPES = ('processes',)
1750
1751 def _listener(self, conn, families):
1752 for fam in families:
1753 l = self.connection.Listener(family=fam)
1754 conn.send(l.address)
1755 new_conn = l.accept()
1756 conn.send(new_conn)
1757
1758 if self.TYPE == 'processes':
1759 l = socket.socket()
1760 l.bind(('localhost', 0))
1761 conn.send(l.getsockname())
1762 l.listen(1)
1763 new_conn, addr = l.accept()
1764 conn.send(new_conn)
1765
1766 conn.recv()
1767
1768 def _remote(self, conn):
1769 for (address, msg) in iter(conn.recv, None):
1770 client = self.connection.Client(address)
1771 client.send(msg.upper())
1772 client.close()
1773
1774 if self.TYPE == 'processes':
1775 address, msg = conn.recv()
1776 client = socket.socket()
1777 client.connect(address)
1778 client.sendall(msg.upper())
1779 client.close()
1780
1781 conn.close()
1782
1783 def test_pickling(self):
1784 try:
1785 multiprocessing.allow_connection_pickling()
1786 except ImportError:
1787 return
1788
1789 families = self.connection.families
1790
1791 lconn, lconn0 = self.Pipe()
1792 lp = self.Process(target=self._listener, args=(lconn0, families))
Jesus Cea6f6016b2011-09-09 20:26:57 +02001793 lp.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +00001794 lp.start()
1795 lconn0.close()
1796
1797 rconn, rconn0 = self.Pipe()
1798 rp = self.Process(target=self._remote, args=(rconn0,))
Jesus Cea6f6016b2011-09-09 20:26:57 +02001799 rp.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +00001800 rp.start()
1801 rconn0.close()
1802
1803 for fam in families:
1804 msg = ('This connection uses family %s' % fam).encode('ascii')
1805 address = lconn.recv()
1806 rconn.send((address, msg))
1807 new_conn = lconn.recv()
1808 self.assertEqual(new_conn.recv(), msg.upper())
1809
1810 rconn.send(None)
1811
1812 if self.TYPE == 'processes':
1813 msg = latin('This connection uses a normal socket')
1814 address = lconn.recv()
1815 rconn.send((address, msg))
1816 if hasattr(socket, 'fromfd'):
1817 new_conn = lconn.recv()
1818 self.assertEqual(new_conn.recv(100), msg.upper())
1819 else:
1820 # XXX On Windows with Py2.6 need to backport fromfd()
1821 discard = lconn.recv_bytes()
1822
1823 lconn.send(None)
1824
1825 rconn.close()
1826 lconn.close()
1827
1828 lp.join()
1829 rp.join()
Benjamin Petersonda3a1b12008-06-16 20:52:48 +00001830"""
Benjamin Petersondfd79492008-06-13 19:13:39 +00001831#
1832#
1833#
1834
1835class _TestHeap(BaseTestCase):
1836
1837 ALLOWED_TYPES = ('processes',)
1838
1839 def test_heap(self):
1840 iterations = 5000
1841 maxblocks = 50
1842 blocks = []
1843
1844 # create and destroy lots of blocks of different sizes
1845 for i in xrange(iterations):
1846 size = int(random.lognormvariate(0, 1) * 1000)
1847 b = multiprocessing.heap.BufferWrapper(size)
1848 blocks.append(b)
1849 if len(blocks) > maxblocks:
1850 i = random.randrange(maxblocks)
1851 del blocks[i]
1852
1853 # get the heap object
1854 heap = multiprocessing.heap.BufferWrapper._heap
1855
1856 # verify the state of the heap
1857 all = []
1858 occupied = 0
Charles-François Natali414d0fa2011-07-02 13:56:19 +02001859 heap._lock.acquire()
1860 self.addCleanup(heap._lock.release)
Benjamin Petersondfd79492008-06-13 19:13:39 +00001861 for L in heap._len_to_seq.values():
1862 for arena, start, stop in L:
1863 all.append((heap._arenas.index(arena), start, stop,
1864 stop-start, 'free'))
1865 for arena, start, stop in heap._allocated_blocks:
1866 all.append((heap._arenas.index(arena), start, stop,
1867 stop-start, 'occupied'))
1868 occupied += (stop-start)
1869
1870 all.sort()
1871
1872 for i in range(len(all)-1):
1873 (arena, start, stop) = all[i][:3]
1874 (narena, nstart, nstop) = all[i+1][:3]
1875 self.assertTrue((arena != narena and nstart == 0) or
1876 (stop == nstart))
1877
Charles-François Natali414d0fa2011-07-02 13:56:19 +02001878 def test_free_from_gc(self):
1879 # Check that freeing of blocks by the garbage collector doesn't deadlock
1880 # (issue #12352).
1881 # Make sure the GC is enabled, and set lower collection thresholds to
1882 # make collections more frequent (and increase the probability of
1883 # deadlock).
Charles-François Natali7c20ad32011-07-02 14:08:27 +02001884 if not gc.isenabled():
Charles-François Natali414d0fa2011-07-02 13:56:19 +02001885 gc.enable()
1886 self.addCleanup(gc.disable)
Charles-François Natali7c20ad32011-07-02 14:08:27 +02001887 thresholds = gc.get_threshold()
1888 self.addCleanup(gc.set_threshold, *thresholds)
Charles-François Natali414d0fa2011-07-02 13:56:19 +02001889 gc.set_threshold(10)
1890
1891 # perform numerous block allocations, with cyclic references to make
1892 # sure objects are collected asynchronously by the gc
1893 for i in range(5000):
1894 a = multiprocessing.heap.BufferWrapper(1)
1895 b = multiprocessing.heap.BufferWrapper(1)
1896 # circular references
1897 a.buddy = b
1898 b.buddy = a
1899
Benjamin Petersondfd79492008-06-13 19:13:39 +00001900#
1901#
1902#
1903
Benjamin Petersondfd79492008-06-13 19:13:39 +00001904class _Foo(Structure):
1905 _fields_ = [
1906 ('x', c_int),
1907 ('y', c_double)
1908 ]
1909
1910class _TestSharedCTypes(BaseTestCase):
1911
1912 ALLOWED_TYPES = ('processes',)
1913
Antoine Pitrou55d935a2010-11-22 16:35:57 +00001914 def setUp(self):
1915 if not HAS_SHAREDCTYPES:
1916 self.skipTest("requires multiprocessing.sharedctypes")
1917
Antoine Pitrou4eb2b282010-11-02 23:51:30 +00001918 @classmethod
1919 def _double(cls, x, y, foo, arr, string):
Benjamin Petersondfd79492008-06-13 19:13:39 +00001920 x.value *= 2
1921 y.value *= 2
1922 foo.x *= 2
1923 foo.y *= 2
1924 string.value *= 2
1925 for i in range(len(arr)):
1926 arr[i] *= 2
1927
1928 def test_sharedctypes(self, lock=False):
Benjamin Petersondfd79492008-06-13 19:13:39 +00001929 x = Value('i', 7, lock=lock)
Georg Brandlbd564c32010-02-06 23:33:33 +00001930 y = Value(c_double, 1.0/3.0, lock=lock)
Benjamin Petersondfd79492008-06-13 19:13:39 +00001931 foo = Value(_Foo, 3, 2, lock=lock)
Georg Brandlbd564c32010-02-06 23:33:33 +00001932 arr = self.Array('d', range(10), lock=lock)
1933 string = self.Array('c', 20, lock=lock)
Brian Curtina06e9b82010-10-07 02:27:41 +00001934 string.value = latin('hello')
Benjamin Petersondfd79492008-06-13 19:13:39 +00001935
1936 p = self.Process(target=self._double, args=(x, y, foo, arr, string))
Jesus Cea6f6016b2011-09-09 20:26:57 +02001937 p.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +00001938 p.start()
1939 p.join()
1940
1941 self.assertEqual(x.value, 14)
1942 self.assertAlmostEqual(y.value, 2.0/3.0)
1943 self.assertEqual(foo.x, 6)
1944 self.assertAlmostEqual(foo.y, 4.0)
1945 for i in range(10):
1946 self.assertAlmostEqual(arr[i], i*2)
1947 self.assertEqual(string.value, latin('hellohello'))
1948
1949 def test_synchronize(self):
1950 self.test_sharedctypes(lock=True)
1951
1952 def test_copy(self):
Benjamin Petersondfd79492008-06-13 19:13:39 +00001953 foo = _Foo(2, 5.0)
Brian Curtina06e9b82010-10-07 02:27:41 +00001954 bar = copy(foo)
Benjamin Petersondfd79492008-06-13 19:13:39 +00001955 foo.x = 0
1956 foo.y = 0
1957 self.assertEqual(bar.x, 2)
1958 self.assertAlmostEqual(bar.y, 5.0)
1959
1960#
1961#
1962#
1963
1964class _TestFinalize(BaseTestCase):
1965
1966 ALLOWED_TYPES = ('processes',)
1967
Antoine Pitrou4eb2b282010-11-02 23:51:30 +00001968 @classmethod
1969 def _test_finalize(cls, conn):
Benjamin Petersondfd79492008-06-13 19:13:39 +00001970 class Foo(object):
1971 pass
1972
1973 a = Foo()
1974 util.Finalize(a, conn.send, args=('a',))
1975 del a # triggers callback for a
1976
1977 b = Foo()
1978 close_b = util.Finalize(b, conn.send, args=('b',))
1979 close_b() # triggers callback for b
1980 close_b() # does nothing because callback has already been called
1981 del b # does nothing because callback has already been called
1982
1983 c = Foo()
1984 util.Finalize(c, conn.send, args=('c',))
1985
1986 d10 = Foo()
1987 util.Finalize(d10, conn.send, args=('d10',), exitpriority=1)
1988
1989 d01 = Foo()
1990 util.Finalize(d01, conn.send, args=('d01',), exitpriority=0)
1991 d02 = Foo()
1992 util.Finalize(d02, conn.send, args=('d02',), exitpriority=0)
1993 d03 = Foo()
1994 util.Finalize(d03, conn.send, args=('d03',), exitpriority=0)
1995
1996 util.Finalize(None, conn.send, args=('e',), exitpriority=-10)
1997
1998 util.Finalize(None, conn.send, args=('STOP',), exitpriority=-100)
1999
Ezio Melottic2077b02011-03-16 12:34:31 +02002000 # call multiprocessing's cleanup function then exit process without
Benjamin Petersondfd79492008-06-13 19:13:39 +00002001 # garbage collecting locals
2002 util._exit_function()
2003 conn.close()
2004 os._exit(0)
2005
2006 def test_finalize(self):
2007 conn, child_conn = self.Pipe()
2008
2009 p = self.Process(target=self._test_finalize, args=(child_conn,))
Jesus Cea6f6016b2011-09-09 20:26:57 +02002010 p.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +00002011 p.start()
2012 p.join()
2013
2014 result = [obj for obj in iter(conn.recv, 'STOP')]
2015 self.assertEqual(result, ['a', 'b', 'd10', 'd03', 'd02', 'd01', 'e'])
2016
2017#
2018# Test that from ... import * works for each module
2019#
2020
2021class _TestImportStar(BaseTestCase):
2022
2023 ALLOWED_TYPES = ('processes',)
2024
2025 def test_import(self):
Florent Xicluna36b9fbb2010-03-24 19:33:25 +00002026 modules = [
Benjamin Petersondfd79492008-06-13 19:13:39 +00002027 'multiprocessing', 'multiprocessing.connection',
2028 'multiprocessing.heap', 'multiprocessing.managers',
2029 'multiprocessing.pool', 'multiprocessing.process',
Benjamin Petersondfd79492008-06-13 19:13:39 +00002030 'multiprocessing.synchronize', 'multiprocessing.util'
Florent Xicluna36b9fbb2010-03-24 19:33:25 +00002031 ]
2032
Charles-François Natalif8413b22011-09-21 18:44:49 +02002033 if HAS_REDUCTION:
2034 modules.append('multiprocessing.reduction')
2035
Florent Xicluna36b9fbb2010-03-24 19:33:25 +00002036 if c_int is not None:
2037 # This module requires _ctypes
2038 modules.append('multiprocessing.sharedctypes')
Benjamin Petersondfd79492008-06-13 19:13:39 +00002039
2040 for name in modules:
2041 __import__(name)
2042 mod = sys.modules[name]
2043
2044 for attr in getattr(mod, '__all__', ()):
2045 self.assertTrue(
2046 hasattr(mod, attr),
2047 '%r does not have attribute %r' % (mod, attr)
2048 )
2049
2050#
2051# Quick test that logging works -- does not test logging output
2052#
2053
2054class _TestLogging(BaseTestCase):
2055
2056 ALLOWED_TYPES = ('processes',)
2057
2058 def test_enable_logging(self):
2059 logger = multiprocessing.get_logger()
2060 logger.setLevel(util.SUBWARNING)
2061 self.assertTrue(logger is not None)
2062 logger.debug('this will not be printed')
2063 logger.info('nor will this')
2064 logger.setLevel(LOG_LEVEL)
2065
Antoine Pitrou4eb2b282010-11-02 23:51:30 +00002066 @classmethod
2067 def _test_level(cls, conn):
Benjamin Petersondfd79492008-06-13 19:13:39 +00002068 logger = multiprocessing.get_logger()
2069 conn.send(logger.getEffectiveLevel())
2070
2071 def test_level(self):
2072 LEVEL1 = 32
2073 LEVEL2 = 37
2074
2075 logger = multiprocessing.get_logger()
2076 root_logger = logging.getLogger()
2077 root_level = root_logger.level
2078
2079 reader, writer = multiprocessing.Pipe(duplex=False)
2080
2081 logger.setLevel(LEVEL1)
Jesus Cea6f6016b2011-09-09 20:26:57 +02002082 p = self.Process(target=self._test_level, args=(writer,))
2083 p.daemon = True
2084 p.start()
Benjamin Petersondfd79492008-06-13 19:13:39 +00002085 self.assertEqual(LEVEL1, reader.recv())
2086
2087 logger.setLevel(logging.NOTSET)
2088 root_logger.setLevel(LEVEL2)
Jesus Cea6f6016b2011-09-09 20:26:57 +02002089 p = self.Process(target=self._test_level, args=(writer,))
2090 p.daemon = True
2091 p.start()
Benjamin Petersondfd79492008-06-13 19:13:39 +00002092 self.assertEqual(LEVEL2, reader.recv())
2093
2094 root_logger.setLevel(root_level)
2095 logger.setLevel(level=LOG_LEVEL)
2096
Jesse Noller814d02d2009-11-21 14:38:23 +00002097
Jesse Noller9a03f2f2009-11-24 14:17:29 +00002098# class _TestLoggingProcessName(BaseTestCase):
2099#
2100# def handle(self, record):
2101# assert record.processName == multiprocessing.current_process().name
2102# self.__handled = True
2103#
2104# def test_logging(self):
2105# handler = logging.Handler()
2106# handler.handle = self.handle
2107# self.__handled = False
2108# # Bypass getLogger() and side-effects
2109# logger = logging.getLoggerClass()(
2110# 'multiprocessing.test.TestLoggingProcessName')
2111# logger.addHandler(handler)
2112# logger.propagate = False
2113#
2114# logger.warn('foo')
2115# assert self.__handled
Jesse Noller814d02d2009-11-21 14:38:23 +00002116
Benjamin Petersondfd79492008-06-13 19:13:39 +00002117#
Richard Oudkerkba482642013-02-26 12:37:07 +00002118# Check that Process.join() retries if os.waitpid() fails with EINTR
2119#
2120
2121class _TestPollEintr(BaseTestCase):
2122
2123 ALLOWED_TYPES = ('processes',)
2124
2125 @classmethod
2126 def _killer(cls, pid):
2127 time.sleep(0.5)
2128 os.kill(pid, signal.SIGUSR1)
2129
2130 @unittest.skipUnless(hasattr(signal, 'SIGUSR1'), 'requires SIGUSR1')
2131 def test_poll_eintr(self):
2132 got_signal = [False]
2133 def record(*args):
2134 got_signal[0] = True
2135 pid = os.getpid()
2136 oldhandler = signal.signal(signal.SIGUSR1, record)
2137 try:
2138 killer = self.Process(target=self._killer, args=(pid,))
2139 killer.start()
2140 p = self.Process(target=time.sleep, args=(1,))
2141 p.start()
2142 p.join()
2143 self.assertTrue(got_signal[0])
2144 self.assertEqual(p.exitcode, 0)
2145 killer.join()
2146 finally:
2147 signal.signal(signal.SIGUSR1, oldhandler)
2148
2149#
Jesse Noller9a5b2ad2009-01-19 15:12:22 +00002150# Test to verify handle verification, see issue 3321
2151#
2152
2153class TestInvalidHandle(unittest.TestCase):
2154
Florent Xicluna36b9fbb2010-03-24 19:33:25 +00002155 @unittest.skipIf(WIN32, "skipped on Windows")
Jesse Noller9a5b2ad2009-01-19 15:12:22 +00002156 def test_invalid_handles(self):
Jesse Noller9a5b2ad2009-01-19 15:12:22 +00002157 conn = _multiprocessing.Connection(44977608)
2158 self.assertRaises(IOError, conn.poll)
2159 self.assertRaises(IOError, _multiprocessing.Connection, -1)
Florent Xicluna36b9fbb2010-03-24 19:33:25 +00002160
Jesse Noller9a5b2ad2009-01-19 15:12:22 +00002161#
Benjamin Petersondfd79492008-06-13 19:13:39 +00002162# Functions used to create test cases from the base ones in this module
2163#
2164
2165def get_attributes(Source, names):
2166 d = {}
2167 for name in names:
2168 obj = getattr(Source, name)
2169 if type(obj) == type(get_attributes):
2170 obj = staticmethod(obj)
2171 d[name] = obj
2172 return d
2173
2174def create_test_cases(Mixin, type):
2175 result = {}
2176 glob = globals()
Florent Xicluna36b9fbb2010-03-24 19:33:25 +00002177 Type = type.capitalize()
Benjamin Petersondfd79492008-06-13 19:13:39 +00002178
2179 for name in glob.keys():
2180 if name.startswith('_Test'):
2181 base = glob[name]
2182 if type in base.ALLOWED_TYPES:
2183 newname = 'With' + Type + name[1:]
2184 class Temp(base, unittest.TestCase, Mixin):
2185 pass
2186 result[newname] = Temp
2187 Temp.__name__ = newname
2188 Temp.__module__ = Mixin.__module__
2189 return result
2190
2191#
2192# Create test cases
2193#
2194
2195class ProcessesMixin(object):
2196 TYPE = 'processes'
2197 Process = multiprocessing.Process
2198 locals().update(get_attributes(multiprocessing, (
2199 'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
2200 'Condition', 'Event', 'Value', 'Array', 'RawValue',
2201 'RawArray', 'current_process', 'active_children', 'Pipe',
Richard Oudkerkd44a4a22012-06-06 17:52:18 +01002202 'connection', 'JoinableQueue', 'Pool'
Benjamin Petersondfd79492008-06-13 19:13:39 +00002203 )))
2204
2205testcases_processes = create_test_cases(ProcessesMixin, type='processes')
2206globals().update(testcases_processes)
2207
2208
2209class ManagerMixin(object):
2210 TYPE = 'manager'
2211 Process = multiprocessing.Process
2212 manager = object.__new__(multiprocessing.managers.SyncManager)
2213 locals().update(get_attributes(manager, (
2214 'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
2215 'Condition', 'Event', 'Value', 'Array', 'list', 'dict',
Richard Oudkerkd44a4a22012-06-06 17:52:18 +01002216 'Namespace', 'JoinableQueue', 'Pool'
Benjamin Petersondfd79492008-06-13 19:13:39 +00002217 )))
2218
2219testcases_manager = create_test_cases(ManagerMixin, type='manager')
2220globals().update(testcases_manager)
2221
2222
2223class ThreadsMixin(object):
2224 TYPE = 'threads'
2225 Process = multiprocessing.dummy.Process
2226 locals().update(get_attributes(multiprocessing.dummy, (
2227 'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
2228 'Condition', 'Event', 'Value', 'Array', 'current_process',
2229 'active_children', 'Pipe', 'connection', 'dict', 'list',
Richard Oudkerkd44a4a22012-06-06 17:52:18 +01002230 'Namespace', 'JoinableQueue', 'Pool'
Benjamin Petersondfd79492008-06-13 19:13:39 +00002231 )))
2232
2233testcases_threads = create_test_cases(ThreadsMixin, type='threads')
2234globals().update(testcases_threads)
2235
Neal Norwitz0c519b32008-08-25 01:50:24 +00002236class OtherTest(unittest.TestCase):
2237 # TODO: add more tests for deliver/answer challenge.
2238 def test_deliver_challenge_auth_failure(self):
2239 class _FakeConnection(object):
2240 def recv_bytes(self, size):
Neal Norwitz2a7767a2008-08-25 03:03:25 +00002241 return b'something bogus'
Neal Norwitz0c519b32008-08-25 01:50:24 +00002242 def send_bytes(self, data):
2243 pass
2244 self.assertRaises(multiprocessing.AuthenticationError,
2245 multiprocessing.connection.deliver_challenge,
2246 _FakeConnection(), b'abc')
2247
2248 def test_answer_challenge_auth_failure(self):
2249 class _FakeConnection(object):
2250 def __init__(self):
2251 self.count = 0
2252 def recv_bytes(self, size):
2253 self.count += 1
2254 if self.count == 1:
2255 return multiprocessing.connection.CHALLENGE
2256 elif self.count == 2:
Neal Norwitz2a7767a2008-08-25 03:03:25 +00002257 return b'something bogus'
2258 return b''
Neal Norwitz0c519b32008-08-25 01:50:24 +00002259 def send_bytes(self, data):
2260 pass
2261 self.assertRaises(multiprocessing.AuthenticationError,
2262 multiprocessing.connection.answer_challenge,
2263 _FakeConnection(), b'abc')
2264
Jesse Noller7152f6d2009-04-02 05:17:26 +00002265#
2266# Test Manager.start()/Pool.__init__() initializer feature - see issue 5585
2267#
2268
2269def initializer(ns):
2270 ns.test += 1
2271
2272class TestInitializers(unittest.TestCase):
2273 def setUp(self):
2274 self.mgr = multiprocessing.Manager()
2275 self.ns = self.mgr.Namespace()
2276 self.ns.test = 0
2277
2278 def tearDown(self):
2279 self.mgr.shutdown()
2280
2281 def test_manager_initializer(self):
2282 m = multiprocessing.managers.SyncManager()
2283 self.assertRaises(TypeError, m.start, 1)
2284 m.start(initializer, (self.ns,))
2285 self.assertEqual(self.ns.test, 1)
2286 m.shutdown()
2287
2288 def test_pool_initializer(self):
2289 self.assertRaises(TypeError, multiprocessing.Pool, initializer=1)
2290 p = multiprocessing.Pool(1, initializer, (self.ns,))
2291 p.close()
2292 p.join()
2293 self.assertEqual(self.ns.test, 1)
2294
Jesse Noller1b90efb2009-06-30 17:11:52 +00002295#
2296# Issue 5155, 5313, 5331: Test process in processes
2297# Verifies os.close(sys.stdin.fileno) vs. sys.stdin.close() behavior
2298#
2299
Richard Oudkerkc5496072013-09-29 17:10:40 +01002300def _this_sub_process(q):
Jesse Noller1b90efb2009-06-30 17:11:52 +00002301 try:
2302 item = q.get(block=False)
2303 except Queue.Empty:
2304 pass
2305
Richard Oudkerkc5496072013-09-29 17:10:40 +01002306def _test_process(q):
2307 queue = multiprocessing.Queue()
2308 subProc = multiprocessing.Process(target=_this_sub_process, args=(queue,))
2309 subProc.daemon = True
2310 subProc.start()
2311 subProc.join()
2312
Jesse Noller1b90efb2009-06-30 17:11:52 +00002313def _afunc(x):
2314 return x*x
2315
2316def pool_in_process():
2317 pool = multiprocessing.Pool(processes=4)
2318 x = pool.map(_afunc, [1, 2, 3, 4, 5, 6, 7])
2319
2320class _file_like(object):
2321 def __init__(self, delegate):
2322 self._delegate = delegate
2323 self._pid = None
2324
2325 @property
2326 def cache(self):
2327 pid = os.getpid()
2328 # There are no race conditions since fork keeps only the running thread
2329 if pid != self._pid:
2330 self._pid = pid
2331 self._cache = []
2332 return self._cache
2333
2334 def write(self, data):
2335 self.cache.append(data)
2336
2337 def flush(self):
2338 self._delegate.write(''.join(self.cache))
2339 self._cache = []
2340
2341class TestStdinBadfiledescriptor(unittest.TestCase):
2342
2343 def test_queue_in_process(self):
2344 queue = multiprocessing.Queue()
Richard Oudkerkc5496072013-09-29 17:10:40 +01002345 proc = multiprocessing.Process(target=_test_process, args=(queue,))
Jesse Noller1b90efb2009-06-30 17:11:52 +00002346 proc.start()
2347 proc.join()
2348
2349 def test_pool_in_process(self):
2350 p = multiprocessing.Process(target=pool_in_process)
2351 p.start()
2352 p.join()
2353
2354 def test_flushing(self):
2355 sio = StringIO()
2356 flike = _file_like(sio)
2357 flike.write('foo')
2358 proc = multiprocessing.Process(target=lambda: flike.flush())
2359 flike.flush()
2360 assert sio.getvalue() == 'foo'
2361
Richard Oudkerke4b99382012-07-27 14:05:46 +01002362#
2363# Test interaction with socket timeouts - see Issue #6056
2364#
2365
2366class TestTimeouts(unittest.TestCase):
2367 @classmethod
2368 def _test_timeout(cls, child, address):
2369 time.sleep(1)
2370 child.send(123)
2371 child.close()
2372 conn = multiprocessing.connection.Client(address)
2373 conn.send(456)
2374 conn.close()
2375
2376 def test_timeout(self):
2377 old_timeout = socket.getdefaulttimeout()
2378 try:
2379 socket.setdefaulttimeout(0.1)
2380 parent, child = multiprocessing.Pipe(duplex=True)
2381 l = multiprocessing.connection.Listener(family='AF_INET')
2382 p = multiprocessing.Process(target=self._test_timeout,
2383 args=(child, l.address))
2384 p.start()
2385 child.close()
2386 self.assertEqual(parent.recv(), 123)
2387 parent.close()
2388 conn = l.accept()
2389 self.assertEqual(conn.recv(), 456)
2390 conn.close()
2391 l.close()
2392 p.join(10)
2393 finally:
2394 socket.setdefaulttimeout(old_timeout)
2395
Richard Oudkerkfaee75c2012-08-14 11:41:19 +01002396#
2397# Test what happens with no "if __name__ == '__main__'"
2398#
2399
2400class TestNoForkBomb(unittest.TestCase):
2401 def test_noforkbomb(self):
2402 name = os.path.join(os.path.dirname(__file__), 'mp_fork_bomb.py')
2403 if WIN32:
2404 rc, out, err = test.script_helper.assert_python_failure(name)
2405 self.assertEqual('', out.decode('ascii'))
2406 self.assertIn('RuntimeError', err.decode('ascii'))
2407 else:
2408 rc, out, err = test.script_helper.assert_python_ok(name)
2409 self.assertEqual('123', out.decode('ascii').rstrip())
2410 self.assertEqual('', err.decode('ascii'))
2411
2412#
Kristján Valur Jónsson8927e8f2013-03-19 15:07:35 -07002413# Issue 12098: check sys.flags of child matches that for parent
2414#
2415
2416class TestFlags(unittest.TestCase):
2417 @classmethod
2418 def run_in_grandchild(cls, conn):
2419 conn.send(tuple(sys.flags))
2420
2421 @classmethod
2422 def run_in_child(cls):
2423 import json
2424 r, w = multiprocessing.Pipe(duplex=False)
2425 p = multiprocessing.Process(target=cls.run_in_grandchild, args=(w,))
2426 p.start()
2427 grandchild_flags = r.recv()
2428 p.join()
2429 r.close()
2430 w.close()
2431 flags = (tuple(sys.flags), grandchild_flags)
2432 print(json.dumps(flags))
2433
2434 def test_flags(self):
2435 import json, subprocess
2436 # start child process using unusual flags
2437 prog = ('from test.test_multiprocessing import TestFlags; ' +
2438 'TestFlags.run_in_child()')
2439 data = subprocess.check_output(
Benjamin Peterson625af8e2013-03-20 12:47:57 -05002440 [sys.executable, '-E', '-B', '-O', '-c', prog])
Kristján Valur Jónsson8927e8f2013-03-19 15:07:35 -07002441 child_flags, grandchild_flags = json.loads(data.decode('ascii'))
2442 self.assertEqual(child_flags, grandchild_flags)
Richard Oudkerk7bdd93c2013-04-17 19:15:52 +01002443
2444#
2445# Issue #17555: ForkAwareThreadLock
2446#
2447
2448class TestForkAwareThreadLock(unittest.TestCase):
2449 # We recurisvely start processes. Issue #17555 meant that the
2450 # after fork registry would get duplicate entries for the same
2451 # lock. The size of the registry at generation n was ~2**n.
2452
2453 @classmethod
2454 def child(cls, n, conn):
2455 if n > 1:
2456 p = multiprocessing.Process(target=cls.child, args=(n-1, conn))
2457 p.start()
2458 p.join()
2459 else:
2460 conn.send(len(util._afterfork_registry))
2461 conn.close()
2462
2463 def test_lock(self):
2464 r, w = multiprocessing.Pipe(False)
2465 l = util.ForkAwareThreadLock()
2466 old_size = len(util._afterfork_registry)
2467 p = multiprocessing.Process(target=self.child, args=(5, w))
2468 p.start()
2469 new_size = r.recv()
2470 p.join()
2471 self.assertLessEqual(new_size, old_size)
2472
Kristján Valur Jónsson8927e8f2013-03-19 15:07:35 -07002473#
Richard Oudkerk41072db2013-07-01 18:45:28 +01002474# Issue #17097: EINTR should be ignored by recv(), send(), accept() etc
2475#
2476
2477class TestIgnoreEINTR(unittest.TestCase):
2478
2479 @classmethod
2480 def _test_ignore(cls, conn):
2481 def handler(signum, frame):
2482 pass
2483 signal.signal(signal.SIGUSR1, handler)
2484 conn.send('ready')
2485 x = conn.recv()
2486 conn.send(x)
2487 conn.send_bytes(b'x'*(1024*1024)) # sending 1 MB should block
2488
2489 @unittest.skipUnless(hasattr(signal, 'SIGUSR1'), 'requires SIGUSR1')
2490 def test_ignore(self):
2491 conn, child_conn = multiprocessing.Pipe()
2492 try:
2493 p = multiprocessing.Process(target=self._test_ignore,
2494 args=(child_conn,))
2495 p.daemon = True
2496 p.start()
2497 child_conn.close()
2498 self.assertEqual(conn.recv(), 'ready')
2499 time.sleep(0.1)
2500 os.kill(p.pid, signal.SIGUSR1)
2501 time.sleep(0.1)
2502 conn.send(1234)
2503 self.assertEqual(conn.recv(), 1234)
2504 time.sleep(0.1)
2505 os.kill(p.pid, signal.SIGUSR1)
2506 self.assertEqual(conn.recv_bytes(), b'x'*(1024*1024))
2507 time.sleep(0.1)
2508 p.join()
2509 finally:
2510 conn.close()
2511
2512 @classmethod
2513 def _test_ignore_listener(cls, conn):
2514 def handler(signum, frame):
2515 pass
2516 signal.signal(signal.SIGUSR1, handler)
2517 l = multiprocessing.connection.Listener()
2518 conn.send(l.address)
2519 a = l.accept()
2520 a.send('welcome')
2521
2522 @unittest.skipUnless(hasattr(signal, 'SIGUSR1'), 'requires SIGUSR1')
2523 def test_ignore_listener(self):
2524 conn, child_conn = multiprocessing.Pipe()
2525 try:
2526 p = multiprocessing.Process(target=self._test_ignore_listener,
2527 args=(child_conn,))
2528 p.daemon = True
2529 p.start()
2530 child_conn.close()
2531 address = conn.recv()
2532 time.sleep(0.1)
2533 os.kill(p.pid, signal.SIGUSR1)
2534 time.sleep(0.1)
2535 client = multiprocessing.connection.Client(address)
2536 self.assertEqual(client.recv(), 'welcome')
2537 p.join()
2538 finally:
2539 conn.close()
2540
2541#
Richard Oudkerkfaee75c2012-08-14 11:41:19 +01002542#
2543#
2544
Jesse Noller1b90efb2009-06-30 17:11:52 +00002545testcases_other = [OtherTest, TestInvalidHandle, TestInitializers,
Kristján Valur Jónsson8927e8f2013-03-19 15:07:35 -07002546 TestStdinBadfiledescriptor, TestTimeouts, TestNoForkBomb,
Richard Oudkerk41072db2013-07-01 18:45:28 +01002547 TestFlags, TestForkAwareThreadLock, TestIgnoreEINTR]
Neal Norwitz0c519b32008-08-25 01:50:24 +00002548
Benjamin Petersondfd79492008-06-13 19:13:39 +00002549#
2550#
2551#
2552
2553def test_main(run=None):
Jesse Noller18623822008-06-18 13:29:52 +00002554 if sys.platform.startswith("linux"):
2555 try:
2556 lock = multiprocessing.RLock()
2557 except OSError:
Benjamin Petersonbec087f2009-03-26 21:10:30 +00002558 raise unittest.SkipTest("OSError raises on RLock creation, see issue 3111!")
Benjamin Petersoned77f2e2008-06-17 22:40:44 +00002559
Charles-François Natali6392d7f2011-11-22 18:35:18 +01002560 check_enough_semaphores()
2561
Benjamin Petersondfd79492008-06-13 19:13:39 +00002562 if run is None:
2563 from test.test_support import run_unittest as run
2564
2565 util.get_temp_dir() # creates temp directory for use by all processes
2566
2567 multiprocessing.get_logger().setLevel(LOG_LEVEL)
2568
Jesse Noller146b7ab2008-07-02 16:44:09 +00002569 ProcessesMixin.pool = multiprocessing.Pool(4)
2570 ThreadsMixin.pool = multiprocessing.dummy.Pool(4)
2571 ManagerMixin.manager.__init__()
2572 ManagerMixin.manager.start()
2573 ManagerMixin.pool = ManagerMixin.manager.Pool(4)
Benjamin Petersondfd79492008-06-13 19:13:39 +00002574
2575 testcases = (
Jesse Noller146b7ab2008-07-02 16:44:09 +00002576 sorted(testcases_processes.values(), key=lambda tc:tc.__name__) +
2577 sorted(testcases_threads.values(), key=lambda tc:tc.__name__) +
Neal Norwitz0c519b32008-08-25 01:50:24 +00002578 sorted(testcases_manager.values(), key=lambda tc:tc.__name__) +
2579 testcases_other
Benjamin Petersondfd79492008-06-13 19:13:39 +00002580 )
2581
2582 loadTestsFromTestCase = unittest.defaultTestLoader.loadTestsFromTestCase
2583 suite = unittest.TestSuite(loadTestsFromTestCase(tc) for tc in testcases)
Nick Coghlan13623662010-04-10 14:24:36 +00002584 # (ncoghlan): Whether or not sys.exc_clear is executed by the threading
2585 # module during these tests is at least platform dependent and possibly
Nick Coghlan14459d52010-04-10 15:01:54 +00002586 # non-deterministic on any given platform. So we don't mind if the listed
Nick Coghlan13623662010-04-10 14:24:36 +00002587 # warnings aren't actually raised.
Florent Xicluna07627882010-03-21 01:14:24 +00002588 with test_support.check_py3k_warnings(
Nick Coghlan13623662010-04-10 14:24:36 +00002589 (".+__(get|set)slice__ has been removed", DeprecationWarning),
2590 (r"sys.exc_clear\(\) not supported", DeprecationWarning),
2591 quiet=True):
Florent Xicluna07627882010-03-21 01:14:24 +00002592 run(suite)
Benjamin Petersondfd79492008-06-13 19:13:39 +00002593
Jesse Noller146b7ab2008-07-02 16:44:09 +00002594 ThreadsMixin.pool.terminate()
2595 ProcessesMixin.pool.terminate()
2596 ManagerMixin.pool.terminate()
2597 ManagerMixin.manager.shutdown()
Benjamin Petersondfd79492008-06-13 19:13:39 +00002598
Jesse Noller146b7ab2008-07-02 16:44:09 +00002599 del ProcessesMixin.pool, ThreadsMixin.pool, ManagerMixin.pool
Benjamin Petersondfd79492008-06-13 19:13:39 +00002600
2601def main():
2602 test_main(unittest.TextTestRunner(verbosity=2).run)
2603
2604if __name__ == '__main__':
2605 main()