blob: f140aca63f179e5ede9b93369d85b576c25bff43 [file] [log] [blame]
Jesse Noller76cf55f2008-07-02 16:56:51 +00001#!/usr/bin/env python
2
Benjamin Petersondfd79492008-06-13 19:13:39 +00003#
4# Unit tests for the multiprocessing package
5#
6
7import unittest
Benjamin Petersondfd79492008-06-13 19:13:39 +00008import Queue
9import time
10import sys
11import os
12import gc
13import signal
14import array
Benjamin Petersondfd79492008-06-13 19:13:39 +000015import socket
16import random
17import logging
Antoine Pitroua1a8da82011-08-23 19:54:20 +020018import errno
Richard Oudkerkfaee75c2012-08-14 11:41:19 +010019import test.script_helper
Mark Dickinsonc4920e82009-11-20 19:30:22 +000020from test import test_support
Jesse Noller1b90efb2009-06-30 17:11:52 +000021from StringIO import StringIO
R. David Murray3db8a342009-03-30 23:05:48 +000022_multiprocessing = test_support.import_module('_multiprocessing')
Ezio Melottic2077b02011-03-16 12:34:31 +020023# import threading after _multiprocessing to raise a more relevant error
Victor Stinner613b4cf2010-04-27 21:56:26 +000024# message: "No module named _multiprocessing". _multiprocessing is not compiled
25# without thread support.
26import threading
R. David Murray3db8a342009-03-30 23:05:48 +000027
Jesse Noller37040cd2008-09-30 00:15:45 +000028# Work around broken sem_open implementations
R. David Murray3db8a342009-03-30 23:05:48 +000029test_support.import_module('multiprocessing.synchronize')
Jesse Noller37040cd2008-09-30 00:15:45 +000030
Benjamin Petersondfd79492008-06-13 19:13:39 +000031import multiprocessing.dummy
32import multiprocessing.connection
33import multiprocessing.managers
34import multiprocessing.heap
Benjamin Petersondfd79492008-06-13 19:13:39 +000035import multiprocessing.pool
Benjamin Petersondfd79492008-06-13 19:13:39 +000036
Charles-François Natalif8413b22011-09-21 18:44:49 +020037from multiprocessing import util
38
39try:
40 from multiprocessing import reduction
41 HAS_REDUCTION = True
42except ImportError:
43 HAS_REDUCTION = False
Benjamin Petersondfd79492008-06-13 19:13:39 +000044
Brian Curtina06e9b82010-10-07 02:27:41 +000045try:
46 from multiprocessing.sharedctypes import Value, copy
47 HAS_SHAREDCTYPES = True
48except ImportError:
49 HAS_SHAREDCTYPES = False
50
Antoine Pitroua1a8da82011-08-23 19:54:20 +020051try:
52 import msvcrt
53except ImportError:
54 msvcrt = None
55
Benjamin Petersondfd79492008-06-13 19:13:39 +000056#
57#
58#
59
# Character values for the 'c' typecode are written through this alias.
latin = str

#
# Constants
#

LOG_LEVEL = util.SUBWARNING
#LOG_LEVEL = logging.DEBUG

# Generic short pause used by the tests to let other processes/threads run.
DELTA = 0.1

# Making CHECK_TIMINGS true makes tests take a lot longer and can sometimes
# cause some non-serious failures because some calls block a bit longer
# than expected.
CHECK_TIMINGS = False
TIMEOUT1, TIMEOUT2, TIMEOUT3 = (
    (0.82, 0.35, 1.4) if CHECK_TIMINGS else (0.1, 0.1, 0.1)
)

# True unless the extension module flags its semaphore get_value() as broken.
HAVE_GETVALUE = not getattr(
    _multiprocessing, 'HAVE_BROKEN_SEM_GETVALUE', False
)

WIN32 = sys.platform == "win32"
83
# Upper bound on the number of file descriptors a process may have open.
# Fall back to 256 when it cannot be queried: os.sysconf may be missing
# (AttributeError), the name may be unknown (ValueError), or the underlying
# call may fail (OSError).  Anything else, such as KeyboardInterrupt, is
# deliberately allowed to propagate.
try:
    MAXFD = os.sysconf("SC_OPEN_MAX")
except (AttributeError, ValueError, OSError):
    MAXFD = 256
88
Benjamin Petersondfd79492008-06-13 19:13:39 +000089#
Florent Xicluna36b9fbb2010-03-24 19:33:25 +000090# Some tests require ctypes
91#
92
93try:
Nick Coghlan13623662010-04-10 14:24:36 +000094 from ctypes import Structure, c_int, c_double
Florent Xicluna36b9fbb2010-03-24 19:33:25 +000095except ImportError:
96 Structure = object
97 c_int = c_double = None
98
Charles-François Natali6392d7f2011-11-22 18:35:18 +010099
def check_enough_semaphores():
    """Skip the calling test when the OS cannot provide enough semaphores.

    Returns None when the limit is unknown, unlimited (-1) or at least the
    POSIX minimum; otherwise raises unittest.SkipTest.
    """
    # minimum number of semaphores available according to POSIX
    nsems_min = 256
    try:
        nsems = os.sysconf("SC_SEM_NSEMS_MAX")
    except (AttributeError, ValueError):
        # sysconf not available or setting not available
        return
    if nsems != -1 and nsems < nsems_min:
        raise unittest.SkipTest("The OS doesn't support enough semaphores "
                                "to run the test (required: %d)." % nsems_min)
113
114
Florent Xicluna36b9fbb2010-03-24 19:33:25 +0000115#
Benjamin Petersondfd79492008-06-13 19:13:39 +0000116# Creates a wrapper for a function which records the time it takes to finish
117#
118
class TimingWrapper(object):
    """Callable proxy that records how long each call to `func` took.

    `elapsed` is None until the first call; afterwards it holds the
    wall-clock duration (in seconds) of the most recent call, whether that
    call returned or raised.
    """

    def __init__(self, func):
        self.func = func
        self.elapsed = None

    def __call__(self, *args, **kwds):
        started = time.time()
        try:
            result = self.func(*args, **kwds)
        finally:
            # Recorded even when func raises, so callers can time failures.
            self.elapsed = time.time() - started
        return result
131
132#
133# Base class for test cases
134#
135
class BaseTestCase(object):
    # Mixin holding the assertion helpers shared by the test cases below.
    # It relies on the assert* methods of whatever class it is combined with.

    ALLOWED_TYPES = ('processes', 'manager', 'threads')

    def assertTimingAlmostEqual(self, a, b):
        # Timing comparisons are only enforced when CHECK_TIMINGS is enabled.
        if not CHECK_TIMINGS:
            return
        self.assertAlmostEqual(a, b, 1)

    def assertReturnsIfImplemented(self, value, func, *args):
        # Check that func(*args) == value, but treat NotImplementedError from
        # the call as "nothing to check".
        try:
            res = func(*args)
        except NotImplementedError:
            return None
        return self.assertEqual(value, res)

    # For the sanity of Windows users, rather than crashing or freezing in
    # multiple ways.
    def __reduce__(self, *args):
        raise NotImplementedError("shouldn't try to pickle a test case")

    __reduce_ex__ = __reduce__
158
Benjamin Petersondfd79492008-06-13 19:13:39 +0000159#
160# Return the value of a semaphore
161#
162
def get_value(self):
    # Return the counter of the semaphore-like object `self`.  Tries, in
    # order: its get_value() method, its `_Semaphore__value` attribute, its
    # `_value` attribute.  Raises NotImplementedError when none is available.
    try:
        return self.get_value()
    except AttributeError:
        pass
    for attr_name in ('_Semaphore__value', '_value'):
        try:
            return getattr(self, attr_name)
        except AttributeError:
            continue
    raise NotImplementedError
174
175#
176# Testcases
177#
178
class _TestProcess(BaseTestCase):
    # Covers the basic Process API: attributes of the current process, the
    # start/join/terminate life cycle, active_children() bookkeeping, nested
    # process creation and propagation of sys.exit() codes.

    ALLOWED_TYPES = ('processes', 'threads')

    def test_current(self):
        # Not run for the thread-backed flavour.
        if self.TYPE == 'threads':
            return

        me = self.current_process()
        key = me.authkey

        self.assertTrue(me.is_alive())
        self.assertFalse(me.daemon)
        self.assertIsInstance(key, bytes)
        self.assertTrue(len(key) > 0)
        self.assertEqual(me.ident, os.getpid())
        self.assertEqual(me.exitcode, None)

    @classmethod
    def _test(cls, q, *args, **kwds):
        # Target for test_process(): report the call arguments and the
        # identity of the running process through `q`, in the order the
        # parent reads them back.
        me = cls.current_process()
        q.put(args)
        q.put(kwds)
        q.put(me.name)
        if cls.TYPE == 'threads':
            return
        q.put(bytes(me.authkey))
        q.put(me.pid)

    def test_process(self):
        uses_threads = (self.TYPE == 'threads')

        q = self.Queue(1)
        e = self.Event()        # never used afterwards; only created
        child_args = (q, 1, 2)
        child_kwargs = {'hello':23, 'bye':2.54}
        proc = self.Process(
            target=self._test,
            args=child_args,
            kwargs=child_kwargs,
            name='SomeProcess',
            )
        proc.daemon = True
        parent = self.current_process()

        # state before start()
        if not uses_threads:
            self.assertEqual(proc.authkey, parent.authkey)
        self.assertEqual(proc.is_alive(), False)
        self.assertEqual(proc.daemon, True)
        self.assertNotIn(proc, self.active_children())
        self.assertTrue(type(self.active_children()) is list)
        self.assertEqual(proc.exitcode, None)

        proc.start()

        # state while running
        self.assertEqual(proc.exitcode, None)
        self.assertEqual(proc.is_alive(), True)
        self.assertIn(proc, self.active_children())

        # the child echoes everything except the queue itself
        self.assertEqual(q.get(), child_args[1:])
        self.assertEqual(q.get(), child_kwargs)
        self.assertEqual(q.get(), proc.name)
        if not uses_threads:
            self.assertEqual(q.get(), parent.authkey)
            self.assertEqual(q.get(), proc.pid)

        proc.join()

        # state after join()
        self.assertEqual(proc.exitcode, 0)
        self.assertEqual(proc.is_alive(), False)
        self.assertNotIn(proc, self.active_children())

    @classmethod
    def _test_terminate(cls):
        # Sleeps long enough that the parent has to terminate us.
        time.sleep(1000)

    def test_terminate(self):
        # Not run for the thread-backed flavour.
        if self.TYPE == 'threads':
            return

        victim = self.Process(target=self._test_terminate)
        victim.daemon = True
        victim.start()

        self.assertEqual(victim.is_alive(), True)
        self.assertIn(victim, self.active_children())
        self.assertEqual(victim.exitcode, None)

        victim.terminate()

        timed_join = TimingWrapper(victim.join)
        self.assertEqual(timed_join(), None)
        self.assertTimingAlmostEqual(timed_join.elapsed, 0.0)

        self.assertEqual(victim.is_alive(), False)
        self.assertNotIn(victim, self.active_children())

        # joining a second time must also work
        victim.join()

        # XXX sometimes get p.exitcode == 0 on Windows ...
        #self.assertEqual(p.exitcode, -signal.SIGTERM)

    def test_cpu_count(self):
        try:
            count = multiprocessing.cpu_count()
        except NotImplementedError:
            # platforms that cannot report a count are treated as one CPU
            count = 1
        self.assertTrue(type(count) is int)
        self.assertTrue(count >= 1)

    def test_active_children(self):
        self.assertEqual(type(self.active_children()), list)

        sleeper = self.Process(target=time.sleep, args=(DELTA,))
        self.assertNotIn(sleeper, self.active_children())

        sleeper.daemon = True
        sleeper.start()
        self.assertIn(sleeper, self.active_children())

        sleeper.join()
        self.assertNotIn(sleeper, self.active_children())

    @classmethod
    def _test_recursion(cls, wconn, id):
        # Send our own id, then (up to depth 2) run two children one after
        # the other, each of which does the same with an extended id.
        from multiprocessing import forking
        wconn.send(id)
        if len(id) >= 2:
            return
        for branch in (0, 1):
            child = cls.Process(
                target=cls._test_recursion, args=(wconn, id+[branch])
                )
            child.start()
            child.join()

    def test_recursion(self):
        rconn, wconn = self.Pipe(duplex=False)
        self._test_recursion(wconn, [])

        time.sleep(DELTA)
        received = []
        while rconn.poll():
            received.append(rconn.recv())

        # Children are started and joined one at a time, so the ids arrive
        # in depth-first order.
        self.assertEqual(received, [
            [],
              [0],
                [0, 0],
                [0, 1],
              [1],
                [1, 0],
                [1, 1],
            ])

    @classmethod
    def _test_sys_exit(cls, reason, testfn):
        # Redirect stderr to `testfn` so the parent can read what sys.exit()
        # printed for `reason`.
        sys.stderr = open(testfn, 'w')
        sys.exit(reason)

    def test_sys_exit(self):
        # See Issue 13854
        if self.TYPE == 'threads':
            return

        testfn = test_support.TESTFN
        self.addCleanup(test_support.unlink, testfn)

        # For these reasons the text of `reason` is expected in the
        # redirected stderr.
        printed_reasons = (
            ([1, 2, 3], 1),
            ('ignore this', 0),
        )
        for reason, code in printed_reasons:
            proc = self.Process(target=self._test_sys_exit,
                                args=(reason, testfn))
            proc.daemon = True
            proc.start()
            proc.join(5)
            self.assertEqual(proc.exitcode, code)

            with open(testfn, 'r') as f:
                written = f.read()
            self.assertEqual(written.rstrip(), str(reason))

        # For these reasons the exit code is expected to equal the reason.
        for reason in (True, False, 8):
            proc = self.Process(target=sys.exit, args=(reason,))
            proc.daemon = True
            proc.start()
            proc.join(5)
            self.assertEqual(proc.exitcode, reason)
