blob: 15870570db082918d3fd2ffe8ee357a3309e2048 [file] [log] [blame]
Jesse Noller76cf55f2008-07-02 16:56:51 +00001#!/usr/bin/env python
2
Benjamin Petersondfd79492008-06-13 19:13:39 +00003#
4# Unit tests for the multiprocessing package
5#
6
7import unittest
Benjamin Petersondfd79492008-06-13 19:13:39 +00008import Queue
9import time
10import sys
11import os
12import gc
13import signal
14import array
Benjamin Petersondfd79492008-06-13 19:13:39 +000015import socket
16import random
17import logging
Antoine Pitroua1a8da82011-08-23 19:54:20 +020018import errno
Richard Oudkerkfaee75c2012-08-14 11:41:19 +010019import test.script_helper
Mark Dickinsonc4920e82009-11-20 19:30:22 +000020from test import test_support
Jesse Noller1b90efb2009-06-30 17:11:52 +000021from StringIO import StringIO
R. David Murray3db8a342009-03-30 23:05:48 +000022_multiprocessing = test_support.import_module('_multiprocessing')
Ezio Melottic2077b02011-03-16 12:34:31 +020023# import threading after _multiprocessing to raise a more relevant error
Victor Stinner613b4cf2010-04-27 21:56:26 +000024# message: "No module named _multiprocessing". _multiprocessing is not compiled
25# without thread support.
26import threading
R. David Murray3db8a342009-03-30 23:05:48 +000027
Jesse Noller37040cd2008-09-30 00:15:45 +000028# Work around broken sem_open implementations
R. David Murray3db8a342009-03-30 23:05:48 +000029test_support.import_module('multiprocessing.synchronize')
Jesse Noller37040cd2008-09-30 00:15:45 +000030
Benjamin Petersondfd79492008-06-13 19:13:39 +000031import multiprocessing.dummy
32import multiprocessing.connection
33import multiprocessing.managers
34import multiprocessing.heap
Benjamin Petersondfd79492008-06-13 19:13:39 +000035import multiprocessing.pool
Benjamin Petersondfd79492008-06-13 19:13:39 +000036
Charles-François Natalif8413b22011-09-21 18:44:49 +020037from multiprocessing import util
38
39try:
40 from multiprocessing import reduction
41 HAS_REDUCTION = True
42except ImportError:
43 HAS_REDUCTION = False
Benjamin Petersondfd79492008-06-13 19:13:39 +000044
Brian Curtina06e9b82010-10-07 02:27:41 +000045try:
46 from multiprocessing.sharedctypes import Value, copy
47 HAS_SHAREDCTYPES = True
48except ImportError:
49 HAS_SHAREDCTYPES = False
50
Antoine Pitroua1a8da82011-08-23 19:54:20 +020051try:
52 import msvcrt
53except ImportError:
54 msvcrt = None
55
Benjamin Petersondfd79492008-06-13 19:13:39 +000056#
57#
58#
59
Benjamin Petersone79edf52008-07-13 18:34:58 +000060latin = str
Benjamin Petersondfd79492008-06-13 19:13:39 +000061
Benjamin Petersondfd79492008-06-13 19:13:39 +000062#
63# Constants
64#
65
66LOG_LEVEL = util.SUBWARNING
Jesse Noller654ade32010-01-27 03:05:57 +000067#LOG_LEVEL = logging.DEBUG
Benjamin Petersondfd79492008-06-13 19:13:39 +000068
69DELTA = 0.1
70CHECK_TIMINGS = False # making true makes tests take a lot longer
71 # and can sometimes cause some non-serious
72 # failures because some calls block a bit
73 # longer than expected
74if CHECK_TIMINGS:
75 TIMEOUT1, TIMEOUT2, TIMEOUT3 = 0.82, 0.35, 1.4
76else:
77 TIMEOUT1, TIMEOUT2, TIMEOUT3 = 0.1, 0.1, 0.1
78
79HAVE_GETVALUE = not getattr(_multiprocessing,
80 'HAVE_BROKEN_SEM_GETVALUE', False)
81
Jesse Noller9a5b2ad2009-01-19 15:12:22 +000082WIN32 = (sys.platform == "win32")
83
Antoine Pitroua1a8da82011-08-23 19:54:20 +020084try:
85 MAXFD = os.sysconf("SC_OPEN_MAX")
86except:
87 MAXFD = 256
88
Benjamin Petersondfd79492008-06-13 19:13:39 +000089#
Florent Xicluna36b9fbb2010-03-24 19:33:25 +000090# Some tests require ctypes
91#
92
93try:
Nick Coghlan13623662010-04-10 14:24:36 +000094 from ctypes import Structure, c_int, c_double
Florent Xicluna36b9fbb2010-03-24 19:33:25 +000095except ImportError:
96 Structure = object
97 c_int = c_double = None
98
Charles-François Natali6392d7f2011-11-22 18:35:18 +010099
100def check_enough_semaphores():
101 """Check that the system supports enough semaphores to run the test."""
102 # minimum number of semaphores available according to POSIX
103 nsems_min = 256
104 try:
105 nsems = os.sysconf("SC_SEM_NSEMS_MAX")
106 except (AttributeError, ValueError):
107 # sysconf not available or setting not available
108 return
109 if nsems == -1 or nsems >= nsems_min:
110 return
111 raise unittest.SkipTest("The OS doesn't support enough semaphores "
112 "to run the test (required: %d)." % nsems_min)
113
114
Florent Xicluna36b9fbb2010-03-24 19:33:25 +0000115#
Benjamin Petersondfd79492008-06-13 19:13:39 +0000116# Creates a wrapper for a function which records the time it takes to finish
117#
118
119class TimingWrapper(object):
120
121 def __init__(self, func):
122 self.func = func
123 self.elapsed = None
124
125 def __call__(self, *args, **kwds):
126 t = time.time()
127 try:
128 return self.func(*args, **kwds)
129 finally:
130 self.elapsed = time.time() - t
131
132#
133# Base class for test cases
134#
135
136class BaseTestCase(object):
137
138 ALLOWED_TYPES = ('processes', 'manager', 'threads')
139
140 def assertTimingAlmostEqual(self, a, b):
141 if CHECK_TIMINGS:
142 self.assertAlmostEqual(a, b, 1)
143
144 def assertReturnsIfImplemented(self, value, func, *args):
145 try:
146 res = func(*args)
147 except NotImplementedError:
148 pass
149 else:
150 return self.assertEqual(value, res)
151
Antoine Pitrou4eb2b282010-11-02 23:51:30 +0000152 # For the sanity of Windows users, rather than crashing or freezing in
153 # multiple ways.
154 def __reduce__(self, *args):
155 raise NotImplementedError("shouldn't try to pickle a test case")
156
157 __reduce_ex__ = __reduce__
158
Benjamin Petersondfd79492008-06-13 19:13:39 +0000159#
160# Return the value of a semaphore
161#
162
163def get_value(self):
164 try:
165 return self.get_value()
166 except AttributeError:
167 try:
168 return self._Semaphore__value
169 except AttributeError:
170 try:
171 return self._value
172 except AttributeError:
173 raise NotImplementedError
174
175#
176# Testcases
177#
178
179class _TestProcess(BaseTestCase):
180
181 ALLOWED_TYPES = ('processes', 'threads')
182
183 def test_current(self):
184 if self.TYPE == 'threads':
185 return
186
187 current = self.current_process()
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000188 authkey = current.authkey
Benjamin Petersondfd79492008-06-13 19:13:39 +0000189
190 self.assertTrue(current.is_alive())
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000191 self.assertTrue(not current.daemon)
Ezio Melottib0f5adc2010-01-24 16:58:36 +0000192 self.assertIsInstance(authkey, bytes)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000193 self.assertTrue(len(authkey) > 0)
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000194 self.assertEqual(current.ident, os.getpid())
195 self.assertEqual(current.exitcode, None)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000196
Antoine Pitrou4eb2b282010-11-02 23:51:30 +0000197 @classmethod
198 def _test(cls, q, *args, **kwds):
199 current = cls.current_process()
Benjamin Petersondfd79492008-06-13 19:13:39 +0000200 q.put(args)
201 q.put(kwds)
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000202 q.put(current.name)
Antoine Pitrou4eb2b282010-11-02 23:51:30 +0000203 if cls.TYPE != 'threads':
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000204 q.put(bytes(current.authkey))
Benjamin Petersondfd79492008-06-13 19:13:39 +0000205 q.put(current.pid)
206
207 def test_process(self):
208 q = self.Queue(1)
209 e = self.Event()
210 args = (q, 1, 2)
211 kwargs = {'hello':23, 'bye':2.54}
212 name = 'SomeProcess'
213 p = self.Process(
214 target=self._test, args=args, kwargs=kwargs, name=name
215 )
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000216 p.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +0000217 current = self.current_process()
218
219 if self.TYPE != 'threads':
Ezio Melotti2623a372010-11-21 13:34:58 +0000220 self.assertEqual(p.authkey, current.authkey)
221 self.assertEqual(p.is_alive(), False)
222 self.assertEqual(p.daemon, True)
Ezio Melottiaa980582010-01-23 23:04:36 +0000223 self.assertNotIn(p, self.active_children())
Benjamin Petersondfd79492008-06-13 19:13:39 +0000224 self.assertTrue(type(self.active_children()) is list)
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000225 self.assertEqual(p.exitcode, None)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000226
227 p.start()
228
Ezio Melotti2623a372010-11-21 13:34:58 +0000229 self.assertEqual(p.exitcode, None)
230 self.assertEqual(p.is_alive(), True)
Ezio Melottiaa980582010-01-23 23:04:36 +0000231 self.assertIn(p, self.active_children())
Benjamin Petersondfd79492008-06-13 19:13:39 +0000232
Ezio Melotti2623a372010-11-21 13:34:58 +0000233 self.assertEqual(q.get(), args[1:])
234 self.assertEqual(q.get(), kwargs)
235 self.assertEqual(q.get(), p.name)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000236 if self.TYPE != 'threads':
Ezio Melotti2623a372010-11-21 13:34:58 +0000237 self.assertEqual(q.get(), current.authkey)
238 self.assertEqual(q.get(), p.pid)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000239
240 p.join()
241
Ezio Melotti2623a372010-11-21 13:34:58 +0000242 self.assertEqual(p.exitcode, 0)
243 self.assertEqual(p.is_alive(), False)
Ezio Melottiaa980582010-01-23 23:04:36 +0000244 self.assertNotIn(p, self.active_children())
Benjamin Petersondfd79492008-06-13 19:13:39 +0000245
Antoine Pitrou4eb2b282010-11-02 23:51:30 +0000246 @classmethod
247 def _test_terminate(cls):
Benjamin Petersondfd79492008-06-13 19:13:39 +0000248 time.sleep(1000)
249
250 def test_terminate(self):
251 if self.TYPE == 'threads':
252 return
253
254 p = self.Process(target=self._test_terminate)
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000255 p.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +0000256 p.start()
257
258 self.assertEqual(p.is_alive(), True)
Ezio Melottiaa980582010-01-23 23:04:36 +0000259 self.assertIn(p, self.active_children())
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000260 self.assertEqual(p.exitcode, None)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000261
262 p.terminate()
263
264 join = TimingWrapper(p.join)
265 self.assertEqual(join(), None)
266 self.assertTimingAlmostEqual(join.elapsed, 0.0)
267
268 self.assertEqual(p.is_alive(), False)
Ezio Melottiaa980582010-01-23 23:04:36 +0000269 self.assertNotIn(p, self.active_children())
Benjamin Petersondfd79492008-06-13 19:13:39 +0000270
271 p.join()
272
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000273 # XXX sometimes get p.exitcode == 0 on Windows ...
274 #self.assertEqual(p.exitcode, -signal.SIGTERM)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000275
276 def test_cpu_count(self):
277 try:
278 cpus = multiprocessing.cpu_count()
279 except NotImplementedError:
280 cpus = 1
281 self.assertTrue(type(cpus) is int)
282 self.assertTrue(cpus >= 1)
283
284 def test_active_children(self):
285 self.assertEqual(type(self.active_children()), list)
286
287 p = self.Process(target=time.sleep, args=(DELTA,))
Ezio Melottiaa980582010-01-23 23:04:36 +0000288 self.assertNotIn(p, self.active_children())
Benjamin Petersondfd79492008-06-13 19:13:39 +0000289
Jesus Cea6f6016b2011-09-09 20:26:57 +0200290 p.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +0000291 p.start()
Ezio Melottiaa980582010-01-23 23:04:36 +0000292 self.assertIn(p, self.active_children())
Benjamin Petersondfd79492008-06-13 19:13:39 +0000293
294 p.join()
Ezio Melottiaa980582010-01-23 23:04:36 +0000295 self.assertNotIn(p, self.active_children())
Benjamin Petersondfd79492008-06-13 19:13:39 +0000296
Antoine Pitrou4eb2b282010-11-02 23:51:30 +0000297 @classmethod
298 def _test_recursion(cls, wconn, id):
Benjamin Petersondfd79492008-06-13 19:13:39 +0000299 from multiprocessing import forking
300 wconn.send(id)
301 if len(id) < 2:
302 for i in range(2):
Antoine Pitrou4eb2b282010-11-02 23:51:30 +0000303 p = cls.Process(
304 target=cls._test_recursion, args=(wconn, id+[i])
Benjamin Petersondfd79492008-06-13 19:13:39 +0000305 )
306 p.start()
307 p.join()
308
309 def test_recursion(self):
310 rconn, wconn = self.Pipe(duplex=False)
311 self._test_recursion(wconn, [])
312
313 time.sleep(DELTA)
314 result = []
315 while rconn.poll():
316 result.append(rconn.recv())
317
318 expected = [
319 [],
320 [0],
321 [0, 0],
322 [0, 1],
323 [1],
324 [1, 0],
325 [1, 1]
326 ]
327 self.assertEqual(result, expected)
328
Richard Oudkerk2182e052012-06-06 19:01:14 +0100329 @classmethod
330 def _test_sys_exit(cls, reason, testfn):
331 sys.stderr = open(testfn, 'w')
332 sys.exit(reason)
333
334 def test_sys_exit(self):
335 # See Issue 13854
336 if self.TYPE == 'threads':
337 return
338
339 testfn = test_support.TESTFN
340 self.addCleanup(test_support.unlink, testfn)
341
342 for reason, code in (([1, 2, 3], 1), ('ignore this', 0)):
343 p = self.Process(target=self._test_sys_exit, args=(reason, testfn))
344 p.daemon = True
345 p.start()
346 p.join(5)
347 self.assertEqual(p.exitcode, code)
348
349 with open(testfn, 'r') as f:
350 self.assertEqual(f.read().rstrip(), str(reason))
351
352 for reason in (True, False, 8):
353 p = self.Process(target=sys.exit, args=(reason,))
354 p.daemon = True
355 p.start()
356 p.join(5)
357 self.assertEqual(p.exitcode, reason)
358
Benjamin Petersondfd79492008-06-13 19:13:39 +0000359#
360#
361#
362
363class _UpperCaser(multiprocessing.Process):
364
365 def __init__(self):
366 multiprocessing.Process.__init__(self)
367 self.child_conn, self.parent_conn = multiprocessing.Pipe()
368
369 def run(self):
370 self.parent_conn.close()
371 for s in iter(self.child_conn.recv, None):
372 self.child_conn.send(s.upper())
373 self.child_conn.close()
374
375 def submit(self, s):
376 assert type(s) is str
377 self.parent_conn.send(s)
378 return self.parent_conn.recv()
379
380 def stop(self):
381 self.parent_conn.send(None)
382 self.parent_conn.close()
383 self.child_conn.close()
384
385class _TestSubclassingProcess(BaseTestCase):
386
387 ALLOWED_TYPES = ('processes',)
388
389 def test_subclassing(self):
390 uppercaser = _UpperCaser()
Jesus Cea6f6016b2011-09-09 20:26:57 +0200391 uppercaser.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +0000392 uppercaser.start()
393 self.assertEqual(uppercaser.submit('hello'), 'HELLO')
394 self.assertEqual(uppercaser.submit('world'), 'WORLD')
395 uppercaser.stop()
396 uppercaser.join()
397
398#
399#
400#
401
402def queue_empty(q):
403 if hasattr(q, 'empty'):
404 return q.empty()
405 else:
406 return q.qsize() == 0
407
408def queue_full(q, maxsize):
409 if hasattr(q, 'full'):
410 return q.full()
411 else:
412 return q.qsize() == maxsize
413
414
415class _TestQueue(BaseTestCase):
416
417
Antoine Pitrou4eb2b282010-11-02 23:51:30 +0000418 @classmethod
419 def _test_put(cls, queue, child_can_start, parent_can_continue):
Benjamin Petersondfd79492008-06-13 19:13:39 +0000420 child_can_start.wait()
421 for i in range(6):
422 queue.get()
423 parent_can_continue.set()
424
425 def test_put(self):
426 MAXSIZE = 6
427 queue = self.Queue(maxsize=MAXSIZE)
428 child_can_start = self.Event()
429 parent_can_continue = self.Event()
430
431 proc = self.Process(
432 target=self._test_put,
433 args=(queue, child_can_start, parent_can_continue)
434 )
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000435 proc.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +0000436 proc.start()
437
438 self.assertEqual(queue_empty(queue), True)
439 self.assertEqual(queue_full(queue, MAXSIZE), False)
440
441 queue.put(1)
442 queue.put(2, True)
443 queue.put(3, True, None)
444 queue.put(4, False)
445 queue.put(5, False, None)
446 queue.put_nowait(6)
447
448 # the values may be in buffer but not yet in pipe so sleep a bit
449 time.sleep(DELTA)
450
451 self.assertEqual(queue_empty(queue), False)
452 self.assertEqual(queue_full(queue, MAXSIZE), True)
453
454 put = TimingWrapper(queue.put)
455 put_nowait = TimingWrapper(queue.put_nowait)
456
457 self.assertRaises(Queue.Full, put, 7, False)
458 self.assertTimingAlmostEqual(put.elapsed, 0)
459
460 self.assertRaises(Queue.Full, put, 7, False, None)
461 self.assertTimingAlmostEqual(put.elapsed, 0)
462
463 self.assertRaises(Queue.Full, put_nowait, 7)
464 self.assertTimingAlmostEqual(put_nowait.elapsed, 0)
465
466 self.assertRaises(Queue.Full, put, 7, True, TIMEOUT1)
467 self.assertTimingAlmostEqual(put.elapsed, TIMEOUT1)
468
469 self.assertRaises(Queue.Full, put, 7, False, TIMEOUT2)
470 self.assertTimingAlmostEqual(put.elapsed, 0)
471
472 self.assertRaises(Queue.Full, put, 7, True, timeout=TIMEOUT3)
473 self.assertTimingAlmostEqual(put.elapsed, TIMEOUT3)
474
475 child_can_start.set()
476 parent_can_continue.wait()
477
478 self.assertEqual(queue_empty(queue), True)
479 self.assertEqual(queue_full(queue, MAXSIZE), False)
480
481 proc.join()
482
Antoine Pitrou4eb2b282010-11-02 23:51:30 +0000483 @classmethod
484 def _test_get(cls, queue, child_can_start, parent_can_continue):
Benjamin Petersondfd79492008-06-13 19:13:39 +0000485 child_can_start.wait()
Benjamin Petersonda3a1b12008-06-16 20:52:48 +0000486 #queue.put(1)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000487 queue.put(2)
488 queue.put(3)
489 queue.put(4)
490 queue.put(5)
491 parent_can_continue.set()
492
493 def test_get(self):
494 queue = self.Queue()
495 child_can_start = self.Event()
496 parent_can_continue = self.Event()
497
498 proc = self.Process(
499 target=self._test_get,
500 args=(queue, child_can_start, parent_can_continue)
501 )
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000502 proc.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +0000503 proc.start()
504
505 self.assertEqual(queue_empty(queue), True)
506
507 child_can_start.set()
508 parent_can_continue.wait()
509
510 time.sleep(DELTA)
511 self.assertEqual(queue_empty(queue), False)
512
Benjamin Petersonda3a1b12008-06-16 20:52:48 +0000513 # Hangs unexpectedly, remove for now
514 #self.assertEqual(queue.get(), 1)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000515 self.assertEqual(queue.get(True, None), 2)
516 self.assertEqual(queue.get(True), 3)
517 self.assertEqual(queue.get(timeout=1), 4)
518 self.assertEqual(queue.get_nowait(), 5)
519
520 self.assertEqual(queue_empty(queue), True)
521
522 get = TimingWrapper(queue.get)
523 get_nowait = TimingWrapper(queue.get_nowait)
524
525 self.assertRaises(Queue.Empty, get, False)
526 self.assertTimingAlmostEqual(get.elapsed, 0)
527
528 self.assertRaises(Queue.Empty, get, False, None)
529 self.assertTimingAlmostEqual(get.elapsed, 0)
530
531 self.assertRaises(Queue.Empty, get_nowait)
532 self.assertTimingAlmostEqual(get_nowait.elapsed, 0)
533
534 self.assertRaises(Queue.Empty, get, True, TIMEOUT1)
535 self.assertTimingAlmostEqual(get.elapsed, TIMEOUT1)
536
537 self.assertRaises(Queue.Empty, get, False, TIMEOUT2)
538 self.assertTimingAlmostEqual(get.elapsed, 0)
539
540 self.assertRaises(Queue.Empty, get, timeout=TIMEOUT3)
541 self.assertTimingAlmostEqual(get.elapsed, TIMEOUT3)
542
543 proc.join()
544
Antoine Pitrou4eb2b282010-11-02 23:51:30 +0000545 @classmethod
546 def _test_fork(cls, queue):
Benjamin Petersondfd79492008-06-13 19:13:39 +0000547 for i in range(10, 20):
548 queue.put(i)
549 # note that at this point the items may only be buffered, so the
550 # process cannot shutdown until the feeder thread has finished
551 # pushing items onto the pipe.
552
553 def test_fork(self):
554 # Old versions of Queue would fail to create a new feeder
555 # thread for a forked process if the original process had its
556 # own feeder thread. This test checks that this no longer
557 # happens.
558
559 queue = self.Queue()
560
561 # put items on queue so that main process starts a feeder thread
562 for i in range(10):
563 queue.put(i)
564
565 # wait to make sure thread starts before we fork a new process
566 time.sleep(DELTA)
567
568 # fork process
569 p = self.Process(target=self._test_fork, args=(queue,))
Jesus Cea6f6016b2011-09-09 20:26:57 +0200570 p.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +0000571 p.start()
572
573 # check that all expected items are in the queue
574 for i in range(20):
575 self.assertEqual(queue.get(), i)
576 self.assertRaises(Queue.Empty, queue.get, False)
577
578 p.join()
579
580 def test_qsize(self):
581 q = self.Queue()
582 try:
583 self.assertEqual(q.qsize(), 0)
584 except NotImplementedError:
585 return
586 q.put(1)
587 self.assertEqual(q.qsize(), 1)
588 q.put(5)
589 self.assertEqual(q.qsize(), 2)
590 q.get()
591 self.assertEqual(q.qsize(), 1)
592 q.get()
593 self.assertEqual(q.qsize(), 0)
594
Antoine Pitrou4eb2b282010-11-02 23:51:30 +0000595 @classmethod
596 def _test_task_done(cls, q):
Benjamin Petersondfd79492008-06-13 19:13:39 +0000597 for obj in iter(q.get, None):
598 time.sleep(DELTA)
599 q.task_done()
600
601 def test_task_done(self):
602 queue = self.JoinableQueue()
603
604 if sys.version_info < (2, 5) and not hasattr(queue, 'task_done'):
Florent Xicluna36b9fbb2010-03-24 19:33:25 +0000605 self.skipTest("requires 'queue.task_done()' method")
Benjamin Petersondfd79492008-06-13 19:13:39 +0000606
607 workers = [self.Process(target=self._test_task_done, args=(queue,))
608 for i in xrange(4)]
609
610 for p in workers:
Jesus Cea6f6016b2011-09-09 20:26:57 +0200611 p.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +0000612 p.start()
613
614 for i in xrange(10):
615 queue.put(i)
616
617 queue.join()
618
619 for p in workers:
620 queue.put(None)
621
622 for p in workers:
623 p.join()
624
625#
626#
627#
628
629class _TestLock(BaseTestCase):
630
631 def test_lock(self):
632 lock = self.Lock()
633 self.assertEqual(lock.acquire(), True)
634 self.assertEqual(lock.acquire(False), False)
635 self.assertEqual(lock.release(), None)
636 self.assertRaises((ValueError, threading.ThreadError), lock.release)
637
638 def test_rlock(self):
639 lock = self.RLock()
640 self.assertEqual(lock.acquire(), True)
641 self.assertEqual(lock.acquire(), True)
642 self.assertEqual(lock.acquire(), True)
643 self.assertEqual(lock.release(), None)
644 self.assertEqual(lock.release(), None)
645 self.assertEqual(lock.release(), None)
646 self.assertRaises((AssertionError, RuntimeError), lock.release)
647
Jesse Noller82eb5902009-03-30 23:29:31 +0000648 def test_lock_context(self):
649 with self.Lock():
650 pass
651
Benjamin Petersondfd79492008-06-13 19:13:39 +0000652
653class _TestSemaphore(BaseTestCase):
654
655 def _test_semaphore(self, sem):
656 self.assertReturnsIfImplemented(2, get_value, sem)
657 self.assertEqual(sem.acquire(), True)
658 self.assertReturnsIfImplemented(1, get_value, sem)
659 self.assertEqual(sem.acquire(), True)
660 self.assertReturnsIfImplemented(0, get_value, sem)
661 self.assertEqual(sem.acquire(False), False)
662 self.assertReturnsIfImplemented(0, get_value, sem)
663 self.assertEqual(sem.release(), None)
664 self.assertReturnsIfImplemented(1, get_value, sem)
665 self.assertEqual(sem.release(), None)
666 self.assertReturnsIfImplemented(2, get_value, sem)
667
668 def test_semaphore(self):
669 sem = self.Semaphore(2)
670 self._test_semaphore(sem)
671 self.assertEqual(sem.release(), None)
672 self.assertReturnsIfImplemented(3, get_value, sem)
673 self.assertEqual(sem.release(), None)
674 self.assertReturnsIfImplemented(4, get_value, sem)
675
676 def test_bounded_semaphore(self):
677 sem = self.BoundedSemaphore(2)
678 self._test_semaphore(sem)
679 # Currently fails on OS/X
680 #if HAVE_GETVALUE:
681 # self.assertRaises(ValueError, sem.release)
682 # self.assertReturnsIfImplemented(2, get_value, sem)
683
684 def test_timeout(self):
685 if self.TYPE != 'processes':
686 return
687
688 sem = self.Semaphore(0)
689 acquire = TimingWrapper(sem.acquire)
690
691 self.assertEqual(acquire(False), False)
692 self.assertTimingAlmostEqual(acquire.elapsed, 0.0)
693
694 self.assertEqual(acquire(False, None), False)
695 self.assertTimingAlmostEqual(acquire.elapsed, 0.0)
696
697 self.assertEqual(acquire(False, TIMEOUT1), False)
698 self.assertTimingAlmostEqual(acquire.elapsed, 0)
699
700 self.assertEqual(acquire(True, TIMEOUT2), False)
701 self.assertTimingAlmostEqual(acquire.elapsed, TIMEOUT2)
702
703 self.assertEqual(acquire(timeout=TIMEOUT3), False)
704 self.assertTimingAlmostEqual(acquire.elapsed, TIMEOUT3)
705
706
707class _TestCondition(BaseTestCase):
708
Antoine Pitrou4eb2b282010-11-02 23:51:30 +0000709 @classmethod
710 def f(cls, cond, sleeping, woken, timeout=None):
Benjamin Petersondfd79492008-06-13 19:13:39 +0000711 cond.acquire()
712 sleeping.release()
713 cond.wait(timeout)
714 woken.release()
715 cond.release()
716
717 def check_invariant(self, cond):
718 # this is only supposed to succeed when there are no sleepers
719 if self.TYPE == 'processes':
720 try:
721 sleepers = (cond._sleeping_count.get_value() -
722 cond._woken_count.get_value())
723 self.assertEqual(sleepers, 0)
724 self.assertEqual(cond._wait_semaphore.get_value(), 0)
725 except NotImplementedError:
726 pass
727
728 def test_notify(self):
729 cond = self.Condition()
730 sleeping = self.Semaphore(0)
731 woken = self.Semaphore(0)
732
733 p = self.Process(target=self.f, args=(cond, sleeping, woken))
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000734 p.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +0000735 p.start()
736
737 p = threading.Thread(target=self.f, args=(cond, sleeping, woken))
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000738 p.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +0000739 p.start()
740
741 # wait for both children to start sleeping
742 sleeping.acquire()
743 sleeping.acquire()
744
745 # check no process/thread has woken up
746 time.sleep(DELTA)
747 self.assertReturnsIfImplemented(0, get_value, woken)
748
749 # wake up one process/thread
750 cond.acquire()
751 cond.notify()
752 cond.release()
753
754 # check one process/thread has woken up
755 time.sleep(DELTA)
756 self.assertReturnsIfImplemented(1, get_value, woken)
757
758 # wake up another
759 cond.acquire()
760 cond.notify()
761 cond.release()
762
763 # check other has woken up
764 time.sleep(DELTA)
765 self.assertReturnsIfImplemented(2, get_value, woken)
766
767 # check state is not mucked up
768 self.check_invariant(cond)
769 p.join()
770
771 def test_notify_all(self):
772 cond = self.Condition()
773 sleeping = self.Semaphore(0)
774 woken = self.Semaphore(0)
775
776 # start some threads/processes which will timeout
777 for i in range(3):
778 p = self.Process(target=self.f,
779 args=(cond, sleeping, woken, TIMEOUT1))
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000780 p.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +0000781 p.start()
782
783 t = threading.Thread(target=self.f,
784 args=(cond, sleeping, woken, TIMEOUT1))
Benjamin Petersona9b22222008-08-18 18:01:43 +0000785 t.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +0000786 t.start()
787
788 # wait for them all to sleep
789 for i in xrange(6):
790 sleeping.acquire()
791
792 # check they have all timed out
793 for i in xrange(6):
794 woken.acquire()
795 self.assertReturnsIfImplemented(0, get_value, woken)
796
797 # check state is not mucked up
798 self.check_invariant(cond)
799
800 # start some more threads/processes
801 for i in range(3):
802 p = self.Process(target=self.f, args=(cond, sleeping, woken))
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000803 p.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +0000804 p.start()
805
806 t = threading.Thread(target=self.f, args=(cond, sleeping, woken))
Benjamin Petersona9b22222008-08-18 18:01:43 +0000807 t.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +0000808 t.start()
809
810 # wait for them to all sleep
811 for i in xrange(6):
812 sleeping.acquire()
813
814 # check no process/thread has woken up
815 time.sleep(DELTA)
816 self.assertReturnsIfImplemented(0, get_value, woken)
817
818 # wake them all up
819 cond.acquire()
820 cond.notify_all()
821 cond.release()
822
823 # check they have all woken
824 time.sleep(DELTA)
825 self.assertReturnsIfImplemented(6, get_value, woken)
826
827 # check state is not mucked up
828 self.check_invariant(cond)
829
830 def test_timeout(self):
831 cond = self.Condition()
832 wait = TimingWrapper(cond.wait)
833 cond.acquire()
834 res = wait(TIMEOUT1)
835 cond.release()
836 self.assertEqual(res, None)
837 self.assertTimingAlmostEqual(wait.elapsed, TIMEOUT1)
838
839
840class _TestEvent(BaseTestCase):
841
Antoine Pitrou4eb2b282010-11-02 23:51:30 +0000842 @classmethod
843 def _test_event(cls, event):
Benjamin Petersondfd79492008-06-13 19:13:39 +0000844 time.sleep(TIMEOUT2)
845 event.set()
846
847 def test_event(self):
848 event = self.Event()
849 wait = TimingWrapper(event.wait)
850
Ezio Melottic2077b02011-03-16 12:34:31 +0200851 # Removed temporarily, due to API shear, this does not
Benjamin Petersondfd79492008-06-13 19:13:39 +0000852 # work with threading._Event objects. is_set == isSet
Jesse Noller02cb0eb2009-04-01 03:45:50 +0000853 self.assertEqual(event.is_set(), False)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000854
Jesse Noller02cb0eb2009-04-01 03:45:50 +0000855 # Removed, threading.Event.wait() will return the value of the __flag
856 # instead of None. API Shear with the semaphore backed mp.Event
857 self.assertEqual(wait(0.0), False)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000858 self.assertTimingAlmostEqual(wait.elapsed, 0.0)
Jesse Noller02cb0eb2009-04-01 03:45:50 +0000859 self.assertEqual(wait(TIMEOUT1), False)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000860 self.assertTimingAlmostEqual(wait.elapsed, TIMEOUT1)
861
862 event.set()
863
864 # See note above on the API differences
Jesse Noller02cb0eb2009-04-01 03:45:50 +0000865 self.assertEqual(event.is_set(), True)
866 self.assertEqual(wait(), True)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000867 self.assertTimingAlmostEqual(wait.elapsed, 0.0)
Jesse Noller02cb0eb2009-04-01 03:45:50 +0000868 self.assertEqual(wait(TIMEOUT1), True)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000869 self.assertTimingAlmostEqual(wait.elapsed, 0.0)
870 # self.assertEqual(event.is_set(), True)
871
872 event.clear()
873
874 #self.assertEqual(event.is_set(), False)
875
Jesus Cea6f6016b2011-09-09 20:26:57 +0200876 p = self.Process(target=self._test_event, args=(event,))
877 p.daemon = True
878 p.start()
Jesse Noller02cb0eb2009-04-01 03:45:50 +0000879 self.assertEqual(wait(), True)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000880
881#
882#
883#
884
885class _TestValue(BaseTestCase):
886
Florent Xicluna36b9fbb2010-03-24 19:33:25 +0000887 ALLOWED_TYPES = ('processes',)
888
Benjamin Petersondfd79492008-06-13 19:13:39 +0000889 codes_values = [
890 ('i', 4343, 24234),
891 ('d', 3.625, -4.25),
892 ('h', -232, 234),
893 ('c', latin('x'), latin('y'))
894 ]
895
Antoine Pitrou55d935a2010-11-22 16:35:57 +0000896 def setUp(self):
897 if not HAS_SHAREDCTYPES:
898 self.skipTest("requires multiprocessing.sharedctypes")
899
Antoine Pitrou4eb2b282010-11-02 23:51:30 +0000900 @classmethod
901 def _test(cls, values):
902 for sv, cv in zip(values, cls.codes_values):
Benjamin Petersondfd79492008-06-13 19:13:39 +0000903 sv.value = cv[2]
904
905
906 def test_value(self, raw=False):
Benjamin Petersondfd79492008-06-13 19:13:39 +0000907 if raw:
908 values = [self.RawValue(code, value)
909 for code, value, _ in self.codes_values]
910 else:
911 values = [self.Value(code, value)
912 for code, value, _ in self.codes_values]
913
914 for sv, cv in zip(values, self.codes_values):
915 self.assertEqual(sv.value, cv[1])
916
917 proc = self.Process(target=self._test, args=(values,))
Jesus Cea6f6016b2011-09-09 20:26:57 +0200918 proc.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +0000919 proc.start()
920 proc.join()
921
922 for sv, cv in zip(values, self.codes_values):
923 self.assertEqual(sv.value, cv[2])
924
925 def test_rawvalue(self):
926 self.test_value(raw=True)
927
928 def test_getobj_getlock(self):
Benjamin Petersondfd79492008-06-13 19:13:39 +0000929 val1 = self.Value('i', 5)
930 lock1 = val1.get_lock()
931 obj1 = val1.get_obj()
932
933 val2 = self.Value('i', 5, lock=None)
934 lock2 = val2.get_lock()
935 obj2 = val2.get_obj()
936
937 lock = self.Lock()
938 val3 = self.Value('i', 5, lock=lock)
939 lock3 = val3.get_lock()
940 obj3 = val3.get_obj()
941 self.assertEqual(lock, lock3)
942
Jesse Noller6ab22152009-01-18 02:45:38 +0000943 arr4 = self.Value('i', 5, lock=False)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000944 self.assertFalse(hasattr(arr4, 'get_lock'))
945 self.assertFalse(hasattr(arr4, 'get_obj'))
946
Jesse Noller6ab22152009-01-18 02:45:38 +0000947 self.assertRaises(AttributeError, self.Value, 'i', 5, lock='navalue')
948
949 arr5 = self.RawValue('i', 5)
950 self.assertFalse(hasattr(arr5, 'get_lock'))
951 self.assertFalse(hasattr(arr5, 'get_obj'))
952
Benjamin Petersondfd79492008-06-13 19:13:39 +0000953
954class _TestArray(BaseTestCase):
955
Florent Xicluna36b9fbb2010-03-24 19:33:25 +0000956 ALLOWED_TYPES = ('processes',)
957
Antoine Pitrou4eb2b282010-11-02 23:51:30 +0000958 @classmethod
959 def f(cls, seq):
Benjamin Petersondfd79492008-06-13 19:13:39 +0000960 for i in range(1, len(seq)):
961 seq[i] += seq[i-1]
962
Florent Xicluna36b9fbb2010-03-24 19:33:25 +0000963 @unittest.skipIf(c_int is None, "requires _ctypes")
Benjamin Petersondfd79492008-06-13 19:13:39 +0000964 def test_array(self, raw=False):
Benjamin Petersondfd79492008-06-13 19:13:39 +0000965 seq = [680, 626, 934, 821, 150, 233, 548, 982, 714, 831]
966 if raw:
967 arr = self.RawArray('i', seq)
968 else:
969 arr = self.Array('i', seq)
970
971 self.assertEqual(len(arr), len(seq))
972 self.assertEqual(arr[3], seq[3])
973 self.assertEqual(list(arr[2:7]), list(seq[2:7]))
974
975 arr[4:8] = seq[4:8] = array.array('i', [1, 2, 3, 4])
976
977 self.assertEqual(list(arr[:]), seq)
978
979 self.f(seq)
980
981 p = self.Process(target=self.f, args=(arr,))
Jesus Cea6f6016b2011-09-09 20:26:57 +0200982 p.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +0000983 p.start()
984 p.join()
985
986 self.assertEqual(list(arr[:]), seq)
987
Florent Xicluna36b9fbb2010-03-24 19:33:25 +0000988 @unittest.skipIf(c_int is None, "requires _ctypes")
Mark Dickinsond3cb2f62011-03-26 10:02:37 +0000989 def test_array_from_size(self):
990 size = 10
991 # Test for zeroing (see issue #11675).
992 # The repetition below strengthens the test by increasing the chances
993 # of previously allocated non-zero memory being used for the new array
994 # on the 2nd and 3rd loops.
995 for _ in range(3):
996 arr = self.Array('i', size)
997 self.assertEqual(len(arr), size)
998 self.assertEqual(list(arr), [0] * size)
999 arr[:] = range(10)
1000 self.assertEqual(list(arr), range(10))
1001 del arr
1002
1003 @unittest.skipIf(c_int is None, "requires _ctypes")
Benjamin Petersondfd79492008-06-13 19:13:39 +00001004 def test_rawarray(self):
1005 self.test_array(raw=True)
1006
Florent Xicluna36b9fbb2010-03-24 19:33:25 +00001007 @unittest.skipIf(c_int is None, "requires _ctypes")
Mark Dickinsonf9e9a6f2011-03-25 22:01:06 +00001008 def test_array_accepts_long(self):
1009 arr = self.Array('i', 10L)
1010 self.assertEqual(len(arr), 10)
1011 raw_arr = self.RawArray('i', 10L)
1012 self.assertEqual(len(raw_arr), 10)
1013
1014 @unittest.skipIf(c_int is None, "requires _ctypes")
Benjamin Petersondfd79492008-06-13 19:13:39 +00001015 def test_getobj_getlock_obj(self):
Benjamin Petersondfd79492008-06-13 19:13:39 +00001016 arr1 = self.Array('i', range(10))
1017 lock1 = arr1.get_lock()
1018 obj1 = arr1.get_obj()
1019
1020 arr2 = self.Array('i', range(10), lock=None)
1021 lock2 = arr2.get_lock()
1022 obj2 = arr2.get_obj()
1023
1024 lock = self.Lock()
1025 arr3 = self.Array('i', range(10), lock=lock)
1026 lock3 = arr3.get_lock()
1027 obj3 = arr3.get_obj()
1028 self.assertEqual(lock, lock3)
1029
Jesse Noller6ab22152009-01-18 02:45:38 +00001030 arr4 = self.Array('i', range(10), lock=False)
Benjamin Petersondfd79492008-06-13 19:13:39 +00001031 self.assertFalse(hasattr(arr4, 'get_lock'))
1032 self.assertFalse(hasattr(arr4, 'get_obj'))
Jesse Noller6ab22152009-01-18 02:45:38 +00001033 self.assertRaises(AttributeError,
1034 self.Array, 'i', range(10), lock='notalock')
1035
1036 arr5 = self.RawArray('i', range(10))
1037 self.assertFalse(hasattr(arr5, 'get_lock'))
1038 self.assertFalse(hasattr(arr5, 'get_obj'))
Benjamin Petersondfd79492008-06-13 19:13:39 +00001039
1040#
1041#
1042#
1043
1044class _TestContainers(BaseTestCase):
1045
1046 ALLOWED_TYPES = ('manager',)
1047
1048 def test_list(self):
1049 a = self.list(range(10))
1050 self.assertEqual(a[:], range(10))
1051
1052 b = self.list()
1053 self.assertEqual(b[:], [])
1054
1055 b.extend(range(5))
1056 self.assertEqual(b[:], range(5))
1057
1058 self.assertEqual(b[2], 2)
1059 self.assertEqual(b[2:10], [2,3,4])
1060
1061 b *= 2
1062 self.assertEqual(b[:], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4])
1063
1064 self.assertEqual(b + [5, 6], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4, 5, 6])
1065
1066 self.assertEqual(a[:], range(10))
1067
1068 d = [a, b]
1069 e = self.list(d)
1070 self.assertEqual(
1071 e[:],
1072 [[0, 1, 2, 3, 4, 5, 6, 7, 8, 9], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4]]
1073 )
1074
1075 f = self.list([a])
1076 a.append('hello')
1077 self.assertEqual(f[:], [[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 'hello']])
1078
1079 def test_dict(self):
1080 d = self.dict()
1081 indices = range(65, 70)
1082 for i in indices:
1083 d[i] = chr(i)
1084 self.assertEqual(d.copy(), dict((i, chr(i)) for i in indices))
1085 self.assertEqual(sorted(d.keys()), indices)
1086 self.assertEqual(sorted(d.values()), [chr(i) for i in indices])
1087 self.assertEqual(sorted(d.items()), [(i, chr(i)) for i in indices])
1088
1089 def test_namespace(self):
1090 n = self.Namespace()
1091 n.name = 'Bob'
1092 n.job = 'Builder'
1093 n._hidden = 'hidden'
1094 self.assertEqual((n.name, n.job), ('Bob', 'Builder'))
1095 del n.job
1096 self.assertEqual(str(n), "Namespace(name='Bob')")
1097 self.assertTrue(hasattr(n, 'name'))
1098 self.assertTrue(not hasattr(n, 'job'))
1099
1100#
1101#
1102#
1103
1104def sqr(x, wait=0.0):
1105 time.sleep(wait)
1106 return x*x
Benjamin Petersondfd79492008-06-13 19:13:39 +00001107class _TestPool(BaseTestCase):
1108
1109 def test_apply(self):
1110 papply = self.pool.apply
1111 self.assertEqual(papply(sqr, (5,)), sqr(5))
1112 self.assertEqual(papply(sqr, (), {'x':3}), sqr(x=3))
1113
1114 def test_map(self):
1115 pmap = self.pool.map
1116 self.assertEqual(pmap(sqr, range(10)), map(sqr, range(10)))
1117 self.assertEqual(pmap(sqr, range(100), chunksize=20),
1118 map(sqr, range(100)))
1119
Jesse Noller7530e472009-07-16 14:23:04 +00001120 def test_map_chunksize(self):
1121 try:
1122 self.pool.map_async(sqr, [], chunksize=1).get(timeout=TIMEOUT1)
1123 except multiprocessing.TimeoutError:
1124 self.fail("pool.map_async with chunksize stalled on null list")
1125
Benjamin Petersondfd79492008-06-13 19:13:39 +00001126 def test_async(self):
1127 res = self.pool.apply_async(sqr, (7, TIMEOUT1,))
1128 get = TimingWrapper(res.get)
1129 self.assertEqual(get(), 49)
1130 self.assertTimingAlmostEqual(get.elapsed, TIMEOUT1)
1131
1132 def test_async_timeout(self):
1133 res = self.pool.apply_async(sqr, (6, TIMEOUT2 + 0.2))
1134 get = TimingWrapper(res.get)
1135 self.assertRaises(multiprocessing.TimeoutError, get, timeout=TIMEOUT2)
1136 self.assertTimingAlmostEqual(get.elapsed, TIMEOUT2)
1137
1138 def test_imap(self):
1139 it = self.pool.imap(sqr, range(10))
1140 self.assertEqual(list(it), map(sqr, range(10)))
1141
1142 it = self.pool.imap(sqr, range(10))
1143 for i in range(10):
1144 self.assertEqual(it.next(), i*i)
1145 self.assertRaises(StopIteration, it.next)
1146
1147 it = self.pool.imap(sqr, range(1000), chunksize=100)
1148 for i in range(1000):
1149 self.assertEqual(it.next(), i*i)
1150 self.assertRaises(StopIteration, it.next)
1151
1152 def test_imap_unordered(self):
1153 it = self.pool.imap_unordered(sqr, range(1000))
1154 self.assertEqual(sorted(it), map(sqr, range(1000)))
1155
1156 it = self.pool.imap_unordered(sqr, range(1000), chunksize=53)
1157 self.assertEqual(sorted(it), map(sqr, range(1000)))
1158
1159 def test_make_pool(self):
Victor Stinnerf64a0cf2011-06-20 17:54:33 +02001160 self.assertRaises(ValueError, multiprocessing.Pool, -1)
1161 self.assertRaises(ValueError, multiprocessing.Pool, 0)
1162
Benjamin Petersondfd79492008-06-13 19:13:39 +00001163 p = multiprocessing.Pool(3)
1164 self.assertEqual(3, len(p._pool))
1165 p.close()
1166 p.join()
1167
1168 def test_terminate(self):
1169 if self.TYPE == 'manager':
1170 # On Unix a forked process increfs each shared object to
1171 # which its parent process held a reference. If the
1172 # forked process gets terminated then there is likely to
1173 # be a reference leak. So to prevent
1174 # _TestZZZNumberOfObjects from failing we skip this test
1175 # when using a manager.
1176 return
1177
1178 result = self.pool.map_async(
1179 time.sleep, [0.1 for i in range(10000)], chunksize=1
1180 )
1181 self.pool.terminate()
1182 join = TimingWrapper(self.pool.join)
1183 join()
1184 self.assertTrue(join.elapsed < 0.2)
Jesse Noller654ade32010-01-27 03:05:57 +00001185
Richard Oudkerkd44a4a22012-06-06 17:52:18 +01001186 def test_empty_iterable(self):
1187 # See Issue 12157
1188 p = self.Pool(1)
1189
1190 self.assertEqual(p.map(sqr, []), [])
1191 self.assertEqual(list(p.imap(sqr, [])), [])
1192 self.assertEqual(list(p.imap_unordered(sqr, [])), [])
1193 self.assertEqual(p.map_async(sqr, []).get(), [])
1194
1195 p.close()
1196 p.join()
1197
Richard Oudkerk0c200c22012-05-02 16:36:26 +01001198def unpickleable_result():
1199 return lambda: 42
1200
1201class _TestPoolWorkerErrors(BaseTestCase):
1202 ALLOWED_TYPES = ('processes', )
1203
1204 def test_unpickleable_result(self):
1205 from multiprocessing.pool import MaybeEncodingError
1206 p = multiprocessing.Pool(2)
1207
1208 # Make sure we don't lose pool processes because of encoding errors.
1209 for iteration in range(20):
1210 res = p.apply_async(unpickleable_result)
1211 self.assertRaises(MaybeEncodingError, res.get)
1212
1213 p.close()
1214 p.join()
1215
Jesse Noller654ade32010-01-27 03:05:57 +00001216class _TestPoolWorkerLifetime(BaseTestCase):
1217
1218 ALLOWED_TYPES = ('processes', )
1219 def test_pool_worker_lifetime(self):
1220 p = multiprocessing.Pool(3, maxtasksperchild=10)
1221 self.assertEqual(3, len(p._pool))
1222 origworkerpids = [w.pid for w in p._pool]
1223 # Run many tasks so each worker gets replaced (hopefully)
1224 results = []
1225 for i in range(100):
1226 results.append(p.apply_async(sqr, (i, )))
1227 # Fetch the results and verify we got the right answers,
1228 # also ensuring all the tasks have completed.
1229 for (j, res) in enumerate(results):
1230 self.assertEqual(res.get(), sqr(j))
1231 # Refill the pool
1232 p._repopulate_pool()
Florent Xicluna3bc5cb72010-03-04 15:58:54 +00001233 # Wait until all workers are alive
Antoine Pitrouc2b0d762011-04-06 22:54:14 +02001234 # (countdown * DELTA = 5 seconds max startup process time)
1235 countdown = 50
Florent Xicluna3bc5cb72010-03-04 15:58:54 +00001236 while countdown and not all(w.is_alive() for w in p._pool):
1237 countdown -= 1
1238 time.sleep(DELTA)
Jesse Noller654ade32010-01-27 03:05:57 +00001239 finalworkerpids = [w.pid for w in p._pool]
Florent Xicluna3bc5cb72010-03-04 15:58:54 +00001240 # All pids should be assigned. See issue #7805.
1241 self.assertNotIn(None, origworkerpids)
1242 self.assertNotIn(None, finalworkerpids)
1243 # Finally, check that the worker pids have changed
Jesse Noller654ade32010-01-27 03:05:57 +00001244 self.assertNotEqual(sorted(origworkerpids), sorted(finalworkerpids))
1245 p.close()
1246 p.join()
1247
Charles-François Natali46f990e2011-10-24 18:43:51 +02001248 def test_pool_worker_lifetime_early_close(self):
1249 # Issue #10332: closing a pool whose workers have limited lifetimes
1250 # before all the tasks completed would make join() hang.
1251 p = multiprocessing.Pool(3, maxtasksperchild=1)
1252 results = []
1253 for i in range(6):
1254 results.append(p.apply_async(sqr, (i, 0.3)))
1255 p.close()
1256 p.join()
1257 # check the results
1258 for (j, res) in enumerate(results):
1259 self.assertEqual(res.get(), sqr(j))
1260
1261
Benjamin Petersondfd79492008-06-13 19:13:39 +00001262#
1263# Test that manager has expected number of shared objects left
1264#
1265
1266class _TestZZZNumberOfObjects(BaseTestCase):
1267 # Because test cases are sorted alphabetically, this one will get
1268 # run after all the other tests for the manager. It tests that
1269 # there have been no "reference leaks" for the manager's shared
1270 # objects. Note the comment in _TestPool.test_terminate().
1271 ALLOWED_TYPES = ('manager',)
1272
1273 def test_number_of_objects(self):
1274 EXPECTED_NUMBER = 1 # the pool object is still alive
1275 multiprocessing.active_children() # discard dead process objs
1276 gc.collect() # do garbage collection
1277 refs = self.manager._number_of_objects()
Jesse Noller7314b382009-01-21 02:08:17 +00001278 debug_info = self.manager._debug_info()
Benjamin Petersondfd79492008-06-13 19:13:39 +00001279 if refs != EXPECTED_NUMBER:
Jesse Noller7fb96402008-07-17 21:01:05 +00001280 print self.manager._debug_info()
Jesse Noller7314b382009-01-21 02:08:17 +00001281 print debug_info
Benjamin Petersondfd79492008-06-13 19:13:39 +00001282
1283 self.assertEqual(refs, EXPECTED_NUMBER)
1284
1285#
1286# Test of creating a customized manager class
1287#
1288
1289from multiprocessing.managers import BaseManager, BaseProxy, RemoteError
1290
1291class FooBar(object):
1292 def f(self):
1293 return 'f()'
1294 def g(self):
1295 raise ValueError
1296 def _h(self):
1297 return '_h()'
1298
1299def baz():
1300 for i in xrange(10):
1301 yield i*i
1302
1303class IteratorProxy(BaseProxy):
1304 _exposed_ = ('next', '__next__')
1305 def __iter__(self):
1306 return self
1307 def next(self):
1308 return self._callmethod('next')
1309 def __next__(self):
1310 return self._callmethod('__next__')
1311
1312class MyManager(BaseManager):
1313 pass
1314
1315MyManager.register('Foo', callable=FooBar)
1316MyManager.register('Bar', callable=FooBar, exposed=('f', '_h'))
1317MyManager.register('baz', callable=baz, proxytype=IteratorProxy)
1318
1319
1320class _TestMyManager(BaseTestCase):
1321
1322 ALLOWED_TYPES = ('manager',)
1323
1324 def test_mymanager(self):
1325 manager = MyManager()
1326 manager.start()
1327
1328 foo = manager.Foo()
1329 bar = manager.Bar()
1330 baz = manager.baz()
1331
1332 foo_methods = [name for name in ('f', 'g', '_h') if hasattr(foo, name)]
1333 bar_methods = [name for name in ('f', 'g', '_h') if hasattr(bar, name)]
1334
1335 self.assertEqual(foo_methods, ['f', 'g'])
1336 self.assertEqual(bar_methods, ['f', '_h'])
1337
1338 self.assertEqual(foo.f(), 'f()')
1339 self.assertRaises(ValueError, foo.g)
1340 self.assertEqual(foo._callmethod('f'), 'f()')
1341 self.assertRaises(RemoteError, foo._callmethod, '_h')
1342
1343 self.assertEqual(bar.f(), 'f()')
1344 self.assertEqual(bar._h(), '_h()')
1345 self.assertEqual(bar._callmethod('f'), 'f()')
1346 self.assertEqual(bar._callmethod('_h'), '_h()')
1347
1348 self.assertEqual(list(baz), [i*i for i in range(10)])
1349
1350 manager.shutdown()
1351
1352#
1353# Test of connecting to a remote server and using xmlrpclib for serialization
1354#
1355
1356_queue = Queue.Queue()
1357def get_queue():
1358 return _queue
1359
1360class QueueManager(BaseManager):
1361 '''manager class used by server process'''
1362QueueManager.register('get_queue', callable=get_queue)
1363
1364class QueueManager2(BaseManager):
1365 '''manager class which specifies the same interface as QueueManager'''
1366QueueManager2.register('get_queue')
1367
1368
1369SERIALIZER = 'xmlrpclib'
1370
1371class _TestRemoteManager(BaseTestCase):
1372
1373 ALLOWED_TYPES = ('manager',)
1374
Antoine Pitrou4eb2b282010-11-02 23:51:30 +00001375 @classmethod
1376 def _putter(cls, address, authkey):
Benjamin Petersondfd79492008-06-13 19:13:39 +00001377 manager = QueueManager2(
1378 address=address, authkey=authkey, serializer=SERIALIZER
1379 )
1380 manager.connect()
1381 queue = manager.get_queue()
1382 queue.put(('hello world', None, True, 2.25))
1383
1384 def test_remote(self):
1385 authkey = os.urandom(32)
1386
1387 manager = QueueManager(
1388 address=('localhost', 0), authkey=authkey, serializer=SERIALIZER
1389 )
1390 manager.start()
1391
1392 p = self.Process(target=self._putter, args=(manager.address, authkey))
Jesus Cea6f6016b2011-09-09 20:26:57 +02001393 p.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +00001394 p.start()
1395
1396 manager2 = QueueManager2(
1397 address=manager.address, authkey=authkey, serializer=SERIALIZER
1398 )
1399 manager2.connect()
1400 queue = manager2.get_queue()
1401
1402 # Note that xmlrpclib will deserialize object as a list not a tuple
1403 self.assertEqual(queue.get(), ['hello world', None, True, 2.25])
1404
1405 # Because we are using xmlrpclib for serialization instead of
1406 # pickle this will cause a serialization error.
1407 self.assertRaises(Exception, queue.put, time.sleep)
1408
1409 # Make queue finalizer run before the server is stopped
1410 del queue
1411 manager.shutdown()
1412
Jesse Noller459a6482009-03-30 15:50:42 +00001413class _TestManagerRestart(BaseTestCase):
1414
Antoine Pitrou4eb2b282010-11-02 23:51:30 +00001415 @classmethod
1416 def _putter(cls, address, authkey):
Jesse Noller459a6482009-03-30 15:50:42 +00001417 manager = QueueManager(
1418 address=address, authkey=authkey, serializer=SERIALIZER)
1419 manager.connect()
1420 queue = manager.get_queue()
1421 queue.put('hello world')
1422
1423 def test_rapid_restart(self):
1424 authkey = os.urandom(32)
1425 manager = QueueManager(
Antoine Pitrou54f9f832010-04-30 23:08:48 +00001426 address=('localhost', 0), authkey=authkey, serializer=SERIALIZER)
Brian Curtin87d86e02010-11-01 05:15:55 +00001427 srvr = manager.get_server()
1428 addr = srvr.address
1429 # Close the connection.Listener socket which gets opened as a part
1430 # of manager.get_server(). It's not needed for the test.
1431 srvr.listener.close()
Jesse Noller459a6482009-03-30 15:50:42 +00001432 manager.start()
1433
1434 p = self.Process(target=self._putter, args=(manager.address, authkey))
Jesus Cea6f6016b2011-09-09 20:26:57 +02001435 p.daemon = True
Jesse Noller459a6482009-03-30 15:50:42 +00001436 p.start()
1437 queue = manager.get_queue()
1438 self.assertEqual(queue.get(), 'hello world')
Jesse Noller019ce772009-03-30 21:53:29 +00001439 del queue
Jesse Noller459a6482009-03-30 15:50:42 +00001440 manager.shutdown()
1441 manager = QueueManager(
Antoine Pitrou54f9f832010-04-30 23:08:48 +00001442 address=addr, authkey=authkey, serializer=SERIALIZER)
Jesse Noller459a6482009-03-30 15:50:42 +00001443 manager.start()
Jesse Noller019ce772009-03-30 21:53:29 +00001444 manager.shutdown()
Jesse Noller459a6482009-03-30 15:50:42 +00001445
Benjamin Petersondfd79492008-06-13 19:13:39 +00001446#
1447#
1448#
1449
1450SENTINEL = latin('')
1451
1452class _TestConnection(BaseTestCase):
1453
1454 ALLOWED_TYPES = ('processes', 'threads')
1455
Antoine Pitrou4eb2b282010-11-02 23:51:30 +00001456 @classmethod
1457 def _echo(cls, conn):
Benjamin Petersondfd79492008-06-13 19:13:39 +00001458 for msg in iter(conn.recv_bytes, SENTINEL):
1459 conn.send_bytes(msg)
1460 conn.close()
1461
1462 def test_connection(self):
1463 conn, child_conn = self.Pipe()
1464
1465 p = self.Process(target=self._echo, args=(child_conn,))
Jesse Noller5bc9f4c2008-08-19 19:06:19 +00001466 p.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +00001467 p.start()
1468
1469 seq = [1, 2.25, None]
1470 msg = latin('hello world')
1471 longmsg = msg * 10
1472 arr = array.array('i', range(4))
1473
1474 if self.TYPE == 'processes':
1475 self.assertEqual(type(conn.fileno()), int)
1476
1477 self.assertEqual(conn.send(seq), None)
1478 self.assertEqual(conn.recv(), seq)
1479
1480 self.assertEqual(conn.send_bytes(msg), None)
1481 self.assertEqual(conn.recv_bytes(), msg)
1482
1483 if self.TYPE == 'processes':
1484 buffer = array.array('i', [0]*10)
1485 expected = list(arr) + [0] * (10 - len(arr))
1486 self.assertEqual(conn.send_bytes(arr), None)
1487 self.assertEqual(conn.recv_bytes_into(buffer),
1488 len(arr) * buffer.itemsize)
1489 self.assertEqual(list(buffer), expected)
1490
1491 buffer = array.array('i', [0]*10)
1492 expected = [0] * 3 + list(arr) + [0] * (10 - 3 - len(arr))
1493 self.assertEqual(conn.send_bytes(arr), None)
1494 self.assertEqual(conn.recv_bytes_into(buffer, 3 * buffer.itemsize),
1495 len(arr) * buffer.itemsize)
1496 self.assertEqual(list(buffer), expected)
1497
1498 buffer = bytearray(latin(' ' * 40))
1499 self.assertEqual(conn.send_bytes(longmsg), None)
1500 try:
1501 res = conn.recv_bytes_into(buffer)
1502 except multiprocessing.BufferTooShort, e:
1503 self.assertEqual(e.args, (longmsg,))
1504 else:
1505 self.fail('expected BufferTooShort, got %s' % res)
1506
1507 poll = TimingWrapper(conn.poll)
1508
1509 self.assertEqual(poll(), False)
1510 self.assertTimingAlmostEqual(poll.elapsed, 0)
1511
1512 self.assertEqual(poll(TIMEOUT1), False)
1513 self.assertTimingAlmostEqual(poll.elapsed, TIMEOUT1)
1514
1515 conn.send(None)
1516
1517 self.assertEqual(poll(TIMEOUT1), True)
1518 self.assertTimingAlmostEqual(poll.elapsed, 0)
1519
1520 self.assertEqual(conn.recv(), None)
1521
1522 really_big_msg = latin('X') * (1024 * 1024 * 16) # 16Mb
1523 conn.send_bytes(really_big_msg)
1524 self.assertEqual(conn.recv_bytes(), really_big_msg)
1525
1526 conn.send_bytes(SENTINEL) # tell child to quit
1527 child_conn.close()
1528
1529 if self.TYPE == 'processes':
1530 self.assertEqual(conn.readable, True)
1531 self.assertEqual(conn.writable, True)
1532 self.assertRaises(EOFError, conn.recv)
1533 self.assertRaises(EOFError, conn.recv_bytes)
1534
1535 p.join()
1536
1537 def test_duplex_false(self):
1538 reader, writer = self.Pipe(duplex=False)
1539 self.assertEqual(writer.send(1), None)
1540 self.assertEqual(reader.recv(), 1)
1541 if self.TYPE == 'processes':
1542 self.assertEqual(reader.readable, True)
1543 self.assertEqual(reader.writable, False)
1544 self.assertEqual(writer.readable, False)
1545 self.assertEqual(writer.writable, True)
1546 self.assertRaises(IOError, reader.send, 2)
1547 self.assertRaises(IOError, writer.recv)
1548 self.assertRaises(IOError, writer.poll)
1549
1550 def test_spawn_close(self):
1551 # We test that a pipe connection can be closed by parent
1552 # process immediately after child is spawned. On Windows this
1553 # would have sometimes failed on old versions because
1554 # child_conn would be closed before the child got a chance to
1555 # duplicate it.
1556 conn, child_conn = self.Pipe()
1557
1558 p = self.Process(target=self._echo, args=(child_conn,))
Jesus Cea6f6016b2011-09-09 20:26:57 +02001559 p.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +00001560 p.start()
1561 child_conn.close() # this might complete before child initializes
1562
1563 msg = latin('hello')
1564 conn.send_bytes(msg)
1565 self.assertEqual(conn.recv_bytes(), msg)
1566
1567 conn.send_bytes(SENTINEL)
1568 conn.close()
1569 p.join()
1570
1571 def test_sendbytes(self):
1572 if self.TYPE != 'processes':
1573 return
1574
1575 msg = latin('abcdefghijklmnopqrstuvwxyz')
1576 a, b = self.Pipe()
1577
1578 a.send_bytes(msg)
1579 self.assertEqual(b.recv_bytes(), msg)
1580
1581 a.send_bytes(msg, 5)
1582 self.assertEqual(b.recv_bytes(), msg[5:])
1583
1584 a.send_bytes(msg, 7, 8)
1585 self.assertEqual(b.recv_bytes(), msg[7:7+8])
1586
1587 a.send_bytes(msg, 26)
1588 self.assertEqual(b.recv_bytes(), latin(''))
1589
1590 a.send_bytes(msg, 26, 0)
1591 self.assertEqual(b.recv_bytes(), latin(''))
1592
1593 self.assertRaises(ValueError, a.send_bytes, msg, 27)
1594
1595 self.assertRaises(ValueError, a.send_bytes, msg, 22, 5)
1596
1597 self.assertRaises(ValueError, a.send_bytes, msg, 26, 1)
1598
1599 self.assertRaises(ValueError, a.send_bytes, msg, -1)
1600
1601 self.assertRaises(ValueError, a.send_bytes, msg, 4, -1)
1602
Antoine Pitroua1a8da82011-08-23 19:54:20 +02001603 @classmethod
1604 def _is_fd_assigned(cls, fd):
1605 try:
1606 os.fstat(fd)
1607 except OSError as e:
1608 if e.errno == errno.EBADF:
1609 return False
1610 raise
1611 else:
1612 return True
1613
1614 @classmethod
1615 def _writefd(cls, conn, data, create_dummy_fds=False):
1616 if create_dummy_fds:
1617 for i in range(0, 256):
1618 if not cls._is_fd_assigned(i):
1619 os.dup2(conn.fileno(), i)
1620 fd = reduction.recv_handle(conn)
1621 if msvcrt:
1622 fd = msvcrt.open_osfhandle(fd, os.O_WRONLY)
1623 os.write(fd, data)
1624 os.close(fd)
1625
Charles-François Natalif8413b22011-09-21 18:44:49 +02001626 @unittest.skipUnless(HAS_REDUCTION, "test needs multiprocessing.reduction")
Antoine Pitroua1a8da82011-08-23 19:54:20 +02001627 def test_fd_transfer(self):
1628 if self.TYPE != 'processes':
1629 self.skipTest("only makes sense with processes")
1630 conn, child_conn = self.Pipe(duplex=True)
1631
1632 p = self.Process(target=self._writefd, args=(child_conn, b"foo"))
Jesus Cea6f6016b2011-09-09 20:26:57 +02001633 p.daemon = True
Antoine Pitroua1a8da82011-08-23 19:54:20 +02001634 p.start()
1635 with open(test_support.TESTFN, "wb") as f:
1636 fd = f.fileno()
1637 if msvcrt:
1638 fd = msvcrt.get_osfhandle(fd)
1639 reduction.send_handle(conn, fd, p.pid)
1640 p.join()
1641 with open(test_support.TESTFN, "rb") as f:
1642 self.assertEqual(f.read(), b"foo")
1643
Charles-François Natalif8413b22011-09-21 18:44:49 +02001644 @unittest.skipUnless(HAS_REDUCTION, "test needs multiprocessing.reduction")
Antoine Pitroua1a8da82011-08-23 19:54:20 +02001645 @unittest.skipIf(sys.platform == "win32",
1646 "test semantics don't make sense on Windows")
1647 @unittest.skipIf(MAXFD <= 256,
1648 "largest assignable fd number is too small")
1649 @unittest.skipUnless(hasattr(os, "dup2"),
1650 "test needs os.dup2()")
1651 def test_large_fd_transfer(self):
1652 # With fd > 256 (issue #11657)
1653 if self.TYPE != 'processes':
1654 self.skipTest("only makes sense with processes")
1655 conn, child_conn = self.Pipe(duplex=True)
1656
1657 p = self.Process(target=self._writefd, args=(child_conn, b"bar", True))
Jesus Cea6f6016b2011-09-09 20:26:57 +02001658 p.daemon = True
Antoine Pitroua1a8da82011-08-23 19:54:20 +02001659 p.start()
1660 with open(test_support.TESTFN, "wb") as f:
1661 fd = f.fileno()
1662 for newfd in range(256, MAXFD):
1663 if not self._is_fd_assigned(newfd):
1664 break
1665 else:
1666 self.fail("could not find an unassigned large file descriptor")
1667 os.dup2(fd, newfd)
1668 try:
1669 reduction.send_handle(conn, newfd, p.pid)
1670 finally:
1671 os.close(newfd)
1672 p.join()
1673 with open(test_support.TESTFN, "rb") as f:
1674 self.assertEqual(f.read(), b"bar")
1675
Jesus Ceac23484b2011-09-21 03:47:39 +02001676 @classmethod
1677 def _send_data_without_fd(self, conn):
1678 os.write(conn.fileno(), b"\0")
1679
Charles-François Natalif8413b22011-09-21 18:44:49 +02001680 @unittest.skipUnless(HAS_REDUCTION, "test needs multiprocessing.reduction")
Jesus Ceac23484b2011-09-21 03:47:39 +02001681 @unittest.skipIf(sys.platform == "win32", "doesn't make sense on Windows")
1682 def test_missing_fd_transfer(self):
1683 # Check that exception is raised when received data is not
1684 # accompanied by a file descriptor in ancillary data.
1685 if self.TYPE != 'processes':
1686 self.skipTest("only makes sense with processes")
1687 conn, child_conn = self.Pipe(duplex=True)
1688
1689 p = self.Process(target=self._send_data_without_fd, args=(child_conn,))
1690 p.daemon = True
1691 p.start()
1692 self.assertRaises(RuntimeError, reduction.recv_handle, conn)
1693 p.join()
Antoine Pitroua1a8da82011-08-23 19:54:20 +02001694
Benjamin Petersondfd79492008-06-13 19:13:39 +00001695class _TestListenerClient(BaseTestCase):
1696
1697 ALLOWED_TYPES = ('processes', 'threads')
1698
Antoine Pitrou4eb2b282010-11-02 23:51:30 +00001699 @classmethod
1700 def _test(cls, address):
1701 conn = cls.connection.Client(address)
Benjamin Petersondfd79492008-06-13 19:13:39 +00001702 conn.send('hello')
1703 conn.close()
1704
1705 def test_listener_client(self):
1706 for family in self.connection.families:
1707 l = self.connection.Listener(family=family)
1708 p = self.Process(target=self._test, args=(l.address,))
Jesse Noller5bc9f4c2008-08-19 19:06:19 +00001709 p.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +00001710 p.start()
1711 conn = l.accept()
1712 self.assertEqual(conn.recv(), 'hello')
1713 p.join()
1714 l.close()
Richard Oudkerk9a16fa62012-05-05 20:41:08 +01001715
1716 def test_issue14725(self):
1717 l = self.connection.Listener()
1718 p = self.Process(target=self._test, args=(l.address,))
1719 p.daemon = True
1720 p.start()
1721 time.sleep(1)
1722 # On Windows the client process should by now have connected,
1723 # written data and closed the pipe handle by now. This causes
1724 # ConnectNamdedPipe() to fail with ERROR_NO_DATA. See Issue
1725 # 14725.
1726 conn = l.accept()
1727 self.assertEqual(conn.recv(), 'hello')
1728 conn.close()
1729 p.join()
1730 l.close()
1731
Benjamin Petersondfd79492008-06-13 19:13:39 +00001732#
1733# Test of sending connection and socket objects between processes
1734#
Benjamin Petersonda3a1b12008-06-16 20:52:48 +00001735"""
Benjamin Petersondfd79492008-06-13 19:13:39 +00001736class _TestPicklingConnections(BaseTestCase):
1737
1738 ALLOWED_TYPES = ('processes',)
1739
1740 def _listener(self, conn, families):
1741 for fam in families:
1742 l = self.connection.Listener(family=fam)
1743 conn.send(l.address)
1744 new_conn = l.accept()
1745 conn.send(new_conn)
1746
1747 if self.TYPE == 'processes':
1748 l = socket.socket()
1749 l.bind(('localhost', 0))
1750 conn.send(l.getsockname())
1751 l.listen(1)
1752 new_conn, addr = l.accept()
1753 conn.send(new_conn)
1754
1755 conn.recv()
1756
1757 def _remote(self, conn):
1758 for (address, msg) in iter(conn.recv, None):
1759 client = self.connection.Client(address)
1760 client.send(msg.upper())
1761 client.close()
1762
1763 if self.TYPE == 'processes':
1764 address, msg = conn.recv()
1765 client = socket.socket()
1766 client.connect(address)
1767 client.sendall(msg.upper())
1768 client.close()
1769
1770 conn.close()
1771
1772 def test_pickling(self):
1773 try:
1774 multiprocessing.allow_connection_pickling()
1775 except ImportError:
1776 return
1777
1778 families = self.connection.families
1779
1780 lconn, lconn0 = self.Pipe()
1781 lp = self.Process(target=self._listener, args=(lconn0, families))
Jesus Cea6f6016b2011-09-09 20:26:57 +02001782 lp.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +00001783 lp.start()
1784 lconn0.close()
1785
1786 rconn, rconn0 = self.Pipe()
1787 rp = self.Process(target=self._remote, args=(rconn0,))
Jesus Cea6f6016b2011-09-09 20:26:57 +02001788 rp.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +00001789 rp.start()
1790 rconn0.close()
1791
1792 for fam in families:
1793 msg = ('This connection uses family %s' % fam).encode('ascii')
1794 address = lconn.recv()
1795 rconn.send((address, msg))
1796 new_conn = lconn.recv()
1797 self.assertEqual(new_conn.recv(), msg.upper())
1798
1799 rconn.send(None)
1800
1801 if self.TYPE == 'processes':
1802 msg = latin('This connection uses a normal socket')
1803 address = lconn.recv()
1804 rconn.send((address, msg))
1805 if hasattr(socket, 'fromfd'):
1806 new_conn = lconn.recv()
1807 self.assertEqual(new_conn.recv(100), msg.upper())
1808 else:
1809 # XXX On Windows with Py2.6 need to backport fromfd()
1810 discard = lconn.recv_bytes()
1811
1812 lconn.send(None)
1813
1814 rconn.close()
1815 lconn.close()
1816
1817 lp.join()
1818 rp.join()
Benjamin Petersonda3a1b12008-06-16 20:52:48 +00001819"""
Benjamin Petersondfd79492008-06-13 19:13:39 +00001820#
1821#
1822#
1823
1824class _TestHeap(BaseTestCase):
1825
1826 ALLOWED_TYPES = ('processes',)
1827
1828 def test_heap(self):
1829 iterations = 5000
1830 maxblocks = 50
1831 blocks = []
1832
1833 # create and destroy lots of blocks of different sizes
1834 for i in xrange(iterations):
1835 size = int(random.lognormvariate(0, 1) * 1000)
1836 b = multiprocessing.heap.BufferWrapper(size)
1837 blocks.append(b)
1838 if len(blocks) > maxblocks:
1839 i = random.randrange(maxblocks)
1840 del blocks[i]
1841
1842 # get the heap object
1843 heap = multiprocessing.heap.BufferWrapper._heap
1844
1845 # verify the state of the heap
1846 all = []
1847 occupied = 0
Charles-François Natali414d0fa2011-07-02 13:56:19 +02001848 heap._lock.acquire()
1849 self.addCleanup(heap._lock.release)
Benjamin Petersondfd79492008-06-13 19:13:39 +00001850 for L in heap._len_to_seq.values():
1851 for arena, start, stop in L:
1852 all.append((heap._arenas.index(arena), start, stop,
1853 stop-start, 'free'))
1854 for arena, start, stop in heap._allocated_blocks:
1855 all.append((heap._arenas.index(arena), start, stop,
1856 stop-start, 'occupied'))
1857 occupied += (stop-start)
1858
1859 all.sort()
1860
1861 for i in range(len(all)-1):
1862 (arena, start, stop) = all[i][:3]
1863 (narena, nstart, nstop) = all[i+1][:3]
1864 self.assertTrue((arena != narena and nstart == 0) or
1865 (stop == nstart))
1866
Charles-François Natali414d0fa2011-07-02 13:56:19 +02001867 def test_free_from_gc(self):
1868 # Check that freeing of blocks by the garbage collector doesn't deadlock
1869 # (issue #12352).
1870 # Make sure the GC is enabled, and set lower collection thresholds to
1871 # make collections more frequent (and increase the probability of
1872 # deadlock).
Charles-François Natali7c20ad32011-07-02 14:08:27 +02001873 if not gc.isenabled():
Charles-François Natali414d0fa2011-07-02 13:56:19 +02001874 gc.enable()
1875 self.addCleanup(gc.disable)
Charles-François Natali7c20ad32011-07-02 14:08:27 +02001876 thresholds = gc.get_threshold()
1877 self.addCleanup(gc.set_threshold, *thresholds)
Charles-François Natali414d0fa2011-07-02 13:56:19 +02001878 gc.set_threshold(10)
1879
1880 # perform numerous block allocations, with cyclic references to make
1881 # sure objects are collected asynchronously by the gc
1882 for i in range(5000):
1883 a = multiprocessing.heap.BufferWrapper(1)
1884 b = multiprocessing.heap.BufferWrapper(1)
1885 # circular references
1886 a.buddy = b
1887 b.buddy = a
1888
Benjamin Petersondfd79492008-06-13 19:13:39 +00001889#
1890#
1891#
1892
Benjamin Petersondfd79492008-06-13 19:13:39 +00001893class _Foo(Structure):
1894 _fields_ = [
1895 ('x', c_int),
1896 ('y', c_double)
1897 ]
1898
1899class _TestSharedCTypes(BaseTestCase):
1900
1901 ALLOWED_TYPES = ('processes',)
1902
Antoine Pitrou55d935a2010-11-22 16:35:57 +00001903 def setUp(self):
1904 if not HAS_SHAREDCTYPES:
1905 self.skipTest("requires multiprocessing.sharedctypes")
1906
Antoine Pitrou4eb2b282010-11-02 23:51:30 +00001907 @classmethod
1908 def _double(cls, x, y, foo, arr, string):
Benjamin Petersondfd79492008-06-13 19:13:39 +00001909 x.value *= 2
1910 y.value *= 2
1911 foo.x *= 2
1912 foo.y *= 2
1913 string.value *= 2
1914 for i in range(len(arr)):
1915 arr[i] *= 2
1916
1917 def test_sharedctypes(self, lock=False):
Benjamin Petersondfd79492008-06-13 19:13:39 +00001918 x = Value('i', 7, lock=lock)
Georg Brandlbd564c32010-02-06 23:33:33 +00001919 y = Value(c_double, 1.0/3.0, lock=lock)
Benjamin Petersondfd79492008-06-13 19:13:39 +00001920 foo = Value(_Foo, 3, 2, lock=lock)
Georg Brandlbd564c32010-02-06 23:33:33 +00001921 arr = self.Array('d', range(10), lock=lock)
1922 string = self.Array('c', 20, lock=lock)
Brian Curtina06e9b82010-10-07 02:27:41 +00001923 string.value = latin('hello')
Benjamin Petersondfd79492008-06-13 19:13:39 +00001924
1925 p = self.Process(target=self._double, args=(x, y, foo, arr, string))
Jesus Cea6f6016b2011-09-09 20:26:57 +02001926 p.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +00001927 p.start()
1928 p.join()
1929
1930 self.assertEqual(x.value, 14)
1931 self.assertAlmostEqual(y.value, 2.0/3.0)
1932 self.assertEqual(foo.x, 6)
1933 self.assertAlmostEqual(foo.y, 4.0)
1934 for i in range(10):
1935 self.assertAlmostEqual(arr[i], i*2)
1936 self.assertEqual(string.value, latin('hellohello'))
1937
1938 def test_synchronize(self):
1939 self.test_sharedctypes(lock=True)
1940
1941 def test_copy(self):
Benjamin Petersondfd79492008-06-13 19:13:39 +00001942 foo = _Foo(2, 5.0)
Brian Curtina06e9b82010-10-07 02:27:41 +00001943 bar = copy(foo)
Benjamin Petersondfd79492008-06-13 19:13:39 +00001944 foo.x = 0
1945 foo.y = 0
1946 self.assertEqual(bar.x, 2)
1947 self.assertAlmostEqual(bar.y, 5.0)
1948
1949#
1950#
1951#
1952
1953class _TestFinalize(BaseTestCase):
1954
1955 ALLOWED_TYPES = ('processes',)
1956
Antoine Pitrou4eb2b282010-11-02 23:51:30 +00001957 @classmethod
1958 def _test_finalize(cls, conn):
Benjamin Petersondfd79492008-06-13 19:13:39 +00001959 class Foo(object):
1960 pass
1961
1962 a = Foo()
1963 util.Finalize(a, conn.send, args=('a',))
1964 del a # triggers callback for a
1965
1966 b = Foo()
1967 close_b = util.Finalize(b, conn.send, args=('b',))
1968 close_b() # triggers callback for b
1969 close_b() # does nothing because callback has already been called
1970 del b # does nothing because callback has already been called
1971
1972 c = Foo()
1973 util.Finalize(c, conn.send, args=('c',))
1974
1975 d10 = Foo()
1976 util.Finalize(d10, conn.send, args=('d10',), exitpriority=1)
1977
1978 d01 = Foo()
1979 util.Finalize(d01, conn.send, args=('d01',), exitpriority=0)
1980 d02 = Foo()
1981 util.Finalize(d02, conn.send, args=('d02',), exitpriority=0)
1982 d03 = Foo()
1983 util.Finalize(d03, conn.send, args=('d03',), exitpriority=0)
1984
1985 util.Finalize(None, conn.send, args=('e',), exitpriority=-10)
1986
1987 util.Finalize(None, conn.send, args=('STOP',), exitpriority=-100)
1988
Ezio Melottic2077b02011-03-16 12:34:31 +02001989 # call multiprocessing's cleanup function then exit process without
Benjamin Petersondfd79492008-06-13 19:13:39 +00001990 # garbage collecting locals
1991 util._exit_function()
1992 conn.close()
1993 os._exit(0)
1994
1995 def test_finalize(self):
1996 conn, child_conn = self.Pipe()
1997
1998 p = self.Process(target=self._test_finalize, args=(child_conn,))
Jesus Cea6f6016b2011-09-09 20:26:57 +02001999 p.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +00002000 p.start()
2001 p.join()
2002
2003 result = [obj for obj in iter(conn.recv, 'STOP')]
2004 self.assertEqual(result, ['a', 'b', 'd10', 'd03', 'd02', 'd01', 'e'])
2005
2006#
2007# Test that from ... import * works for each module
2008#
2009
2010class _TestImportStar(BaseTestCase):
2011
2012 ALLOWED_TYPES = ('processes',)
2013
2014 def test_import(self):
Florent Xicluna36b9fbb2010-03-24 19:33:25 +00002015 modules = [
Benjamin Petersondfd79492008-06-13 19:13:39 +00002016 'multiprocessing', 'multiprocessing.connection',
2017 'multiprocessing.heap', 'multiprocessing.managers',
2018 'multiprocessing.pool', 'multiprocessing.process',
Benjamin Petersondfd79492008-06-13 19:13:39 +00002019 'multiprocessing.synchronize', 'multiprocessing.util'
Florent Xicluna36b9fbb2010-03-24 19:33:25 +00002020 ]
2021
Charles-François Natalif8413b22011-09-21 18:44:49 +02002022 if HAS_REDUCTION:
2023 modules.append('multiprocessing.reduction')
2024
Florent Xicluna36b9fbb2010-03-24 19:33:25 +00002025 if c_int is not None:
2026 # This module requires _ctypes
2027 modules.append('multiprocessing.sharedctypes')
Benjamin Petersondfd79492008-06-13 19:13:39 +00002028
2029 for name in modules:
2030 __import__(name)
2031 mod = sys.modules[name]
2032
2033 for attr in getattr(mod, '__all__', ()):
2034 self.assertTrue(
2035 hasattr(mod, attr),
2036 '%r does not have attribute %r' % (mod, attr)
2037 )
2038
2039#
2040# Quick test that logging works -- does not test logging output
2041#
2042
2043class _TestLogging(BaseTestCase):
2044
2045 ALLOWED_TYPES = ('processes',)
2046
2047 def test_enable_logging(self):
2048 logger = multiprocessing.get_logger()
2049 logger.setLevel(util.SUBWARNING)
2050 self.assertTrue(logger is not None)
2051 logger.debug('this will not be printed')
2052 logger.info('nor will this')
2053 logger.setLevel(LOG_LEVEL)
2054
Antoine Pitrou4eb2b282010-11-02 23:51:30 +00002055 @classmethod
2056 def _test_level(cls, conn):
Benjamin Petersondfd79492008-06-13 19:13:39 +00002057 logger = multiprocessing.get_logger()
2058 conn.send(logger.getEffectiveLevel())
2059
2060 def test_level(self):
2061 LEVEL1 = 32
2062 LEVEL2 = 37
2063
2064 logger = multiprocessing.get_logger()
2065 root_logger = logging.getLogger()
2066 root_level = root_logger.level
2067
2068 reader, writer = multiprocessing.Pipe(duplex=False)
2069
2070 logger.setLevel(LEVEL1)
Jesus Cea6f6016b2011-09-09 20:26:57 +02002071 p = self.Process(target=self._test_level, args=(writer,))
2072 p.daemon = True
2073 p.start()
Benjamin Petersondfd79492008-06-13 19:13:39 +00002074 self.assertEqual(LEVEL1, reader.recv())
2075
2076 logger.setLevel(logging.NOTSET)
2077 root_logger.setLevel(LEVEL2)
Jesus Cea6f6016b2011-09-09 20:26:57 +02002078 p = self.Process(target=self._test_level, args=(writer,))
2079 p.daemon = True
2080 p.start()
Benjamin Petersondfd79492008-06-13 19:13:39 +00002081 self.assertEqual(LEVEL2, reader.recv())
2082
2083 root_logger.setLevel(root_level)
2084 logger.setLevel(level=LOG_LEVEL)
2085
Jesse Noller814d02d2009-11-21 14:38:23 +00002086
Jesse Noller9a03f2f2009-11-24 14:17:29 +00002087# class _TestLoggingProcessName(BaseTestCase):
2088#
2089# def handle(self, record):
2090# assert record.processName == multiprocessing.current_process().name
2091# self.__handled = True
2092#
2093# def test_logging(self):
2094# handler = logging.Handler()
2095# handler.handle = self.handle
2096# self.__handled = False
2097# # Bypass getLogger() and side-effects
2098# logger = logging.getLoggerClass()(
2099# 'multiprocessing.test.TestLoggingProcessName')
2100# logger.addHandler(handler)
2101# logger.propagate = False
2102#
2103# logger.warn('foo')
2104# assert self.__handled
Jesse Noller814d02d2009-11-21 14:38:23 +00002105
Benjamin Petersondfd79492008-06-13 19:13:39 +00002106#
Jesse Noller9a5b2ad2009-01-19 15:12:22 +00002107# Test to verify handle verification, see issue 3321
2108#
2109
2110class TestInvalidHandle(unittest.TestCase):
2111
Florent Xicluna36b9fbb2010-03-24 19:33:25 +00002112 @unittest.skipIf(WIN32, "skipped on Windows")
Jesse Noller9a5b2ad2009-01-19 15:12:22 +00002113 def test_invalid_handles(self):
Jesse Noller9a5b2ad2009-01-19 15:12:22 +00002114 conn = _multiprocessing.Connection(44977608)
2115 self.assertRaises(IOError, conn.poll)
2116 self.assertRaises(IOError, _multiprocessing.Connection, -1)
Florent Xicluna36b9fbb2010-03-24 19:33:25 +00002117
Jesse Noller9a5b2ad2009-01-19 15:12:22 +00002118#
Benjamin Petersondfd79492008-06-13 19:13:39 +00002119# Functions used to create test cases from the base ones in this module
2120#
2121
2122def get_attributes(Source, names):
2123 d = {}
2124 for name in names:
2125 obj = getattr(Source, name)
2126 if type(obj) == type(get_attributes):
2127 obj = staticmethod(obj)
2128 d[name] = obj
2129 return d
2130
2131def create_test_cases(Mixin, type):
2132 result = {}
2133 glob = globals()
Florent Xicluna36b9fbb2010-03-24 19:33:25 +00002134 Type = type.capitalize()
Benjamin Petersondfd79492008-06-13 19:13:39 +00002135
2136 for name in glob.keys():
2137 if name.startswith('_Test'):
2138 base = glob[name]
2139 if type in base.ALLOWED_TYPES:
2140 newname = 'With' + Type + name[1:]
2141 class Temp(base, unittest.TestCase, Mixin):
2142 pass
2143 result[newname] = Temp
2144 Temp.__name__ = newname
2145 Temp.__module__ = Mixin.__module__
2146 return result
2147
2148#
2149# Create test cases
2150#
2151
2152class ProcessesMixin(object):
2153 TYPE = 'processes'
2154 Process = multiprocessing.Process
2155 locals().update(get_attributes(multiprocessing, (
2156 'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
2157 'Condition', 'Event', 'Value', 'Array', 'RawValue',
2158 'RawArray', 'current_process', 'active_children', 'Pipe',
Richard Oudkerkd44a4a22012-06-06 17:52:18 +01002159 'connection', 'JoinableQueue', 'Pool'
Benjamin Petersondfd79492008-06-13 19:13:39 +00002160 )))
2161
2162testcases_processes = create_test_cases(ProcessesMixin, type='processes')
2163globals().update(testcases_processes)
2164
2165
2166class ManagerMixin(object):
2167 TYPE = 'manager'
2168 Process = multiprocessing.Process
2169 manager = object.__new__(multiprocessing.managers.SyncManager)
2170 locals().update(get_attributes(manager, (
2171 'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
2172 'Condition', 'Event', 'Value', 'Array', 'list', 'dict',
Richard Oudkerkd44a4a22012-06-06 17:52:18 +01002173 'Namespace', 'JoinableQueue', 'Pool'
Benjamin Petersondfd79492008-06-13 19:13:39 +00002174 )))
2175
2176testcases_manager = create_test_cases(ManagerMixin, type='manager')
2177globals().update(testcases_manager)
2178
2179
2180class ThreadsMixin(object):
2181 TYPE = 'threads'
2182 Process = multiprocessing.dummy.Process
2183 locals().update(get_attributes(multiprocessing.dummy, (
2184 'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
2185 'Condition', 'Event', 'Value', 'Array', 'current_process',
2186 'active_children', 'Pipe', 'connection', 'dict', 'list',
Richard Oudkerkd44a4a22012-06-06 17:52:18 +01002187 'Namespace', 'JoinableQueue', 'Pool'
Benjamin Petersondfd79492008-06-13 19:13:39 +00002188 )))
2189
2190testcases_threads = create_test_cases(ThreadsMixin, type='threads')
2191globals().update(testcases_threads)
2192
Neal Norwitz0c519b32008-08-25 01:50:24 +00002193class OtherTest(unittest.TestCase):
2194 # TODO: add more tests for deliver/answer challenge.
2195 def test_deliver_challenge_auth_failure(self):
2196 class _FakeConnection(object):
2197 def recv_bytes(self, size):
Neal Norwitz2a7767a2008-08-25 03:03:25 +00002198 return b'something bogus'
Neal Norwitz0c519b32008-08-25 01:50:24 +00002199 def send_bytes(self, data):
2200 pass
2201 self.assertRaises(multiprocessing.AuthenticationError,
2202 multiprocessing.connection.deliver_challenge,
2203 _FakeConnection(), b'abc')
2204
2205 def test_answer_challenge_auth_failure(self):
2206 class _FakeConnection(object):
2207 def __init__(self):
2208 self.count = 0
2209 def recv_bytes(self, size):
2210 self.count += 1
2211 if self.count == 1:
2212 return multiprocessing.connection.CHALLENGE
2213 elif self.count == 2:
Neal Norwitz2a7767a2008-08-25 03:03:25 +00002214 return b'something bogus'
2215 return b''
Neal Norwitz0c519b32008-08-25 01:50:24 +00002216 def send_bytes(self, data):
2217 pass
2218 self.assertRaises(multiprocessing.AuthenticationError,
2219 multiprocessing.connection.answer_challenge,
2220 _FakeConnection(), b'abc')
2221
Jesse Noller7152f6d2009-04-02 05:17:26 +00002222#
2223# Test Manager.start()/Pool.__init__() initializer feature - see issue 5585
2224#
2225
2226def initializer(ns):
2227 ns.test += 1
2228
2229class TestInitializers(unittest.TestCase):
2230 def setUp(self):
2231 self.mgr = multiprocessing.Manager()
2232 self.ns = self.mgr.Namespace()
2233 self.ns.test = 0
2234
2235 def tearDown(self):
2236 self.mgr.shutdown()
2237
2238 def test_manager_initializer(self):
2239 m = multiprocessing.managers.SyncManager()
2240 self.assertRaises(TypeError, m.start, 1)
2241 m.start(initializer, (self.ns,))
2242 self.assertEqual(self.ns.test, 1)
2243 m.shutdown()
2244
2245 def test_pool_initializer(self):
2246 self.assertRaises(TypeError, multiprocessing.Pool, initializer=1)
2247 p = multiprocessing.Pool(1, initializer, (self.ns,))
2248 p.close()
2249 p.join()
2250 self.assertEqual(self.ns.test, 1)
2251
Jesse Noller1b90efb2009-06-30 17:11:52 +00002252#
2253# Issue 5155, 5313, 5331: Test process in processes
2254# Verifies os.close(sys.stdin.fileno) vs. sys.stdin.close() behavior
2255#
2256
2257def _ThisSubProcess(q):
2258 try:
2259 item = q.get(block=False)
2260 except Queue.Empty:
2261 pass
2262
2263def _TestProcess(q):
2264 queue = multiprocessing.Queue()
2265 subProc = multiprocessing.Process(target=_ThisSubProcess, args=(queue,))
Jesus Cea6f6016b2011-09-09 20:26:57 +02002266 subProc.daemon = True
Jesse Noller1b90efb2009-06-30 17:11:52 +00002267 subProc.start()
2268 subProc.join()
2269
2270def _afunc(x):
2271 return x*x
2272
2273def pool_in_process():
2274 pool = multiprocessing.Pool(processes=4)
2275 x = pool.map(_afunc, [1, 2, 3, 4, 5, 6, 7])
2276
2277class _file_like(object):
2278 def __init__(self, delegate):
2279 self._delegate = delegate
2280 self._pid = None
2281
2282 @property
2283 def cache(self):
2284 pid = os.getpid()
2285 # There are no race conditions since fork keeps only the running thread
2286 if pid != self._pid:
2287 self._pid = pid
2288 self._cache = []
2289 return self._cache
2290
2291 def write(self, data):
2292 self.cache.append(data)
2293
2294 def flush(self):
2295 self._delegate.write(''.join(self.cache))
2296 self._cache = []
2297
2298class TestStdinBadfiledescriptor(unittest.TestCase):
2299
2300 def test_queue_in_process(self):
2301 queue = multiprocessing.Queue()
2302 proc = multiprocessing.Process(target=_TestProcess, args=(queue,))
2303 proc.start()
2304 proc.join()
2305
2306 def test_pool_in_process(self):
2307 p = multiprocessing.Process(target=pool_in_process)
2308 p.start()
2309 p.join()
2310
2311 def test_flushing(self):
2312 sio = StringIO()
2313 flike = _file_like(sio)
2314 flike.write('foo')
2315 proc = multiprocessing.Process(target=lambda: flike.flush())
2316 flike.flush()
2317 assert sio.getvalue() == 'foo'
2318
Richard Oudkerke4b99382012-07-27 14:05:46 +01002319#
2320# Test interaction with socket timeouts - see Issue #6056
2321#
2322
2323class TestTimeouts(unittest.TestCase):
2324 @classmethod
2325 def _test_timeout(cls, child, address):
2326 time.sleep(1)
2327 child.send(123)
2328 child.close()
2329 conn = multiprocessing.connection.Client(address)
2330 conn.send(456)
2331 conn.close()
2332
2333 def test_timeout(self):
2334 old_timeout = socket.getdefaulttimeout()
2335 try:
2336 socket.setdefaulttimeout(0.1)
2337 parent, child = multiprocessing.Pipe(duplex=True)
2338 l = multiprocessing.connection.Listener(family='AF_INET')
2339 p = multiprocessing.Process(target=self._test_timeout,
2340 args=(child, l.address))
2341 p.start()
2342 child.close()
2343 self.assertEqual(parent.recv(), 123)
2344 parent.close()
2345 conn = l.accept()
2346 self.assertEqual(conn.recv(), 456)
2347 conn.close()
2348 l.close()
2349 p.join(10)
2350 finally:
2351 socket.setdefaulttimeout(old_timeout)
2352
Richard Oudkerkfaee75c2012-08-14 11:41:19 +01002353#
2354# Test what happens with no "if __name__ == '__main__'"
2355#
2356
2357class TestNoForkBomb(unittest.TestCase):
2358 def test_noforkbomb(self):
2359 name = os.path.join(os.path.dirname(__file__), 'mp_fork_bomb.py')
2360 if WIN32:
2361 rc, out, err = test.script_helper.assert_python_failure(name)
2362 self.assertEqual('', out.decode('ascii'))
2363 self.assertIn('RuntimeError', err.decode('ascii'))
2364 else:
2365 rc, out, err = test.script_helper.assert_python_ok(name)
2366 self.assertEqual('123', out.decode('ascii').rstrip())
2367 self.assertEqual('', err.decode('ascii'))
2368
2369#
2370#
2371#
2372
Jesse Noller1b90efb2009-06-30 17:11:52 +00002373testcases_other = [OtherTest, TestInvalidHandle, TestInitializers,
Richard Oudkerkfaee75c2012-08-14 11:41:19 +01002374 TestStdinBadfiledescriptor, TestTimeouts, TestNoForkBomb]
Neal Norwitz0c519b32008-08-25 01:50:24 +00002375
Benjamin Petersondfd79492008-06-13 19:13:39 +00002376#
2377#
2378#
2379
2380def test_main(run=None):
Jesse Noller18623822008-06-18 13:29:52 +00002381 if sys.platform.startswith("linux"):
2382 try:
2383 lock = multiprocessing.RLock()
2384 except OSError:
Benjamin Petersonbec087f2009-03-26 21:10:30 +00002385 raise unittest.SkipTest("OSError raises on RLock creation, see issue 3111!")
Benjamin Petersoned77f2e2008-06-17 22:40:44 +00002386
Charles-François Natali6392d7f2011-11-22 18:35:18 +01002387 check_enough_semaphores()
2388
Benjamin Petersondfd79492008-06-13 19:13:39 +00002389 if run is None:
2390 from test.test_support import run_unittest as run
2391
2392 util.get_temp_dir() # creates temp directory for use by all processes
2393
2394 multiprocessing.get_logger().setLevel(LOG_LEVEL)
2395
Jesse Noller146b7ab2008-07-02 16:44:09 +00002396 ProcessesMixin.pool = multiprocessing.Pool(4)
2397 ThreadsMixin.pool = multiprocessing.dummy.Pool(4)
2398 ManagerMixin.manager.__init__()
2399 ManagerMixin.manager.start()
2400 ManagerMixin.pool = ManagerMixin.manager.Pool(4)
Benjamin Petersondfd79492008-06-13 19:13:39 +00002401
2402 testcases = (
Jesse Noller146b7ab2008-07-02 16:44:09 +00002403 sorted(testcases_processes.values(), key=lambda tc:tc.__name__) +
2404 sorted(testcases_threads.values(), key=lambda tc:tc.__name__) +
Neal Norwitz0c519b32008-08-25 01:50:24 +00002405 sorted(testcases_manager.values(), key=lambda tc:tc.__name__) +
2406 testcases_other
Benjamin Petersondfd79492008-06-13 19:13:39 +00002407 )
2408
2409 loadTestsFromTestCase = unittest.defaultTestLoader.loadTestsFromTestCase
2410 suite = unittest.TestSuite(loadTestsFromTestCase(tc) for tc in testcases)
Nick Coghlan13623662010-04-10 14:24:36 +00002411 # (ncoghlan): Whether or not sys.exc_clear is executed by the threading
2412 # module during these tests is at least platform dependent and possibly
Nick Coghlan14459d52010-04-10 15:01:54 +00002413 # non-deterministic on any given platform. So we don't mind if the listed
Nick Coghlan13623662010-04-10 14:24:36 +00002414 # warnings aren't actually raised.
Florent Xicluna07627882010-03-21 01:14:24 +00002415 with test_support.check_py3k_warnings(
Nick Coghlan13623662010-04-10 14:24:36 +00002416 (".+__(get|set)slice__ has been removed", DeprecationWarning),
2417 (r"sys.exc_clear\(\) not supported", DeprecationWarning),
2418 quiet=True):
Florent Xicluna07627882010-03-21 01:14:24 +00002419 run(suite)
Benjamin Petersondfd79492008-06-13 19:13:39 +00002420
Jesse Noller146b7ab2008-07-02 16:44:09 +00002421 ThreadsMixin.pool.terminate()
2422 ProcessesMixin.pool.terminate()
2423 ManagerMixin.pool.terminate()
2424 ManagerMixin.manager.shutdown()
Benjamin Petersondfd79492008-06-13 19:13:39 +00002425
Jesse Noller146b7ab2008-07-02 16:44:09 +00002426 del ProcessesMixin.pool, ThreadsMixin.pool, ManagerMixin.pool
Benjamin Petersondfd79492008-06-13 19:13:39 +00002427
2428def main():
2429 test_main(unittest.TextTestRunner(verbosity=2).run)
2430
2431if __name__ == '__main__':
2432 main()