358
Benjamin Petersondfd79492008-06-13 19:13:39 +0000359#
360#
361#
362
363class _UpperCaser(multiprocessing.Process):
364
365 def __init__(self):
366 multiprocessing.Process.__init__(self)
367 self.child_conn, self.parent_conn = multiprocessing.Pipe()
368
369 def run(self):
370 self.parent_conn.close()
371 for s in iter(self.child_conn.recv, None):
372 self.child_conn.send(s.upper())
373 self.child_conn.close()
374
375 def submit(self, s):
376 assert type(s) is str
377 self.parent_conn.send(s)
378 return self.parent_conn.recv()
379
380 def stop(self):
381 self.parent_conn.send(None)
382 self.parent_conn.close()
383 self.child_conn.close()
384
class _TestSubclassingProcess(BaseTestCase):
    # Checks that a user-defined Process subclass (_UpperCaser) can be
    # started, exchange data with the parent, and be shut down cleanly.

    ALLOWED_TYPES = ('processes',)

    def test_subclassing(self):
        worker = _UpperCaser()
        worker.daemon = True
        worker.start()
        for sent, expected in (('hello', 'HELLO'), ('world', 'WORLD')):
            self.assertEqual(worker.submit(sent), expected)
        worker.stop()
        worker.join()
397
398#
399#
400#
401
def queue_empty(q):
    # Report whether `q` is empty, using q.empty() when the queue type
    # provides it and falling back to a qsize() comparison otherwise.
    if not hasattr(q, 'empty'):
        return q.qsize() == 0
    return q.empty()
407
def queue_full(q, maxsize):
    # Report whether `q` is full, using q.full() when the queue type
    # provides it and otherwise comparing qsize() against `maxsize`.
    if not hasattr(q, 'full'):
        return q.qsize() == maxsize
    return q.full()
413
414
class _TestQueue(BaseTestCase):
    # Exercises Queue/JoinableQueue: blocking and non-blocking put/get with
    # every argument form, Full/Empty errors, use across a child process,
    # qsize() and task_done()/join().

    @classmethod
    def _test_put(cls, queue, child_can_start, parent_can_continue):
        # Child for test_put(): once released, drain the six items the
        # parent queued and tell the parent it may go on.
        child_can_start.wait()
        for _ in range(6):
            queue.get()
        parent_can_continue.set()

    def test_put(self):
        MAXSIZE = 6
        q = self.Queue(maxsize=MAXSIZE)
        start_child = self.Event()
        child_done = self.Event()

        proc = self.Process(
            target=self._test_put,
            args=(q, start_child, child_done)
            )
        proc.daemon = True
        proc.start()

        self.assertEqual(queue_empty(q), True)
        self.assertEqual(queue_full(q, MAXSIZE), False)

        # fill the queue using every calling convention of put()
        q.put(1)
        q.put(2, True)
        q.put(3, True, None)
        q.put(4, False)
        q.put(5, False, None)
        q.put_nowait(6)

        # the values may be in buffer but not yet in pipe so sleep a bit
        time.sleep(DELTA)

        self.assertEqual(queue_empty(q), False)
        self.assertEqual(queue_full(q, MAXSIZE), True)

        timed_put = TimingWrapper(q.put)
        timed_put_nowait = TimingWrapper(q.put_nowait)

        # (callable, positional args, keyword args, expected duration):
        # every further put must fail with Queue.Full, and only the blocking
        # forms are expected to take their timeout.
        full_cases = (
            (timed_put, (7, False), {}, 0),
            (timed_put, (7, False, None), {}, 0),
            (timed_put_nowait, (7,), {}, 0),
            (timed_put, (7, True, TIMEOUT1), {}, TIMEOUT1),
            (timed_put, (7, False, TIMEOUT2), {}, 0),
            (timed_put, (7, True), {'timeout': TIMEOUT3}, TIMEOUT3),
        )
        for timed, pos_args, kw_args, expected in full_cases:
            self.assertRaises(Queue.Full, timed, *pos_args, **kw_args)
            self.assertTimingAlmostEqual(timed.elapsed, expected)

        # let the child drain the queue
        start_child.set()
        child_done.wait()

        self.assertEqual(queue_empty(q), True)
        self.assertEqual(queue_full(q, MAXSIZE), False)

        proc.join()

    @classmethod
    def _test_get(cls, queue, child_can_start, parent_can_continue):
        # Child for test_get(): once released, queue 2..5 and signal the
        # parent.
        child_can_start.wait()
        #queue.put(1)
        for item in (2, 3, 4, 5):
            queue.put(item)
        parent_can_continue.set()

    def test_get(self):
        q = self.Queue()
        start_child = self.Event()
        child_done = self.Event()

        proc = self.Process(
            target=self._test_get,
            args=(q, start_child, child_done)
            )
        proc.daemon = True
        proc.start()

        self.assertEqual(queue_empty(q), True)

        start_child.set()
        child_done.wait()

        time.sleep(DELTA)
        self.assertEqual(queue_empty(q), False)

        # Hangs unexpectedly, remove for now
        #self.assertEqual(queue.get(), 1)

        # drain the queue using every calling convention of get()
        self.assertEqual(q.get(True, None), 2)
        self.assertEqual(q.get(True), 3)
        self.assertEqual(q.get(timeout=1), 4)
        self.assertEqual(q.get_nowait(), 5)

        self.assertEqual(queue_empty(q), True)

        timed_get = TimingWrapper(q.get)
        timed_get_nowait = TimingWrapper(q.get_nowait)

        # (callable, positional args, keyword args, expected duration):
        # every further get must fail with Queue.Empty.
        empty_cases = (
            (timed_get, (False,), {}, 0),
            (timed_get, (False, None), {}, 0),
            (timed_get_nowait, (), {}, 0),
            (timed_get, (True, TIMEOUT1), {}, TIMEOUT1),
            (timed_get, (False, TIMEOUT2), {}, 0),
            (timed_get, (), {'timeout': TIMEOUT3}, TIMEOUT3),
        )
        for timed, pos_args, kw_args, expected in empty_cases:
            self.assertRaises(Queue.Empty, timed, *pos_args, **kw_args)
            self.assertTimingAlmostEqual(timed.elapsed, expected)

        proc.join()

    @classmethod
    def _test_fork(cls, queue):
        for item in range(10, 20):
            queue.put(item)
        # note that at this point the items may only be buffered, so the
        # process cannot shutdown until the feeder thread has finished
        # pushing items onto the pipe.

    def test_fork(self):
        # Old versions of Queue would fail to create a new feeder
        # thread for a forked process if the original process had its
        # own feeder thread.  This test checks that this no longer
        # happens.

        q = self.Queue()

        # put items on queue so that main process starts a feeder thread
        for item in range(10):
            q.put(item)

        # wait to make sure thread starts before we fork a new process
        time.sleep(DELTA)

        # fork process
        child = self.Process(target=self._test_fork, args=(q,))
        child.daemon = True
        child.start()

        # check that all expected items are in the queue
        for expected in range(20):
            self.assertEqual(q.get(), expected)
        self.assertRaises(Queue.Empty, q.get, False)

        child.join()

    def test_qsize(self):
        q = self.Queue()
        try:
            self.assertEqual(q.qsize(), 0)
        except NotImplementedError:
            # qsize() is not implemented here, so there is nothing to check
            return
        # each put grows the size by one, each get shrinks it by one
        q.put(1)
        self.assertEqual(q.qsize(), 1)
        q.put(5)
        self.assertEqual(q.qsize(), 2)
        q.get()
        self.assertEqual(q.qsize(), 1)
        q.get()
        self.assertEqual(q.qsize(), 0)

    @classmethod
    def _test_task_done(cls, q):
        # Worker: acknowledge every item until the None sentinel arrives.
        for _ in iter(q.get, None):
            time.sleep(DELTA)
            q.task_done()

    def test_task_done(self):
        jq = self.JoinableQueue()

        if sys.version_info < (2, 5) and not hasattr(jq, 'task_done'):
            self.skipTest("requires 'queue.task_done()' method")

        workers = []
        for _ in xrange(4):
            workers.append(
                self.Process(target=self._test_task_done, args=(jq,)))

        for worker in workers:
            worker.daemon = True
            worker.start()

        for item in xrange(10):
            jq.put(item)

        # returns once every item has been acknowledged with task_done()
        jq.join()

        # one sentinel per worker
        for _ in workers:
            jq.put(None)

        for worker in workers:
            worker.join()
624
625#
626#
627#
628
class _TestLock(BaseTestCase):
    # Basic acquire/release semantics of Lock and RLock.

    def test_lock(self):
        lk = self.Lock()
        self.assertEqual(lk.acquire(), True)
        # already held: a non-blocking acquire must fail
        self.assertEqual(lk.acquire(False), False)
        self.assertEqual(lk.release(), None)
        # releasing an unheld lock is an error
        self.assertRaises((ValueError, threading.ThreadError), lk.release)

    def test_rlock(self):
        depth = 3
        rlk = self.RLock()
        # re-entrant: the holder may acquire repeatedly ...
        for _ in range(depth):
            self.assertEqual(rlk.acquire(), True)
        # ... and must release the same number of times
        for _ in range(depth):
            self.assertEqual(rlk.release(), None)
        self.assertRaises((AssertionError, RuntimeError), rlk.release)

    def test_lock_context(self):
        # a Lock must be usable as a context manager
        with self.Lock():
            pass
651
Benjamin Petersondfd79492008-06-13 19:13:39 +0000652
class _TestSemaphore(BaseTestCase):
    # Counter behaviour of Semaphore/BoundedSemaphore and acquire timeouts.

    def _test_semaphore(self, sem):
        # Shared scenario for a semaphore created with value 2: take it down
        # to 0, fail one non-blocking acquire, then release back up to 2.
        self.assertReturnsIfImplemented(2, get_value, sem)
        # (operation, args, expected result, expected counter afterwards)
        steps = (
            (sem.acquire, (), True, 1),
            (sem.acquire, (), True, 0),
            (sem.acquire, (False,), False, 0),
            (sem.release, (), None, 1),
            (sem.release, (), None, 2),
        )
        for operation, op_args, result, counter in steps:
            self.assertEqual(operation(*op_args), result)
            self.assertReturnsIfImplemented(counter, get_value, sem)

    def test_semaphore(self):
        sem = self.Semaphore(2)
        self._test_semaphore(sem)
        # an unbounded semaphore may be released past its initial value
        for counter in (3, 4):
            self.assertEqual(sem.release(), None)
            self.assertReturnsIfImplemented(counter, get_value, sem)

    def test_bounded_semaphore(self):
        sem = self.BoundedSemaphore(2)
        self._test_semaphore(sem)
        # Currently fails on OS/X
        #if HAVE_GETVALUE:
        #    self.assertRaises(ValueError, sem.release)
        #    self.assertReturnsIfImplemented(2, get_value, sem)

    def test_timeout(self):
        # Only run for the 'processes' flavour.
        if self.TYPE != 'processes':
            return

        sem = self.Semaphore(0)
        timed_acquire = TimingWrapper(sem.acquire)

        # (positional args, keyword args, expected duration): the semaphore
        # is at 0, so every acquire fails.
        cases = (
            ((False,), {}, 0.0),
            ((False, None), {}, 0.0),
            ((False, TIMEOUT1), {}, 0),
            ((True, TIMEOUT2), {}, TIMEOUT2),
            ((), {'timeout': TIMEOUT3}, TIMEOUT3),
        )
        for pos_args, kw_args, expected in cases:
            self.assertEqual(timed_acquire(*pos_args, **kw_args), False)
            self.assertTimingAlmostEqual(timed_acquire.elapsed, expected)
705
706
class _TestCondition(BaseTestCase):
    # Exercises Condition.notify(), notify_all() and wait(timeout) with a
    # mix of processes and threads as waiters.
    #
    # `sleeping` and `woken` are semaphores used as counters: each waiter
    # releases `sleeping` just before it waits and `woken` right after its
    # wait returns.

    @classmethod
    def f(cls, cond, sleeping, woken, timeout=None):
        # Waiter body: announce that we are about to sleep, wait on the
        # condition (optionally with a timeout), then announce wake-up.
        cond.acquire()
        sleeping.release()
        cond.wait(timeout)
        woken.release()
        cond.release()

    def check_invariant(self, cond):
        # this is only supposed to succeed when there are no sleepers
        if self.TYPE == 'processes':
            try:
                sleepers = (cond._sleeping_count.get_value() -
                            cond._woken_count.get_value())
                self.assertEqual(sleepers, 0)
                self.assertEqual(cond._wait_semaphore.get_value(), 0)
            except NotImplementedError:
                pass

    def test_notify(self):
        cond = self.Condition()
        sleeping = self.Semaphore(0)
        woken = self.Semaphore(0)

        # Keep separate references to both waiters so that both can be
        # joined at the end of the test.
        proc = self.Process(target=self.f, args=(cond, sleeping, woken))
        proc.daemon = True
        proc.start()

        thread = threading.Thread(target=self.f, args=(cond, sleeping, woken))
        thread.daemon = True
        thread.start()

        # wait for both children to start sleeping
        sleeping.acquire()
        sleeping.acquire()

        # check no process/thread has woken up
        time.sleep(DELTA)
        self.assertReturnsIfImplemented(0, get_value, woken)

        # wake up one process/thread
        cond.acquire()
        cond.notify()
        cond.release()

        # check one process/thread has woken up
        time.sleep(DELTA)
        self.assertReturnsIfImplemented(1, get_value, woken)

        # wake up another
        cond.acquire()
        cond.notify()
        cond.release()

        # check other has woken up
        time.sleep(DELTA)
        self.assertReturnsIfImplemented(2, get_value, woken)

        # check state is not mucked up
        self.check_invariant(cond)

        # Both waiters have been notified by now; reap both of them.  The
        # joins are done here rather than in a cleanup so that a failed
        # assertion above cannot block on a waiter that was never woken.
        thread.join()
        proc.join()

    def test_notify_all(self):
        cond = self.Condition()
        sleeping = self.Semaphore(0)
        woken = self.Semaphore(0)

        # every process/thread started below, joined at the end of the test
        workers = []

        # start some threads/processes which will timeout
        for i in range(3):
            p = self.Process(target=self.f,
                             args=(cond, sleeping, woken, TIMEOUT1))
            p.daemon = True
            p.start()
            workers.append(p)

            t = threading.Thread(target=self.f,
                                 args=(cond, sleeping, woken, TIMEOUT1))
            t.daemon = True
            t.start()
            workers.append(t)

        # wait for them all to sleep
        for i in xrange(6):
            sleeping.acquire()

        # check they have all timed out
        for i in xrange(6):
            woken.acquire()
        self.assertReturnsIfImplemented(0, get_value, woken)

        # check state is not mucked up
        self.check_invariant(cond)

        # start some more threads/processes
        for i in range(3):
            p = self.Process(target=self.f, args=(cond, sleeping, woken))
            p.daemon = True
            p.start()
            workers.append(p)

            t = threading.Thread(target=self.f, args=(cond, sleeping, woken))
            t.daemon = True
            t.start()
            workers.append(t)

        # wait for them to all sleep
        for i in xrange(6):
            sleeping.acquire()

        # check no process/thread has woken up
        time.sleep(DELTA)
        self.assertReturnsIfImplemented(0, get_value, woken)

        # wake them all up
        cond.acquire()
        cond.notify_all()
        cond.release()

        # check they have all woken
        time.sleep(DELTA)
        self.assertReturnsIfImplemented(6, get_value, woken)

        # check state is not mucked up
        self.check_invariant(cond)

        # The first batch timed out and the second was notified, so every
        # worker is finishing; reap them all.
        for worker in workers:
            worker.join()

    def test_timeout(self):
        cond = self.Condition()
        wait = TimingWrapper(cond.wait)
        cond.acquire()
        res = wait(TIMEOUT1)
        cond.release()
        # nobody notifies, so the wait is expected to last the full timeout
        self.assertEqual(res, None)
        self.assertTimingAlmostEqual(wait.elapsed, TIMEOUT1)
838
839
class _TestEvent(BaseTestCase):
    # Exercises Event.is_set(), set(), clear() and wait(), including a wait
    # that is satisfied by a child process.

    @classmethod
    def _test_event(cls, event):
        # Child body: set the event after a short delay.
        time.sleep(TIMEOUT2)
        event.set()

    def test_event(self):
        event = self.Event()
        wait = TimingWrapper(event.wait)

        # a fresh event starts cleared
        self.assertEqual(event.is_set(), False)

        # While the event is cleared, wait() is expected to return False:
        # immediately for a zero timeout, after the timeout otherwise.
        self.assertEqual(wait(0.0), False)
        self.assertTimingAlmostEqual(wait.elapsed, 0.0)
        self.assertEqual(wait(TIMEOUT1), False)
        self.assertTimingAlmostEqual(wait.elapsed, TIMEOUT1)

        event.set()

        # Once the event is set, wait() is expected to return True without
        # blocking, with or without a timeout.
        self.assertEqual(event.is_set(), True)
        self.assertEqual(wait(), True)
        self.assertTimingAlmostEqual(wait.elapsed, 0.0)
        self.assertEqual(wait(TIMEOUT1), True)
        self.assertTimingAlmostEqual(wait.elapsed, 0.0)
        # self.assertEqual(event.is_set(), True)

        event.clear()

        #self.assertEqual(event.is_set(), False)

        # A blocking wait() must return True once the child sets the event.
        p = self.Process(target=self._test_event, args=(event,))
        p.daemon = True
        p.start()
        self.assertEqual(wait(), True)

        # The child's only remaining work after set() is to exit; reap it so
        # it is not left running past the end of the test.
        p.join()
Benjamin Petersondfd79492008-06-13 19:13:39 +0000880
881#
882#
883#
884
class _TestValue(BaseTestCase):
    # Exercises shared Value/RawValue objects: initial values, updates made
    # by a child process, and the lock= argument of Value.

    ALLOWED_TYPES = ('processes',)

    # (typecode, initial value, value written by the child)
    codes_values = [
        ('i', 4343, 24234),
        ('d', 3.625, -4.25),
        ('h', -232, 234),
        ('c', latin('x'), latin('y'))
        ]

    def setUp(self):
        if HAS_SHAREDCTYPES:
            return
        self.skipTest("requires multiprocessing.sharedctypes")

    @classmethod
    def _test(cls, values):
        # Child body: overwrite each shared value with its final value.
        for shared, spec in zip(values, cls.codes_values):
            shared.value = spec[2]

    def test_value(self, raw=False):
        # `raw` selects RawValue instead of Value.
        make = self.RawValue if raw else self.Value
        values = [make(code, initial)
                  for code, initial, _ in self.codes_values]

        # each object starts out with its initial value
        for shared, spec in zip(values, self.codes_values):
            self.assertEqual(shared.value, spec[1])

        proc = self.Process(target=self._test, args=(values,))
        proc.daemon = True
        proc.start()
        proc.join()

        # the child's writes must be visible in the parent
        for shared, spec in zip(values, self.codes_values):
            self.assertEqual(shared.value, spec[2])

    def test_rawvalue(self):
        self.test_value(raw=True)

    def test_getobj_getlock(self):
        # default lock: get_lock()/get_obj() are called to check they work
        val1 = self.Value('i', 5)
        lock1 = val1.get_lock()
        obj1 = val1.get_obj()

        # lock=None: get_lock()/get_obj() must also work
        val2 = self.Value('i', 5, lock=None)
        lock2 = val2.get_lock()
        obj2 = val2.get_obj()

        # explicit lock: get_lock() must hand the same lock back
        lock = self.Lock()
        val3 = self.Value('i', 5, lock=lock)
        lock3 = val3.get_lock()
        obj3 = val3.get_obj()
        self.assertEqual(lock, lock3)

        # lock=False: the result has neither accessor
        arr4 = self.Value('i', 5, lock=False)
        for accessor in ('get_lock', 'get_obj'):
            self.assertFalse(hasattr(arr4, accessor))

        # an object that is not a lock is rejected
        self.assertRaises(AttributeError, self.Value, 'i', 5, lock='navalue')

        # RawValue has neither accessor
        arr5 = self.RawValue('i', 5)
        for accessor in ('get_lock', 'get_obj'):
            self.assertFalse(hasattr(arr5, accessor))
952
Benjamin Petersondfd79492008-06-13 19:13:39 +0000953
class _TestArray(BaseTestCase):
    # Checks Array / RawArray creation, slicing, modification from a child
    # process, zero-initialisation and the lock accessors.

    ALLOWED_TYPES = ('processes',)

    @classmethod
    def f(cls, seq):
        # Turn seq into its running sum, in place.
        for idx in xrange(1, len(seq)):
            seq[idx] = seq[idx] + seq[idx - 1]

    @unittest.skipIf(c_int is None, "requires _ctypes")
    def test_array(self, raw=False):
        seq = [680, 626, 934, 821, 150, 233, 548, 982, 714, 831]
        factory = self.RawArray if raw else self.Array
        arr = factory('i', seq)

        self.assertEqual(len(arr), len(seq))
        self.assertEqual(arr[3], seq[3])
        self.assertEqual(list(arr[2:7]), list(seq[2:7]))

        # Apply the same slice assignment to the shared array and the list.
        patch = array.array('i', [1, 2, 3, 4])
        arr[4:8] = patch
        seq[4:8] = patch

        self.assertEqual(list(arr[:]), seq)

        # Compute the expected result locally ...
        self.f(seq)

        # ... and let a child process do the same on the shared array.
        child = self.Process(target=self.f, args=(arr,))
        child.daemon = True
        child.start()
        child.join()

        self.assertEqual(list(arr[:]), seq)

    @unittest.skipIf(c_int is None, "requires _ctypes")
    def test_array_from_size(self):
        size = 10
        # Test for zeroing (see issue #11675).
        # Repeating the allocation raises the chance that memory which
        # previously held non-zero data is reused on the 2nd and 3rd pass.
        for _ in xrange(3):
            arr = self.Array('i', size)
            self.assertEqual(len(arr), size)
            self.assertEqual(list(arr), [0] * size)
            arr[:] = range(10)
            self.assertEqual(list(arr), range(10))
            del arr

    @unittest.skipIf(c_int is None, "requires _ctypes")
    def test_rawarray(self):
        self.test_array(raw=True)

    @unittest.skipIf(c_int is None, "requires _ctypes")
    def test_array_accepts_long(self):
        # A Python 2 long must be accepted as the array size.
        size = long(10)
        arr = self.Array('i', size)
        self.assertEqual(len(arr), 10)
        raw_arr = self.RawArray('i', size)
        self.assertEqual(len(raw_arr), 10)

    @unittest.skipIf(c_int is None, "requires _ctypes")
    def test_getobj_getlock_obj(self):
        # Default lock argument: both accessors must be callable.
        default_locked = self.Array('i', range(10))
        default_locked.get_lock()
        default_locked.get_obj()

        # lock=None: both accessors must be callable as well.
        none_locked = self.Array('i', range(10), lock=None)
        none_locked.get_lock()
        none_locked.get_obj()

        # An explicitly supplied lock is the one handed back by get_lock().
        lock = self.Lock()
        explicit = self.Array('i', range(10), lock=lock)
        returned_lock = explicit.get_lock()
        explicit.get_obj()
        self.assertEqual(lock, returned_lock)

        # lock=False gives an object without the accessors.
        unlocked = self.Array('i', range(10), lock=False)
        for accessor in ('get_lock', 'get_obj'):
            self.assertFalse(hasattr(unlocked, accessor))

        # A string is not accepted as a lock.
        self.assertRaises(AttributeError,
                          self.Array, 'i', range(10), lock='notalock')

        # RawArray objects never have the accessors.
        raw = self.RawArray('i', range(10))
        for accessor in ('get_lock', 'get_obj'):
            self.assertFalse(hasattr(raw, accessor))
Benjamin Petersondfd79492008-06-13 19:13:39 +00001039
1040#
1041#
1042#
1043
class _TestContainers(BaseTestCase):
    # Exercises the list, dict and Namespace objects served by a manager.

    ALLOWED_TYPES = ('manager',)

    def test_list(self):
        digits = range(10)
        a = self.list(digits)
        self.assertEqual(a[:], digits)

        b = self.list()
        self.assertEqual(b[:], [])

        b.extend(range(5))
        self.assertEqual(b[:], range(5))

        self.assertEqual(b[2], 2)
        # Slicing past the end is clipped, as for a normal list.
        self.assertEqual(b[2:10], [2, 3, 4])

        doubled = range(5) + range(5)
        b *= 2
        self.assertEqual(b[:], doubled)

        self.assertEqual(b + [5, 6], doubled + [5, 6])

        # None of the operations on b may have affected a.
        self.assertEqual(a[:], digits)

        # A shared list built from other shared lists.
        e = self.list([a, b])
        self.assertEqual(e[:], [digits, doubled])

        f = self.list([a])
        a.append('hello')
        self.assertEqual(f[:], [digits + ['hello']])

    def test_dict(self):
        d = self.dict()
        indices = range(65, 70)
        pairs = [(code, chr(code)) for code in indices]
        for code, char in pairs:
            d[code] = char
        self.assertEqual(d.copy(), dict(pairs))
        self.assertEqual(sorted(d.keys()), indices)
        self.assertEqual(sorted(d.values()), [char for _, char in pairs])
        self.assertEqual(sorted(d.items()), pairs)

    def test_namespace(self):
        n = self.Namespace()
        n.name = 'Bob'
        n.job = 'Builder'
        n._hidden = 'hidden'
        self.assertEqual((n.name, n.job), ('Bob', 'Builder'))
        del n.job
        # The deleted attribute and the underscore-prefixed one are both
        # expected to be missing from the string form.
        self.assertEqual(str(n), "Namespace(name='Bob')")
        self.assertTrue(hasattr(n, 'name'))
        self.assertFalse(hasattr(n, 'job'))
1099
1100#
1101#
1102#
1103
def sqr(x, wait=0.0):
    '''return x squared after sleeping for `wait` seconds'''
    time.sleep(wait)
    squared = x * x
    return squared
class _TestPool(BaseTestCase):
    # Exercises the apply/map/imap family of a pool through self.pool,
    # plus creation and termination of pools.

    def test_apply(self):
        papply = self.pool.apply
        self.assertEqual(papply(sqr, (5,)), sqr(5))
        self.assertEqual(papply(sqr, (), {'x': 3}), sqr(x=3))

    def test_map(self):
        pmap = self.pool.map
        self.assertEqual(pmap(sqr, range(10)), [sqr(i) for i in range(10)])
        self.assertEqual(pmap(sqr, range(100), chunksize=20),
                         [sqr(i) for i in range(100)])

    def test_map_chunksize(self):
        # map_async() on an empty list with an explicit chunksize must
        # complete instead of stalling until the timeout expires.
        try:
            self.pool.map_async(sqr, [], chunksize=1).get(timeout=TIMEOUT1)
        except multiprocessing.TimeoutError:
            self.fail("pool.map_async with chunksize stalled on null list")

    def test_async(self):
        res = self.pool.apply_async(sqr, (7, TIMEOUT1,))
        get = TimingWrapper(res.get)
        self.assertEqual(get(), 49)
        self.assertTimingAlmostEqual(get.elapsed, TIMEOUT1)

    def test_async_timeout(self):
        # The task sleeps longer than the timeout passed to get().
        res = self.pool.apply_async(sqr, (6, TIMEOUT2 + 0.2))
        get = TimingWrapper(res.get)
        self.assertRaises(multiprocessing.TimeoutError, get, timeout=TIMEOUT2)
        self.assertTimingAlmostEqual(get.elapsed, TIMEOUT2)

    def test_imap(self):
        it = self.pool.imap(sqr, range(10))
        self.assertEqual(list(it), [sqr(i) for i in range(10)])

        it = self.pool.imap(sqr, range(10))
        for i in range(10):
            self.assertEqual(next(it), i*i)
        self.assertRaises(StopIteration, next, it)

        it = self.pool.imap(sqr, range(1000), chunksize=100)
        for i in range(1000):
            self.assertEqual(next(it), i*i)
        self.assertRaises(StopIteration, next, it)

    def test_imap_unordered(self):
        expected = [sqr(i) for i in range(1000)]

        it = self.pool.imap_unordered(sqr, range(1000))
        self.assertEqual(sorted(it), expected)

        it = self.pool.imap_unordered(sqr, range(1000), chunksize=53)
        self.assertEqual(sorted(it), expected)

    def test_make_pool(self):
        self.assertRaises(ValueError, multiprocessing.Pool, -1)
        self.assertRaises(ValueError, multiprocessing.Pool, 0)

        p = multiprocessing.Pool(3)
        try:
            self.assertEqual(3, len(p._pool))
        finally:
            # Release the workers even if the assertion fails.
            p.close()
            p.join()

    def test_terminate(self):
        if self.TYPE == 'manager':
            # On Unix a forked process increfs each shared object to
            # which its parent process held a reference.  If the
            # forked process gets terminated then there is likely to
            # be a reference leak.  So to prevent
            # _TestZZZNumberOfObjects from failing we skip this test
            # when using a manager.
            self.skipTest("terminating workers would leak manager references")

        # Queue far more work than can finish quickly, then terminate.
        self.pool.map_async(
            time.sleep, [0.1 for i in range(10000)], chunksize=1
            )
        self.pool.terminate()
        join = TimingWrapper(self.pool.join)
        join()
        # join() must not wait for the queued tasks to be processed.
        self.assertLess(join.elapsed, 0.2)

    def test_empty_iterable(self):
        # See Issue 12157
        p = self.Pool(1)
        try:
            self.assertEqual(p.map(sqr, []), [])
            self.assertEqual(list(p.imap(sqr, [])), [])
            self.assertEqual(list(p.imap_unordered(sqr, [])), [])
            self.assertEqual(p.map_async(sqr, []).get(), [])
        finally:
            # Release the worker even if one of the assertions fails.
            p.close()
            p.join()
1197
def unpickleable_result():
    '''return a lambda, which cannot be pickled'''
    answer = lambda: 42
    return answer
1200
class _TestPoolWorkerErrors(BaseTestCase):
    # Checks how a pool reports results that cannot be sent back.

    ALLOWED_TYPES = ('processes', )

    def test_unpickleable_result(self):
        expected_error = multiprocessing.pool.MaybeEncodingError
        pool = multiprocessing.Pool(2)

        # Make sure we don't lose pool processes because of encoding errors:
        # 20 failing tasks are submitted to a pool of only 2 workers.
        for _ in xrange(20):
            pending = pool.apply_async(unpickleable_result)
            self.assertRaises(expected_error, pending.get)

        pool.close()
        pool.join()
1215
class _TestPoolWorkerLifetime(BaseTestCase):
    # Checks pools created with maxtasksperchild.

    ALLOWED_TYPES = ('processes', )

    def test_pool_worker_lifetime(self):
        pool = multiprocessing.Pool(3, maxtasksperchild=10)
        self.assertEqual(3, len(pool._pool))
        origworkerpids = [worker.pid for worker in pool._pool]

        # Run many tasks so each worker gets replaced (hopefully)
        results = [pool.apply_async(sqr, (i, )) for i in range(100)]

        # Fetch the results and verify we got the right answers,
        # also ensuring all the tasks have completed.
        for arg, pending in enumerate(results):
            self.assertEqual(pending.get(), sqr(arg))

        # Refill the pool
        pool._repopulate_pool()

        # Wait until all workers are alive
        # (50 polls * DELTA = 5 seconds max startup process time)
        for _ in xrange(50):
            if all(worker.is_alive() for worker in pool._pool):
                break
            time.sleep(DELTA)
        finalworkerpids = [worker.pid for worker in pool._pool]

        # All pids should be assigned.  See issue #7805.
        self.assertNotIn(None, origworkerpids)
        self.assertNotIn(None, finalworkerpids)
        # Finally, check that the worker pids have changed
        self.assertNotEqual(sorted(origworkerpids), sorted(finalworkerpids))
        pool.close()
        pool.join()

    def test_pool_worker_lifetime_early_close(self):
        # Issue #10332: closing a pool whose workers have limited lifetimes
        # before all the tasks completed would make join() hang.
        pool = multiprocessing.Pool(3, maxtasksperchild=1)
        results = [pool.apply_async(sqr, (i, 0.3)) for i in range(6)]
        pool.close()
        pool.join()
        # check the results
        for arg, pending in enumerate(results):
            self.assertEqual(pending.get(), sqr(arg))
1260
1261
Benjamin Petersondfd79492008-06-13 19:13:39 +00001262#
1263# Test that manager has expected number of shared objects left
1264#
1265
class _TestZZZNumberOfObjects(BaseTestCase):
    # Because test cases are sorted alphabetically, this one will get
    # run after all the other tests for the manager.  It tests that
    # there have been no "reference leaks" for the manager's shared
    # objects.  Note the comment in _TestPool.test_terminate().
    ALLOWED_TYPES = ('manager',)

    def test_number_of_objects(self):
        EXPECTED_NUMBER = 1                # the pool object is still alive
        multiprocessing.active_children()  # discard dead process objs
        gc.collect()                       # do garbage collection
        refs = self.manager._number_of_objects()
        # Taken right after the count so that it describes the same state.
        debug_info = self.manager._debug_info()
        if refs != EXPECTED_NUMBER:
            # Print the manager's debug information once, to help
            # diagnose the failure reported below.
            print(debug_info)

        self.assertEqual(refs, EXPECTED_NUMBER)
1284
1285#
1286# Test of creating a customized manager class
1287#
1288
1289from multiprocessing.managers import BaseManager, BaseProxy, RemoteError
1290
class FooBar(object):
    '''class with a normal, a failing and an underscore-prefixed method'''

    def f(self):
        return 'f()'

    def g(self):
        # Always fails.
        raise ValueError()

    def _h(self):
        return '_h()'
1298
def baz():
    '''generate the squares of 0, 1, ..., 9'''
    number = 0
    while number < 10:
        yield number * number
        number += 1
1302
class IteratorProxy(BaseProxy):
    '''proxy which forwards the iterator protocol to its referent'''

    _exposed_ = ('next', '__next__')

    def __iter__(self):
        return self

    def next(self):
        item = self._callmethod('next')
        return item

    def __next__(self):
        item = self._callmethod('__next__')
        return item
1311
class MyManager(BaseManager):
    '''manager class to which the types below are registered'''

# 'Foo' uses the default set of exposed methods; 'Bar' lists its exposed
# methods explicitly; 'baz' is served through IteratorProxy.
MyManager.register(
    'Foo',
    callable=FooBar,
    )
MyManager.register(
    'Bar',
    callable=FooBar,
    exposed=('f', '_h'),
    )
MyManager.register(
    'baz',
    callable=baz,
    proxytype=IteratorProxy,
    )
1318
1319
class _TestMyManager(BaseTestCase):
    # Checks which methods the proxies created by MyManager expose.

    ALLOWED_TYPES = ('manager',)

    def test_mymanager(self):
        manager = MyManager()
        manager.start()

        foo = manager.Foo()
        bar = manager.Bar()
        squares = manager.baz()

        def visible_methods(proxy):
            # Names out of f/g/_h that are attributes of the proxy.
            return [name for name in ('f', 'g', '_h') if hasattr(proxy, name)]

        self.assertEqual(visible_methods(foo), ['f', 'g'])
        self.assertEqual(visible_methods(bar), ['f', '_h'])

        # 'Foo' proxies: _h is not callable remotely either.
        self.assertEqual(foo.f(), 'f()')
        self.assertRaises(ValueError, foo.g)
        self.assertEqual(foo._callmethod('f'), 'f()')
        self.assertRaises(RemoteError, foo._callmethod, '_h')

        # 'Bar' proxies: f and _h work both directly and via _callmethod.
        self.assertEqual(bar.f(), 'f()')
        self.assertEqual(bar._h(), '_h()')
        self.assertEqual(bar._callmethod('f'), 'f()')
        self.assertEqual(bar._callmethod('_h'), '_h()')

        self.assertEqual(list(squares), [i*i for i in range(10)])

        manager.shutdown()
1351
1352#
1353# Test of connecting to a remote server and using xmlrpclib for serialization
1354#
1355
1356_queue = Queue.Queue()
1357def get_queue():
1358 return _queue
1359
1360class QueueManager(BaseManager):
1361 '''manager class used by server process'''
1362QueueManager.register('get_queue', callable=get_queue)
1363
1364class QueueManager2(BaseManager):
1365 '''manager class which specifies the same interface as QueueManager'''
1366QueueManager2.register('get_queue')
1367
1368
1369SERIALIZER = 'xmlrpclib'
1370
class _TestRemoteManager(BaseTestCase):
    # Connects two clients to one manager server, using SERIALIZER
    # instead of the default serializer.

    ALLOWED_TYPES = ('manager',)

    @classmethod
    def _putter(cls, address, authkey):
        # Process target: connect to the server and put one tuple on the
        # shared queue.
        manager = QueueManager2(
            address=address, authkey=authkey, serializer=SERIALIZER
            )
        manager.connect()
        queue = manager.get_queue()
        queue.put(('hello world', None, True, 2.25))

    def test_remote(self):
        authkey = os.urandom(32)

        manager = QueueManager(
            address=('localhost', 0), authkey=authkey, serializer=SERIALIZER
            )
        manager.start()
        # Make sure the server process is stopped even if an assertion
        # below fails; the explicit shutdown() at the end still runs first
        # on success.
        self.addCleanup(manager.shutdown)

        p = self.Process(target=self._putter, args=(manager.address, authkey))
        p.daemon = True
        p.start()

        manager2 = QueueManager2(
            address=manager.address, authkey=authkey, serializer=SERIALIZER
            )
        manager2.connect()
        queue = manager2.get_queue()

        # Note that xmlrpclib will deserialize object as a list not a tuple
        self.assertEqual(queue.get(), ['hello world', None, True, 2.25])

        # Because we are using xmlrpclib for serialization instead of
        # pickle this will cause a serialization error.
        self.assertRaises(Exception, queue.put, time.sleep)

        # The item has been received, so the child only has to exit; wait
        # for it while the server is still running.
        p.join()

        # Make queue finalizer run before the server is stopped
        del queue
        manager.shutdown()
1412
class _TestManagerRestart(BaseTestCase):
    # Checks that a manager can be restarted on an address right after a
    # previous manager using that address has been shut down.

    @classmethod
    def _putter(cls, address, authkey):
        # Process target: connect to the server and put one string on the
        # shared queue.
        manager = QueueManager(
            address=address, authkey=authkey, serializer=SERIALIZER)
        manager.connect()
        queue = manager.get_queue()
        queue.put('hello world')

    def test_rapid_restart(self):
        authkey = os.urandom(32)
        manager = QueueManager(
            address=('localhost', 0), authkey=authkey, serializer=SERIALIZER)
        srvr = manager.get_server()
        addr = srvr.address
        # Close the connection.Listener socket which gets opened as a part
        # of manager.get_server(). It's not needed for the test.
        srvr.listener.close()
        manager.start()
        # Stop the server even if an assertion below fails.
        self.addCleanup(manager.shutdown)

        p = self.Process(target=self._putter, args=(manager.address, authkey))
        p.daemon = True
        p.start()
        queue = manager.get_queue()
        self.assertEqual(queue.get(), 'hello world')
        del queue
        # Wait for the child before stopping the server, so that shutdown
        # cannot overlap with the child still using its connection.
        p.join()
        manager.shutdown()

        # Start a second manager on the address used by the first one.
        manager = QueueManager(
            address=addr, authkey=authkey, serializer=SERIALIZER)
        manager.start()
        manager.shutdown()
Jesse Noller459a6482009-03-30 15:50:42 +00001445
Benjamin Petersondfd79492008-06-13 19:13:39 +00001446#
1447#
1448#
1449
# Empty message which tells _TestConnection._echo() to stop.
SENTINEL = latin('')

class _TestConnection(BaseTestCase):
    # Exercises the connection objects returned by Pipe(): objects and
    # byte strings, polling, one-way pipes and handle transfer.

    ALLOWED_TYPES = ('processes', 'threads')

    @classmethod
    def _echo(cls, conn):
        # Send every received byte message back until SENTINEL arrives.
        for msg in iter(conn.recv_bytes, SENTINEL):
            conn.send_bytes(msg)
        conn.close()

    def test_connection(self):
        conn, child_conn = self.Pipe()

        p = self.Process(target=self._echo, args=(child_conn,))
        p.daemon = True
        p.start()

        seq = [1, 2.25, None]
        msg = latin('hello world')
        longmsg = msg * 10
        arr = array.array('i', range(4))

        if self.TYPE == 'processes':
            self.assertEqual(type(conn.fileno()), int)

        self.assertEqual(conn.send(seq), None)
        self.assertEqual(conn.recv(), seq)

        self.assertEqual(conn.send_bytes(msg), None)
        self.assertEqual(conn.recv_bytes(), msg)

        if self.TYPE == 'processes':
            # Receive into the start of a zeroed array.
            target = array.array('i', [0]*10)
            expected = list(arr) + [0] * (10 - len(arr))
            self.assertEqual(conn.send_bytes(arr), None)
            self.assertEqual(conn.recv_bytes_into(target),
                             len(arr) * target.itemsize)
            self.assertEqual(list(target), expected)

            # Receive at an offset of three items.
            target = array.array('i', [0]*10)
            expected = [0] * 3 + list(arr) + [0] * (10 - 3 - len(arr))
            self.assertEqual(conn.send_bytes(arr), None)
            self.assertEqual(conn.recv_bytes_into(target, 3 * target.itemsize),
                             len(arr) * target.itemsize)
            self.assertEqual(list(target), expected)

            # A buffer which is too small: the exception carries the
            # complete message.
            target = bytearray(latin(' ' * 40))
            self.assertEqual(conn.send_bytes(longmsg), None)
            try:
                res = conn.recv_bytes_into(target)
            except multiprocessing.BufferTooShort as e:
                self.assertEqual(e.args, (longmsg,))
            else:
                self.fail('expected BufferTooShort, got %s' % res)

        poll = TimingWrapper(conn.poll)

        # Nothing is pending: poll() returns at once, poll(timeout) waits.
        self.assertEqual(poll(), False)
        self.assertTimingAlmostEqual(poll.elapsed, 0)

        self.assertEqual(poll(TIMEOUT1), False)
        self.assertTimingAlmostEqual(poll.elapsed, TIMEOUT1)

        conn.send(None)
        # Give the echoed message time to arrive.
        time.sleep(.1)

        self.assertEqual(poll(TIMEOUT1), True)
        self.assertTimingAlmostEqual(poll.elapsed, 0)

        self.assertEqual(conn.recv(), None)

        really_big_msg = latin('X') * (1024 * 1024 * 16)   # 16Mb
        conn.send_bytes(really_big_msg)
        self.assertEqual(conn.recv_bytes(), really_big_msg)

        conn.send_bytes(SENTINEL)                          # tell child to quit
        child_conn.close()

        if self.TYPE == 'processes':
            self.assertEqual(conn.readable, True)
            self.assertEqual(conn.writable, True)
            self.assertRaises(EOFError, conn.recv)
            self.assertRaises(EOFError, conn.recv_bytes)

        p.join()

    def test_duplex_false(self):
        reader, writer = self.Pipe(duplex=False)
        self.assertEqual(writer.send(1), None)
        self.assertEqual(reader.recv(), 1)
        if self.TYPE == 'processes':
            self.assertEqual(reader.readable, True)
            self.assertEqual(reader.writable, False)
            self.assertEqual(writer.readable, False)
            self.assertEqual(writer.writable, True)
            # Using an end in the wrong direction fails.
            self.assertRaises(IOError, reader.send, 2)
            self.assertRaises(IOError, writer.recv)
            self.assertRaises(IOError, writer.poll)

    def test_spawn_close(self):
        # We test that a pipe connection can be closed by parent
        # process immediately after child is spawned.  On Windows this
        # would have sometimes failed on old versions because
        # child_conn would be closed before the child got a chance to
        # duplicate it.
        conn, child_conn = self.Pipe()

        p = self.Process(target=self._echo, args=(child_conn,))
        p.daemon = True
        p.start()
        child_conn.close()    # this might complete before child initializes

        msg = latin('hello')
        conn.send_bytes(msg)
        self.assertEqual(conn.recv_bytes(), msg)

        conn.send_bytes(SENTINEL)
        conn.close()
        p.join()

    def test_sendbytes(self):
        if self.TYPE != 'processes':
            self.skipTest("only makes sense with processes")

        msg = latin('abcdefghijklmnopqrstuvwxyz')
        a, b = self.Pipe()

        a.send_bytes(msg)
        self.assertEqual(b.recv_bytes(), msg)

        # Offset only.
        a.send_bytes(msg, 5)
        self.assertEqual(b.recv_bytes(), msg[5:])

        # Offset and size.
        a.send_bytes(msg, 7, 8)
        self.assertEqual(b.recv_bytes(), msg[7:7+8])

        # An offset equal to the length sends an empty message.
        a.send_bytes(msg, 26)
        self.assertEqual(b.recv_bytes(), latin(''))

        a.send_bytes(msg, 26, 0)
        self.assertEqual(b.recv_bytes(), latin(''))

        # Offsets/sizes outside of the message are rejected.
        self.assertRaises(ValueError, a.send_bytes, msg, 27)

        self.assertRaises(ValueError, a.send_bytes, msg, 22, 5)

        self.assertRaises(ValueError, a.send_bytes, msg, 26, 1)

        self.assertRaises(ValueError, a.send_bytes, msg, -1)

        self.assertRaises(ValueError, a.send_bytes, msg, 4, -1)

    @classmethod
    def _is_fd_assigned(cls, fd):
        # Return True if fd can be fstat()ed, False if fstat() fails with
        # EBADF; any other OSError is propagated.
        try:
            os.fstat(fd)
        except OSError as e:
            if e.errno == errno.EBADF:
                return False
            raise
        else:
            return True

    @classmethod
    def _writefd(cls, conn, data, create_dummy_fds=False):
        # Process target: receive a handle over conn and write data to it.
        if create_dummy_fds:
            # Occupy every unassigned descriptor below 256.
            for i in range(0, 256):
                if not cls._is_fd_assigned(i):
                    os.dup2(conn.fileno(), i)
        fd = reduction.recv_handle(conn)
        if msvcrt:
            fd = msvcrt.open_osfhandle(fd, os.O_WRONLY)
        os.write(fd, data)
        os.close(fd)

    @unittest.skipUnless(HAS_REDUCTION, "test needs multiprocessing.reduction")
    def test_fd_transfer(self):
        if self.TYPE != 'processes':
            self.skipTest("only makes sense with processes")
        conn, child_conn = self.Pipe(duplex=True)

        p = self.Process(target=self._writefd, args=(child_conn, b"foo"))
        p.daemon = True
        p.start()
        # Do not leave the scratch file behind.
        self.addCleanup(test_support.unlink, test_support.TESTFN)
        with open(test_support.TESTFN, "wb") as f:
            fd = f.fileno()
            if msvcrt:
                fd = msvcrt.get_osfhandle(fd)
            reduction.send_handle(conn, fd, p.pid)
        p.join()
        with open(test_support.TESTFN, "rb") as f:
            self.assertEqual(f.read(), b"foo")

    @unittest.skipUnless(HAS_REDUCTION, "test needs multiprocessing.reduction")
    @unittest.skipIf(sys.platform == "win32",
                     "test semantics don't make sense on Windows")
    @unittest.skipIf(MAXFD <= 256,
                     "largest assignable fd number is too small")
    @unittest.skipUnless(hasattr(os, "dup2"),
                         "test needs os.dup2()")
    def test_large_fd_transfer(self):
        # With fd > 256 (issue #11657)
        if self.TYPE != 'processes':
            self.skipTest("only makes sense with processes")
        conn, child_conn = self.Pipe(duplex=True)

        p = self.Process(target=self._writefd, args=(child_conn, b"bar", True))
        p.daemon = True
        p.start()
        # Do not leave the scratch file behind.
        self.addCleanup(test_support.unlink, test_support.TESTFN)
        with open(test_support.TESTFN, "wb") as f:
            fd = f.fileno()
            # Look for an unassigned descriptor number >= 256.
            for newfd in range(256, MAXFD):
                if not self._is_fd_assigned(newfd):
                    break
            else:
                self.fail("could not find an unassigned large file descriptor")
            os.dup2(fd, newfd)
            try:
                reduction.send_handle(conn, newfd, p.pid)
            finally:
                os.close(newfd)
        p.join()
        with open(test_support.TESTFN, "rb") as f:
            self.assertEqual(f.read(), b"bar")

    @classmethod
    def _send_data_without_fd(cls, conn):
        # Process target: write one raw byte to the connection's descriptor.
        os.write(conn.fileno(), b"\0")

    @unittest.skipUnless(HAS_REDUCTION, "test needs multiprocessing.reduction")
    @unittest.skipIf(sys.platform == "win32", "doesn't make sense on Windows")
    def test_missing_fd_transfer(self):
        # Check that exception is raised when received data is not
        # accompanied by a file descriptor in ancillary data.
        if self.TYPE != 'processes':
            self.skipTest("only makes sense with processes")
        conn, child_conn = self.Pipe(duplex=True)

        p = self.Process(target=self._send_data_without_fd, args=(child_conn,))
        p.daemon = True
        p.start()
        self.assertRaises(RuntimeError, reduction.recv_handle, conn)
        p.join()
Antoine Pitroua1a8da82011-08-23 19:54:20 +02001695
class _TestListenerClient(BaseTestCase):
    # Checks that a client can connect to a Listener and send an object.

    ALLOWED_TYPES = ('processes', 'threads')

    @classmethod
    def _test(cls, address):
        # Process target: connect, send one string, disconnect.
        client = cls.connection.Client(address)
        client.send('hello')
        client.close()

    def _start_client(self, listener):
        # Start _test() against the listener's address and return the
        # (daemonic) process object.
        client = self.Process(target=self._test, args=(listener.address,))
        client.daemon = True
        client.start()
        return client

    def test_listener_client(self):
        for family in self.connection.families:
            listener = self.connection.Listener(family=family)
            client = self._start_client(listener)
            conn = listener.accept()
            self.assertEqual(conn.recv(), 'hello')
            client.join()
            listener.close()

    def test_issue14725(self):
        listener = self.connection.Listener()
        client = self._start_client(listener)
        time.sleep(1)
        # On Windows the client process should by now have connected,
        # written data and closed the pipe handle.  This causes
        # ConnectNamedPipe() to fail with ERROR_NO_DATA.  See Issue
        # 14725.
        conn = listener.accept()
        self.assertEqual(conn.recv(), 'hello')
        conn.close()
        client.join()
        listener.close()
1732
Benjamin Petersondfd79492008-06-13 19:13:39 +00001733#
# Test of sending connection and socket objects between processes
# (currently disabled: the test class below is wrapped in a string literal)
1735#
Benjamin Petersonda3a1b12008-06-16 20:52:48 +00001736"""
Benjamin Petersondfd79492008-06-13 19:13:39 +00001737class _TestPicklingConnections(BaseTestCase):
1738
1739 ALLOWED_TYPES = ('processes',)
1740
1741 def _listener(self, conn, families):
1742 for fam in families:
1743 l = self.connection.Listener(family=fam)
1744 conn.send(l.address)
1745 new_conn = l.accept()
1746 conn.send(new_conn)
1747
1748 if self.TYPE == 'processes':
1749 l = socket.socket()
1750 l.bind(('localhost', 0))
1751 conn.send(l.getsockname())
1752 l.listen(1)
1753 new_conn, addr = l.accept()
1754 conn.send(new_conn)
1755
1756 conn.recv()
1757
1758 def _remote(self, conn):
1759 for (address, msg) in iter(conn.recv, None):
1760 client = self.connection.Client(address)
1761 client.send(msg.upper())
1762 client.close()
1763
1764 if self.TYPE == 'processes':
1765 address, msg = conn.recv()
1766 client = socket.socket()
1767 client.connect(address)
1768 client.sendall(msg.upper())
1769 client.close()
1770
1771 conn.close()
1772
1773 def test_pickling(self):
1774 try:
1775 multiprocessing.allow_connection_pickling()
1776 except ImportError:
1777 return
1778
1779 families = self.connection.families
1780
1781 lconn, lconn0 = self.Pipe()
1782 lp = self.Process(target=self._listener, args=(lconn0, families))
Jesus Cea6f6016b2011-09-09 20:26:57 +02001783 lp.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +00001784 lp.start()
1785 lconn0.close()
1786
1787 rconn, rconn0 = self.Pipe()
1788 rp = self.Process(target=self._remote, args=(rconn0,))
Jesus Cea6f6016b2011-09-09 20:26:57 +02001789 rp.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +00001790 rp.start()
1791 rconn0.close()
1792
1793 for fam in families:
1794 msg = ('This connection uses family %s' % fam).encode('ascii')
1795 address = lconn.recv()
1796 rconn.send((address, msg))
1797 new_conn = lconn.recv()
1798 self.assertEqual(new_conn.recv(), msg.upper())
1799
1800 rconn.send(None)
1801
1802 if self.TYPE == 'processes':
1803 msg = latin('This connection uses a normal socket')
1804 address = lconn.recv()
1805 rconn.send((address, msg))
1806 if hasattr(socket, 'fromfd'):
1807 new_conn = lconn.recv()
1808 self.assertEqual(new_conn.recv(100), msg.upper())
1809 else:
1810 # XXX On Windows with Py2.6 need to backport fromfd()
1811 discard = lconn.recv_bytes()
1812
1813 lconn.send(None)
1814
1815 rconn.close()
1816 lconn.close()
1817
1818 lp.join()
1819 rp.join()
Benjamin Petersonda3a1b12008-06-16 20:52:48 +00001820"""
Benjamin Petersondfd79492008-06-13 19:13:39 +00001821#
1822#
1823#
1824
class _TestHeap(BaseTestCase):
    # Checks the bookkeeping of the heap behind
    # multiprocessing.heap.BufferWrapper.

    ALLOWED_TYPES = ('processes',)

    def test_heap(self):
        iterations = 5000
        maxblocks = 50
        blocks = []

        # create and destroy lots of blocks of different sizes
        for _ in xrange(iterations):
            size = int(random.lognormvariate(0, 1) * 1000)
            blocks.append(multiprocessing.heap.BufferWrapper(size))
            if len(blocks) > maxblocks:
                # drop a randomly chosen block
                del blocks[random.randrange(maxblocks)]

        # get the heap object
        heap = multiprocessing.heap.BufferWrapper._heap

        # verify the state of the heap; the lock is held while the heap's
        # internal tables are read and released when the test finishes
        layout = []
        heap._lock.acquire()
        self.addCleanup(heap._lock.release)
        for free_blocks in heap._len_to_seq.values():
            for arena, start, stop in free_blocks:
                layout.append((heap._arenas.index(arena), start, stop,
                               stop-start, 'free'))
        for arena, start, stop in heap._allocated_blocks:
            layout.append((heap._arenas.index(arena), start, stop,
                           stop-start, 'occupied'))

        layout.sort()

        # Each block must either end where the next one begins, or the
        # next one must be the first block of a different arena.
        for current, following in zip(layout, layout[1:]):
            (arena, start, stop) = current[:3]
            (narena, nstart, nstop) = following[:3]
            self.assertTrue((arena != narena and nstart == 0) or
                            (stop == nstart))

    def test_free_from_gc(self):
        # Check that freeing of blocks by the garbage collector doesn't deadlock
        # (issue #12352).
        # Make sure the GC is enabled, and set lower collection thresholds to
        # make collections more frequent (and increase the probability of
        # deadlock).
        if not gc.isenabled():
            gc.enable()
            self.addCleanup(gc.disable)
        thresholds = gc.get_threshold()
        self.addCleanup(gc.set_threshold, *thresholds)
        gc.set_threshold(10)

        # perform numerous block allocations, with cyclic references to make
        # sure objects are collected asynchronously by the gc
        for _ in xrange(5000):
            a = multiprocessing.heap.BufferWrapper(1)
            b = multiprocessing.heap.BufferWrapper(1)
            # circular references
            a.buddy = b
            b.buddy = a
1889
Benjamin Petersondfd79492008-06-13 19:13:39 +00001890#
1891#
1892#
1893
Benjamin Petersondfd79492008-06-13 19:13:39 +00001894class _Foo(Structure):
1895 _fields_ = [
1896 ('x', c_int),
1897 ('y', c_double)
1898 ]
1899
class _TestSharedCTypes(BaseTestCase):
    """Round-trip shared ctypes objects through a child process."""

    ALLOWED_TYPES = ('processes',)

    def setUp(self):
        # Nothing here can run without multiprocessing.sharedctypes.
        if not HAS_SHAREDCTYPES:
            self.skipTest("requires multiprocessing.sharedctypes")

    @classmethod
    def _double(cls, x, y, foo, arr, string):
        """Child-process body: double every shared object in place."""
        x.value *= 2
        y.value *= 2
        foo.x *= 2
        foo.y *= 2
        string.value *= 2
        for index in range(len(arr)):
            arr[index] = arr[index] * 2

    def test_sharedctypes(self, lock=False):
        """Values doubled in the child must be visible in the parent."""
        x = Value('i', 7, lock=lock)
        y = Value(c_double, 1.0/3.0, lock=lock)
        foo = Value(_Foo, 3, 2, lock=lock)
        arr = self.Array('d', range(10), lock=lock)
        string = self.Array('c', 20, lock=lock)
        string.value = latin('hello')

        child = self.Process(target=self._double,
                             args=(x, y, foo, arr, string))
        child.daemon = True
        child.start()
        child.join()

        self.assertEqual(x.value, 14)
        self.assertAlmostEqual(y.value, 2.0/3.0)
        self.assertEqual(foo.x, 6)
        self.assertAlmostEqual(foo.y, 4.0)
        for index in range(10):
            self.assertAlmostEqual(arr[index], index*2)
        self.assertEqual(string.value, latin('hellohello'))

    def test_synchronize(self):
        """Same scenario, with lock-protected wrappers."""
        self.test_sharedctypes(lock=True)

    def test_copy(self):
        """``copy`` must produce a structure independent of its source."""
        original = _Foo(2, 5.0)
        duplicate = copy(original)
        original.x = 0
        original.y = 0
        self.assertEqual(duplicate.x, 2)
        self.assertAlmostEqual(duplicate.y, 5.0)
1949
1950#
1951#
1952#
1953
class _TestFinalize(BaseTestCase):
    """Check when, and in which order, ``util.Finalize`` callbacks run."""

    ALLOWED_TYPES = ('processes',)

    @classmethod
    def _test_finalize(cls, conn):
        """Child-process body: register finalizers that report over *conn*.

        Each callback sends a short tag; the parent compares the sequence
        of tags it receives against the expected order.
        """
        class Foo(object):
            pass

        a = Foo()
        util.Finalize(a, conn.send, args=('a',))
        del a           # dropping the last reference fires the callback

        b = Foo()
        close_b = util.Finalize(b, conn.send, args=('b',))
        close_b()       # explicit call fires the callback
        close_b()       # already fired: no effect
        del b           # already fired: no effect

        c = Foo()
        util.Finalize(c, conn.send, args=('c',))

        d10 = Foo()
        util.Finalize(d10, conn.send, args=('d10',), exitpriority=1)

        # Three finalizers sharing exit priority 0, registered in order.
        # The objects are kept alive in the list until the process exits.
        zero_priority = []
        for tag in ('d01', 'd02', 'd03'):
            obj = Foo()
            zero_priority.append(obj)
            util.Finalize(obj, conn.send, args=(tag,), exitpriority=0)

        util.Finalize(None, conn.send, args=('e',), exitpriority=-10)

        util.Finalize(None, conn.send, args=('STOP',), exitpriority=-100)

        # Run multiprocessing's cleanup function, then leave the process
        # without garbage collecting the locals above.
        util._exit_function()
        conn.close()
        os._exit(0)

    def test_finalize(self):
        """The parent must receive the tags in the expected order."""
        conn, child_conn = self.Pipe()

        child = self.Process(target=self._test_finalize, args=(child_conn,))
        child.daemon = True
        child.start()
        child.join()

        # Read tags until the 'STOP' sentinel arrives.
        result = list(iter(conn.recv, 'STOP'))
        self.assertEqual(result, ['a', 'b', 'd10', 'd03', 'd02', 'd01', 'e'])
2006
2007#
2008# Test that from ... import * works for each module
2009#
2010
class _TestImportStar(BaseTestCase):
    """Every name a module lists in ``__all__`` must really exist on it."""

    ALLOWED_TYPES = ('processes',)

    def test_import(self):
        modules = [
            'multiprocessing', 'multiprocessing.connection',
            'multiprocessing.heap', 'multiprocessing.managers',
            'multiprocessing.pool', 'multiprocessing.process',
            'multiprocessing.synchronize', 'multiprocessing.util'
            ]

        # Optional modules are only checked when they are usable here.
        if HAS_REDUCTION:
            modules.append('multiprocessing.reduction')

        if c_int is not None:
            # This module requires _ctypes
            modules.append('multiprocessing.sharedctypes')

        for module_name in modules:
            __import__(module_name)
            module = sys.modules[module_name]

            exported = getattr(module, '__all__', ())
            for attr in exported:
                self.assertTrue(
                    hasattr(module, attr),
                    '%r does not have attribute %r' % (module, attr)
                    )
2039
2040#
2041# Quick test that logging works -- does not test logging output
2042#
2043
class _TestLogging(BaseTestCase):
    """Quick checks that multiprocessing's logger can be configured.

    The logging output itself is not inspected.
    """

    ALLOWED_TYPES = ('processes',)

    def test_enable_logging(self):
        """The package logger exists and accepts messages at any level."""
        logger = multiprocessing.get_logger()
        self.assertTrue(logger is not None)
        logger.setLevel(util.SUBWARNING)
        try:
            logger.debug('this will not be printed')
            logger.info('nor will this')
        finally:
            # Always restore the level used by the rest of the suite.
            logger.setLevel(LOG_LEVEL)

    @classmethod
    def _test_level(cls, conn):
        """Child-process body: report the logger's effective level."""
        logger = multiprocessing.get_logger()
        conn.send(logger.getEffectiveLevel())

    def _check_child_level(self, expected, reader, writer):
        """Start a child, compare the level it reports, then reap it."""
        child = self.Process(target=self._test_level, args=(writer,))
        child.daemon = True
        child.start()
        try:
            self.assertEqual(expected, reader.recv())
        finally:
            # Join even on failure so the child is not left unreaped.
            child.join()

    def test_level(self):
        """A child process sees the logger level configured in the parent.

        The level is first set directly on the multiprocessing logger and
        then inherited from the root logger.
        """
        LEVEL1 = 32
        LEVEL2 = 37

        logger = multiprocessing.get_logger()
        root_logger = logging.getLogger()
        root_level = root_logger.level

        reader, writer = multiprocessing.Pipe(duplex=False)

        try:
            logger.setLevel(LEVEL1)
            self._check_child_level(LEVEL1, reader, writer)

            logger.setLevel(logging.NOTSET)
            root_logger.setLevel(LEVEL2)
            self._check_child_level(LEVEL2, reader, writer)
        finally:
            # Restore global logging state even when an assertion fails, so
            # later tests are not affected by the levels set above.
            root_logger.setLevel(root_level)
            logger.setLevel(level=LOG_LEVEL)
            reader.close()
            writer.close()
2086
Jesse Noller814d02d2009-11-21 14:38:23 +00002087
Jesse Noller9a03f2f2009-11-24 14:17:29 +00002088# class _TestLoggingProcessName(BaseTestCase):
2089#
2090# def handle(self, record):
2091# assert record.processName == multiprocessing.current_process().name
2092# self.__handled = True
2093#
2094# def test_logging(self):
2095# handler = logging.Handler()
2096# handler.handle = self.handle
2097# self.__handled = False
2098# # Bypass getLogger() and side-effects
2099# logger = logging.getLoggerClass()(
2100# 'multiprocessing.test.TestLoggingProcessName')
2101# logger.addHandler(handler)
2102# logger.propagate = False
2103#
2104# logger.warn('foo')
2105# assert self.__handled
Jesse Noller814d02d2009-11-21 14:38:23 +00002106
Benjamin Petersondfd79492008-06-13 19:13:39 +00002107#
Jesse Noller9a5b2ad2009-01-19 15:12:22 +00002108# Test to verify handle verification, see issue 3321
2109#
2110
class TestInvalidHandle(unittest.TestCase):
    """Connection objects must reject invalid handles (issue 3321)."""

    @unittest.skipIf(WIN32, "skipped on Windows")
    def test_invalid_handles(self):
        # An arbitrary handle number: polling it must fail.
        conn = _multiprocessing.Connection(44977608)
        with self.assertRaises(IOError):
            conn.poll()
        # A negative handle is rejected at construction time.
        with self.assertRaises(IOError):
            _multiprocessing.Connection(-1)
Florent Xicluna36b9fbb2010-03-24 19:33:25 +00002118
Jesse Noller9a5b2ad2009-01-19 15:12:22 +00002119#
Benjamin Petersondfd79492008-06-13 19:13:39 +00002120# Functions used to create test cases from the base ones in this module
2121#
2122
def get_attributes(Source, names):
    """Return a dict mapping each of *names* to that attribute of *Source*.

    Plain functions are wrapped in ``staticmethod`` so that they can be
    placed in a class namespace without turning into bound methods.
    """
    function_type = type(get_attributes)
    collected = {}
    for name in names:
        value = getattr(Source, name)
        if type(value) == function_type:
            value = staticmethod(value)
        collected[name] = value
    return collected
2131
def create_test_cases(Mixin, type):
    """Build concrete TestCase classes for one backend.

    Every module-level name starting with ``_Test`` whose ``ALLOWED_TYPES``
    contains *type* is combined with ``unittest.TestCase`` and *Mixin*.
    The result maps the generated class name, ``'With' + Type + name[1:]``,
    to the class.
    """
    prefix = 'With' + type.capitalize()
    namespace = globals()
    generated = {}

    for name in namespace.keys():
        if not name.startswith('_Test'):
            continue
        base = namespace[name]
        if type not in base.ALLOWED_TYPES:
            continue

        class Temp(base, unittest.TestCase, Mixin):
            pass

        # Give the class its public name and report it as living in the
        # mixin's module.
        Temp.__name__ = prefix + name[1:]
        Temp.__module__ = Mixin.__module__
        generated[Temp.__name__] = Temp
    return generated
2148
2149#
2150# Create test cases
2151#
2152
class ProcessesMixin(object):
    # Mixin for the 'processes' variant of each _Test* class.
    TYPE = 'processes'
    Process = multiprocessing.Process
    # Copy the listed attributes of the multiprocessing package into the
    # class namespace; get_attributes() wraps plain functions in
    # staticmethod first.
    locals().update(get_attributes(multiprocessing, (
        'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
        'Condition', 'Event', 'Value', 'Array', 'RawValue',
        'RawArray', 'current_process', 'active_children', 'Pipe',
        'connection', 'JoinableQueue', 'Pool'
        )))

# Generate the With Processes... test classes and publish them as module
# globals.
testcases_processes = create_test_cases(ProcessesMixin, type='processes')
globals().update(testcases_processes)
2165
2166
class ManagerMixin(object):
    # Mixin for the 'manager' variant of each _Test* class.
    TYPE = 'manager'
    Process = multiprocessing.Process
    # Allocate the SyncManager without running __init__; test_main() calls
    # __init__() and start() on it before the suite runs.
    manager = object.__new__(multiprocessing.managers.SyncManager)
    # Copy the listed attributes of that manager instance into the class
    # namespace.
    locals().update(get_attributes(manager, (
        'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
        'Condition', 'Event', 'Value', 'Array', 'list', 'dict',
        'Namespace', 'JoinableQueue', 'Pool'
        )))

# Generate the With Manager... test classes and publish them as module
# globals.
testcases_manager = create_test_cases(ManagerMixin, type='manager')
globals().update(testcases_manager)
2179
2180
class ThreadsMixin(object):
    # Mixin for the 'threads' variant of each _Test* class.
    TYPE = 'threads'
    Process = multiprocessing.dummy.Process
    # Copy the listed attributes of multiprocessing.dummy into the class
    # namespace; get_attributes() wraps plain functions in staticmethod
    # first.
    locals().update(get_attributes(multiprocessing.dummy, (
        'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
        'Condition', 'Event', 'Value', 'Array', 'current_process',
        'active_children', 'Pipe', 'connection', 'dict', 'list',
        'Namespace', 'JoinableQueue', 'Pool'
        )))

# Generate the With Threads... test classes and publish them as module
# globals.
testcases_threads = create_test_cases(ThreadsMixin, type='threads')
globals().update(testcases_threads)
2193
class OtherTest(unittest.TestCase):
    """Authentication handshake failures in multiprocessing.connection."""

    # TODO: add more tests for deliver/answer challenge.
    def test_deliver_challenge_auth_failure(self):
        """A bogus reply to the challenge raises AuthenticationError."""
        class _FakeConnection(object):
            def recv_bytes(self, size):
                return b'something bogus'

            def send_bytes(self, data):
                pass

        with self.assertRaises(multiprocessing.AuthenticationError):
            multiprocessing.connection.deliver_challenge(
                _FakeConnection(), b'abc')

    def test_answer_challenge_auth_failure(self):
        """A bogus verdict after the challenge raises AuthenticationError."""
        class _FakeConnection(object):
            def __init__(self):
                # Scripted replies, served in order; afterwards b''.
                self.replies = [multiprocessing.connection.CHALLENGE,
                                b'something bogus']

            def recv_bytes(self, size):
                if self.replies:
                    return self.replies.pop(0)
                return b''

            def send_bytes(self, data):
                pass

        with self.assertRaises(multiprocessing.AuthenticationError):
            multiprocessing.connection.answer_challenge(
                _FakeConnection(), b'abc')
2222
Jesse Noller7152f6d2009-04-02 05:17:26 +00002223#
2224# Test Manager.start()/Pool.__init__() initializer feature - see issue 5585
2225#
2226
def initializer(ns):
    """Add one to the ``test`` attribute of *ns* (used as an initializer)."""
    ns.test = ns.test + 1
2229
class TestInitializers(unittest.TestCase):
    """Initializer support in Manager.start() and Pool() (issue 5585).

    Each test passes ``initializer`` together with a shared namespace whose
    ``test`` attribute starts at 0, then checks that it became 1.
    """

    def setUp(self):
        self.mgr = multiprocessing.Manager()
        self.ns = self.mgr.Namespace()
        self.ns.test = 0

    def tearDown(self):
        self.mgr.shutdown()

    def test_manager_initializer(self):
        m = multiprocessing.managers.SyncManager()
        # A non-callable initializer is rejected.
        self.assertRaises(TypeError, m.start, 1)
        m.start(initializer, (self.ns,))
        try:
            self.assertEqual(self.ns.test, 1)
        finally:
            # Shut the second manager down even if the assertion fails, so
            # its server process is not leaked.
            m.shutdown()

    def test_pool_initializer(self):
        # A non-callable initializer is rejected.
        self.assertRaises(TypeError, multiprocessing.Pool, initializer=1)
        p = multiprocessing.Pool(1, initializer, (self.ns,))
        p.close()
        p.join()
        self.assertEqual(self.ns.test, 1)
2252
Jesse Noller1b90efb2009-06-30 17:11:52 +00002253#
2254# Issue 5155, 5313, 5331: Test process in processes
2255# Verifies os.close(sys.stdin.fileno) vs. sys.stdin.close() behavior
2256#
2257
2258def _ThisSubProcess(q):
2259 try:
2260 item = q.get(block=False)
2261 except Queue.Empty:
2262 pass
2263
def _TestProcess(q):
    """Start a daemonic grandchild running ``_ThisSubProcess`` and wait.

    The *q* argument is not used; the grandchild receives a fresh queue.
    """
    inner_queue = multiprocessing.Queue()
    grandchild = multiprocessing.Process(target=_ThisSubProcess,
                                         args=(inner_queue,))
    grandchild.daemon = True
    grandchild.start()
    grandchild.join()
2270
2271def _afunc(x):
2272 return x*x
2273
def pool_in_process():
    """Create a four-worker pool and map ``_afunc`` over a short list."""
    workers = multiprocessing.Pool(processes=4)
    # The mapped result itself is not needed.
    workers.map(_afunc, [1, 2, 3, 4, 5, 6, 7])
2277
2278class _file_like(object):
2279 def __init__(self, delegate):
2280 self._delegate = delegate
2281 self._pid = None
2282
2283 @property
2284 def cache(self):
2285 pid = os.getpid()
2286 # There are no race conditions since fork keeps only the running thread
2287 if pid != self._pid:
2288 self._pid = pid
2289 self._cache = []
2290 return self._cache
2291
2292 def write(self, data):
2293 self.cache.append(data)
2294
2295 def flush(self):
2296 self._delegate.write(''.join(self.cache))
2297 self._cache = []
2298
class TestStdinBadfiledescriptor(unittest.TestCase):
    """Processes started from inside processes (issues 5155, 5313, 5331)."""

    def test_queue_in_process(self):
        """A child can itself create a queue and a grandchild process."""
        queue = multiprocessing.Queue()
        proc = multiprocessing.Process(target=_TestProcess, args=(queue,))
        proc.start()
        proc.join()

    def test_pool_in_process(self):
        """A child can create and use its own pool."""
        p = multiprocessing.Process(target=pool_in_process)
        p.start()
        p.join()

    def test_flushing(self):
        """Data buffered by ``_file_like`` reaches the delegate on flush."""
        sio = StringIO()
        flike = _file_like(sio)
        flike.write('foo')
        # NOTE(review): this process is created but never started; kept
        # as in the original in case constructing it is part of the
        # scenario -- confirm.
        proc = multiprocessing.Process(target=lambda: flike.flush())
        flike.flush()
        # assertEqual rather than a bare assert, so the check still runs
        # under ``python -O`` and reports both values on failure.
        self.assertEqual(sio.getvalue(), 'foo')
2319
Richard Oudkerke4b99382012-07-27 14:05:46 +01002320#
2321# Test interaction with socket timeouts - see Issue #6056
2322#
2323
class TestTimeouts(unittest.TestCase):
    """Connections must work with a default socket timeout (issue #6056)."""

    @classmethod
    def _test_timeout(cls, child, address):
        """Child-process body: wait, then send on the pipe and on a client.

        The one-second sleep is longer than the 0.1s default socket
        timeout set by the parent.
        """
        time.sleep(1)
        child.send(123)
        child.close()
        client = multiprocessing.connection.Client(address)
        client.send(456)
        client.close()

    def test_timeout(self):
        saved_timeout = socket.getdefaulttimeout()
        try:
            socket.setdefaulttimeout(0.1)
            parent, child = multiprocessing.Pipe(duplex=True)
            listener = multiprocessing.connection.Listener(family='AF_INET')
            worker = multiprocessing.Process(
                target=self._test_timeout,
                args=(child, listener.address))
            worker.start()
            child.close()
            self.assertEqual(parent.recv(), 123)
            parent.close()
            accepted = listener.accept()
            self.assertEqual(accepted.recv(), 456)
            accepted.close()
            listener.close()
            worker.join(10)
        finally:
            # Put the process-wide default back for the other tests.
            socket.setdefaulttimeout(saved_timeout)
2353
Richard Oudkerkfaee75c2012-08-14 11:41:19 +01002354#
2355# Test what happens with no "if __name__ == '__main__'"
2356#
2357
class TestNoForkBomb(unittest.TestCase):
    """Run a script that lacks the ``if __name__ == '__main__'`` guard."""

    def test_noforkbomb(self):
        script = os.path.join(os.path.dirname(__file__), 'mp_fork_bomb.py')
        if not WIN32:
            # Off Windows the script is expected to succeed, print '123'
            # and leave stderr empty.
            rc, out, err = test.script_helper.assert_python_ok(script)
            self.assertEqual('123', out.decode('ascii').rstrip())
            self.assertEqual('', err.decode('ascii'))
            return

        # On Windows the script is expected to fail with a RuntimeError
        # and print nothing on stdout.
        rc, out, err = test.script_helper.assert_python_failure(script)
        self.assertEqual('', out.decode('ascii'))
        self.assertIn('RuntimeError', err.decode('ascii'))
2368 self.assertEqual('', err.decode('ascii'))
2369
2370#
2371#
2372#
2373
# Stand-alone TestCase classes that are not expanded per backend by
# create_test_cases(); test_main() appends them to the generated cases.
testcases_other = [OtherTest, TestInvalidHandle, TestInitializers,
                   TestStdinBadfiledescriptor, TestTimeouts, TestNoForkBomb]
Neal Norwitz0c519b32008-08-25 01:50:24 +00002376
Benjamin Petersondfd79492008-06-13 19:13:39 +00002377#
2378#
2379#
2380
def test_main(run=None):
    """Build and run the complete multiprocessing test suite.

    *run* is a callable taking the assembled ``unittest.TestSuite``; when
    it is None, ``test.test_support.run_unittest`` is used.

    Raises ``unittest.SkipTest`` on Linux when an ``RLock`` cannot be
    created (issue 3111).  Anything raised by *run* propagates after the
    shared pools and the manager have been torn down.
    """
    if sys.platform.startswith("linux"):
        try:
            multiprocessing.RLock()
        except OSError:
            raise unittest.SkipTest("OSError raises on RLock creation, see issue 3111!")

    check_enough_semaphores()

    if run is None:
        from test.test_support import run_unittest as run

    util.get_temp_dir()     # creates temp directory for use by all processes

    multiprocessing.get_logger().setLevel(LOG_LEVEL)

    # Shared pools (and the manager) used by the generated test classes.
    ProcessesMixin.pool = multiprocessing.Pool(4)
    ThreadsMixin.pool = multiprocessing.dummy.Pool(4)
    ManagerMixin.manager.__init__()
    ManagerMixin.manager.start()
    ManagerMixin.pool = ManagerMixin.manager.Pool(4)

    try:
        by_name = lambda tc: tc.__name__
        testcases = (
            sorted(testcases_processes.values(), key=by_name) +
            sorted(testcases_threads.values(), key=by_name) +
            sorted(testcases_manager.values(), key=by_name) +
            testcases_other
            )

        loadTestsFromTestCase = unittest.defaultTestLoader.loadTestsFromTestCase
        suite = unittest.TestSuite(loadTestsFromTestCase(tc) for tc in testcases)
        # (ncoghlan): Whether or not sys.exc_clear is executed by the
        # threading module during these tests is at least platform
        # dependent and possibly non-deterministic on any given platform.
        # So we don't mind if the listed warnings aren't actually raised.
        with test_support.check_py3k_warnings(
                (".+__(get|set)slice__ has been removed", DeprecationWarning),
                (r"sys.exc_clear\(\) not supported", DeprecationWarning),
                quiet=True):
            run(suite)
    finally:
        # Tear the shared resources down even when run() raises, so worker
        # and manager processes are not left behind.
        ThreadsMixin.pool.terminate()
        ProcessesMixin.pool.terminate()
        ManagerMixin.pool.terminate()
        ManagerMixin.manager.shutdown()

        del ProcessesMixin.pool, ThreadsMixin.pool, ManagerMixin.pool
Benjamin Petersondfd79492008-06-13 19:13:39 +00002428
def main():
    """Run the whole suite through a verbose text test runner."""
    runner = unittest.TextTestRunner(verbosity=2)
    test_main(runner.run)

if __name__ == '__main__':
    main()