blob: 0d98a1498c74837ed00fc6fa29dbd8efe18564a1 [file] [log] [blame]
Benjamin Peterson90f5ba52010-03-11 22:53:45 +00001#!/usr/bin/env python3
Georg Brandl86b2fb92008-07-16 03:43:04 +00002
Benjamin Petersone711caf2008-06-11 16:44:04 +00003#
4# Unit tests for the multiprocessing package
5#
6
7import unittest
Benjamin Petersone711caf2008-06-11 16:44:04 +00008import queue as pyqueue
9import time
Alexandre Vassalottic57a84f2009-07-17 12:07:01 +000010import io
Benjamin Petersone711caf2008-06-11 16:44:04 +000011import sys
12import os
13import gc
Antoine Pitrouc824e9a2011-04-05 18:11:33 +020014import errno
Benjamin Petersone711caf2008-06-11 16:44:04 +000015import signal
16import array
Benjamin Petersone711caf2008-06-11 16:44:04 +000017import socket
18import random
19import logging
R. David Murraya21e4ca2009-03-31 23:16:50 +000020import test.support
Benjamin Petersone711caf2008-06-11 16:44:04 +000021
Benjamin Petersone5384b02008-10-04 22:00:42 +000022
R. David Murraya21e4ca2009-03-31 23:16:50 +000023# Skip tests if _multiprocessing wasn't built.
24_multiprocessing = test.support.import_module('_multiprocessing')
25# Skip tests if sem_open implementation is broken.
26test.support.import_module('multiprocessing.synchronize')
Victor Stinner45df8202010-04-28 22:31:17 +000027# import threading after _multiprocessing to raise a more revelant error
28# message: "No module named _multiprocessing". _multiprocessing is not compiled
29# without thread support.
30import threading
Benjamin Petersone5384b02008-10-04 22:00:42 +000031
Benjamin Petersone711caf2008-06-11 16:44:04 +000032import multiprocessing.dummy
33import multiprocessing.connection
34import multiprocessing.managers
35import multiprocessing.heap
Benjamin Petersone711caf2008-06-11 16:44:04 +000036import multiprocessing.pool
Benjamin Petersone711caf2008-06-11 16:44:04 +000037
Charles-François Natalie51c8da2011-09-21 18:48:21 +020038from multiprocessing import util
39
40try:
41 from multiprocessing import reduction
42 HAS_REDUCTION = True
43except ImportError:
44 HAS_REDUCTION = False
Benjamin Petersone711caf2008-06-11 16:44:04 +000045
Brian Curtinafa88b52010-10-07 01:12:19 +000046try:
47 from multiprocessing.sharedctypes import Value, copy
48 HAS_SHAREDCTYPES = True
49except ImportError:
50 HAS_SHAREDCTYPES = False
51
Antoine Pitroubcb39d42011-08-23 19:46:22 +020052try:
53 import msvcrt
54except ImportError:
55 msvcrt = None
56
Benjamin Petersone711caf2008-06-11 16:44:04 +000057#
58#
59#
60
Benjamin Peterson2bc91df2008-07-13 18:45:30 +000061def latin(s):
62 return s.encode('latin')
Benjamin Petersone711caf2008-06-11 16:44:04 +000063
Benjamin Petersone711caf2008-06-11 16:44:04 +000064#
65# Constants
66#
67
68LOG_LEVEL = util.SUBWARNING
Jesse Noller1f0b6582010-01-27 03:36:01 +000069#LOG_LEVEL = logging.DEBUG
Benjamin Petersone711caf2008-06-11 16:44:04 +000070
71DELTA = 0.1
72CHECK_TIMINGS = False # making true makes tests take a lot longer
73 # and can sometimes cause some non-serious
74 # failures because some calls block a bit
75 # longer than expected
76if CHECK_TIMINGS:
77 TIMEOUT1, TIMEOUT2, TIMEOUT3 = 0.82, 0.35, 1.4
78else:
79 TIMEOUT1, TIMEOUT2, TIMEOUT3 = 0.1, 0.1, 0.1
80
81HAVE_GETVALUE = not getattr(_multiprocessing,
82 'HAVE_BROKEN_SEM_GETVALUE', False)
83
Jesse Noller6214edd2009-01-19 16:23:53 +000084WIN32 = (sys.platform == "win32")
85
Antoine Pitroubcb39d42011-08-23 19:46:22 +020086try:
87 MAXFD = os.sysconf("SC_OPEN_MAX")
88except:
89 MAXFD = 256
90
Benjamin Petersone711caf2008-06-11 16:44:04 +000091#
Florent Xiclunafd1b0932010-03-28 00:25:02 +000092# Some tests require ctypes
93#
94
95try:
Florent Xiclunaaa171062010-08-14 15:56:42 +000096 from ctypes import Structure, c_int, c_double
Florent Xiclunafd1b0932010-03-28 00:25:02 +000097except ImportError:
98 Structure = object
99 c_int = c_double = None
100
Charles-François Natali3be00952011-11-22 18:36:39 +0100101
102def check_enough_semaphores():
103 """Check that the system supports enough semaphores to run the test."""
104 # minimum number of semaphores available according to POSIX
105 nsems_min = 256
106 try:
107 nsems = os.sysconf("SC_SEM_NSEMS_MAX")
108 except (AttributeError, ValueError):
109 # sysconf not available or setting not available
110 return
111 if nsems == -1 or nsems >= nsems_min:
112 return
113 raise unittest.SkipTest("The OS doesn't support enough semaphores "
114 "to run the test (required: %d)." % nsems_min)
115
116
Florent Xiclunafd1b0932010-03-28 00:25:02 +0000117#
Benjamin Petersone711caf2008-06-11 16:44:04 +0000118# Creates a wrapper for a function which records the time it takes to finish
119#
120
121class TimingWrapper(object):
122
123 def __init__(self, func):
124 self.func = func
125 self.elapsed = None
126
127 def __call__(self, *args, **kwds):
128 t = time.time()
129 try:
130 return self.func(*args, **kwds)
131 finally:
132 self.elapsed = time.time() - t
133
134#
135# Base class for test cases
136#
137
138class BaseTestCase(object):
139
140 ALLOWED_TYPES = ('processes', 'manager', 'threads')
141
142 def assertTimingAlmostEqual(self, a, b):
143 if CHECK_TIMINGS:
144 self.assertAlmostEqual(a, b, 1)
145
146 def assertReturnsIfImplemented(self, value, func, *args):
147 try:
148 res = func(*args)
149 except NotImplementedError:
150 pass
151 else:
152 return self.assertEqual(value, res)
153
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +0000154 # For the sanity of Windows users, rather than crashing or freezing in
155 # multiple ways.
156 def __reduce__(self, *args):
157 raise NotImplementedError("shouldn't try to pickle a test case")
158
159 __reduce_ex__ = __reduce__
160
Benjamin Petersone711caf2008-06-11 16:44:04 +0000161#
162# Return the value of a semaphore
163#
164
165def get_value(self):
166 try:
167 return self.get_value()
168 except AttributeError:
169 try:
170 return self._Semaphore__value
171 except AttributeError:
172 try:
173 return self._value
174 except AttributeError:
175 raise NotImplementedError
176
177#
178# Testcases
179#
180
181class _TestProcess(BaseTestCase):
182
183 ALLOWED_TYPES = ('processes', 'threads')
184
185 def test_current(self):
186 if self.TYPE == 'threads':
187 return
188
189 current = self.current_process()
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000190 authkey = current.authkey
Benjamin Petersone711caf2008-06-11 16:44:04 +0000191
192 self.assertTrue(current.is_alive())
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000193 self.assertTrue(not current.daemon)
Ezio Melottie9615932010-01-24 19:26:24 +0000194 self.assertIsInstance(authkey, bytes)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000195 self.assertTrue(len(authkey) > 0)
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000196 self.assertEqual(current.ident, os.getpid())
197 self.assertEqual(current.exitcode, None)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000198
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +0000199 @classmethod
200 def _test(cls, q, *args, **kwds):
201 current = cls.current_process()
Benjamin Petersone711caf2008-06-11 16:44:04 +0000202 q.put(args)
203 q.put(kwds)
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000204 q.put(current.name)
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +0000205 if cls.TYPE != 'threads':
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000206 q.put(bytes(current.authkey))
Benjamin Petersone711caf2008-06-11 16:44:04 +0000207 q.put(current.pid)
208
209 def test_process(self):
210 q = self.Queue(1)
211 e = self.Event()
212 args = (q, 1, 2)
213 kwargs = {'hello':23, 'bye':2.54}
214 name = 'SomeProcess'
215 p = self.Process(
216 target=self._test, args=args, kwargs=kwargs, name=name
217 )
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000218 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000219 current = self.current_process()
220
221 if self.TYPE != 'threads':
Ezio Melottib3aedd42010-11-20 19:04:17 +0000222 self.assertEqual(p.authkey, current.authkey)
223 self.assertEqual(p.is_alive(), False)
224 self.assertEqual(p.daemon, True)
Benjamin Peterson577473f2010-01-19 00:09:57 +0000225 self.assertNotIn(p, self.active_children())
Benjamin Petersone711caf2008-06-11 16:44:04 +0000226 self.assertTrue(type(self.active_children()) is list)
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000227 self.assertEqual(p.exitcode, None)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000228
229 p.start()
230
Ezio Melottib3aedd42010-11-20 19:04:17 +0000231 self.assertEqual(p.exitcode, None)
232 self.assertEqual(p.is_alive(), True)
Benjamin Peterson577473f2010-01-19 00:09:57 +0000233 self.assertIn(p, self.active_children())
Benjamin Petersone711caf2008-06-11 16:44:04 +0000234
Ezio Melottib3aedd42010-11-20 19:04:17 +0000235 self.assertEqual(q.get(), args[1:])
236 self.assertEqual(q.get(), kwargs)
237 self.assertEqual(q.get(), p.name)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000238 if self.TYPE != 'threads':
Ezio Melottib3aedd42010-11-20 19:04:17 +0000239 self.assertEqual(q.get(), current.authkey)
240 self.assertEqual(q.get(), p.pid)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000241
242 p.join()
243
Ezio Melottib3aedd42010-11-20 19:04:17 +0000244 self.assertEqual(p.exitcode, 0)
245 self.assertEqual(p.is_alive(), False)
Benjamin Peterson577473f2010-01-19 00:09:57 +0000246 self.assertNotIn(p, self.active_children())
Benjamin Petersone711caf2008-06-11 16:44:04 +0000247
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +0000248 @classmethod
249 def _test_terminate(cls):
Benjamin Petersone711caf2008-06-11 16:44:04 +0000250 time.sleep(1000)
251
252 def test_terminate(self):
253 if self.TYPE == 'threads':
254 return
255
256 p = self.Process(target=self._test_terminate)
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000257 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000258 p.start()
259
260 self.assertEqual(p.is_alive(), True)
Benjamin Peterson577473f2010-01-19 00:09:57 +0000261 self.assertIn(p, self.active_children())
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000262 self.assertEqual(p.exitcode, None)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000263
264 p.terminate()
265
266 join = TimingWrapper(p.join)
267 self.assertEqual(join(), None)
268 self.assertTimingAlmostEqual(join.elapsed, 0.0)
269
270 self.assertEqual(p.is_alive(), False)
Benjamin Peterson577473f2010-01-19 00:09:57 +0000271 self.assertNotIn(p, self.active_children())
Benjamin Petersone711caf2008-06-11 16:44:04 +0000272
273 p.join()
274
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000275 # XXX sometimes get p.exitcode == 0 on Windows ...
276 #self.assertEqual(p.exitcode, -signal.SIGTERM)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000277
278 def test_cpu_count(self):
279 try:
280 cpus = multiprocessing.cpu_count()
281 except NotImplementedError:
282 cpus = 1
283 self.assertTrue(type(cpus) is int)
284 self.assertTrue(cpus >= 1)
285
286 def test_active_children(self):
287 self.assertEqual(type(self.active_children()), list)
288
289 p = self.Process(target=time.sleep, args=(DELTA,))
Benjamin Peterson577473f2010-01-19 00:09:57 +0000290 self.assertNotIn(p, self.active_children())
Benjamin Petersone711caf2008-06-11 16:44:04 +0000291
Jesus Cea94f964f2011-09-09 20:26:57 +0200292 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000293 p.start()
Benjamin Peterson577473f2010-01-19 00:09:57 +0000294 self.assertIn(p, self.active_children())
Benjamin Petersone711caf2008-06-11 16:44:04 +0000295
296 p.join()
Benjamin Peterson577473f2010-01-19 00:09:57 +0000297 self.assertNotIn(p, self.active_children())
Benjamin Petersone711caf2008-06-11 16:44:04 +0000298
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +0000299 @classmethod
300 def _test_recursion(cls, wconn, id):
Benjamin Petersone711caf2008-06-11 16:44:04 +0000301 from multiprocessing import forking
302 wconn.send(id)
303 if len(id) < 2:
304 for i in range(2):
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +0000305 p = cls.Process(
306 target=cls._test_recursion, args=(wconn, id+[i])
Benjamin Petersone711caf2008-06-11 16:44:04 +0000307 )
308 p.start()
309 p.join()
310
311 def test_recursion(self):
312 rconn, wconn = self.Pipe(duplex=False)
313 self._test_recursion(wconn, [])
314
315 time.sleep(DELTA)
316 result = []
317 while rconn.poll():
318 result.append(rconn.recv())
319
320 expected = [
321 [],
322 [0],
323 [0, 0],
324 [0, 1],
325 [1],
326 [1, 0],
327 [1, 1]
328 ]
329 self.assertEqual(result, expected)
330
331#
332#
333#
334
335class _UpperCaser(multiprocessing.Process):
336
337 def __init__(self):
338 multiprocessing.Process.__init__(self)
339 self.child_conn, self.parent_conn = multiprocessing.Pipe()
340
341 def run(self):
342 self.parent_conn.close()
343 for s in iter(self.child_conn.recv, None):
344 self.child_conn.send(s.upper())
345 self.child_conn.close()
346
347 def submit(self, s):
348 assert type(s) is str
349 self.parent_conn.send(s)
350 return self.parent_conn.recv()
351
352 def stop(self):
353 self.parent_conn.send(None)
354 self.parent_conn.close()
355 self.child_conn.close()
356
357class _TestSubclassingProcess(BaseTestCase):
358
359 ALLOWED_TYPES = ('processes',)
360
361 def test_subclassing(self):
362 uppercaser = _UpperCaser()
Jesus Cea94f964f2011-09-09 20:26:57 +0200363 uppercaser.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000364 uppercaser.start()
365 self.assertEqual(uppercaser.submit('hello'), 'HELLO')
366 self.assertEqual(uppercaser.submit('world'), 'WORLD')
367 uppercaser.stop()
368 uppercaser.join()
369
Antoine Pitrou84a0fbf2012-01-27 10:52:37 +0100370 def test_stderr_flush(self):
371 # sys.stderr is flushed at process shutdown (issue #13812)
372 if self.TYPE == "threads":
373 return
374
375 testfn = test.support.TESTFN
376 self.addCleanup(test.support.unlink, testfn)
377 proc = self.Process(target=self._test_stderr_flush, args=(testfn,))
378 proc.start()
379 proc.join()
380 with open(testfn, 'r') as f:
381 err = f.read()
382 # The whole traceback was printed
383 self.assertIn("ZeroDivisionError", err)
384 self.assertIn("test_multiprocessing.py", err)
385 self.assertIn("1/0 # MARKER", err)
386
387 @classmethod
388 def _test_stderr_flush(cls, testfn):
389 sys.stderr = open(testfn, 'w')
390 1/0 # MARKER
391
392
Benjamin Petersone711caf2008-06-11 16:44:04 +0000393#
394#
395#
396
397def queue_empty(q):
398 if hasattr(q, 'empty'):
399 return q.empty()
400 else:
401 return q.qsize() == 0
402
403def queue_full(q, maxsize):
404 if hasattr(q, 'full'):
405 return q.full()
406 else:
407 return q.qsize() == maxsize
408
409
410class _TestQueue(BaseTestCase):
411
412
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +0000413 @classmethod
414 def _test_put(cls, queue, child_can_start, parent_can_continue):
Benjamin Petersone711caf2008-06-11 16:44:04 +0000415 child_can_start.wait()
416 for i in range(6):
417 queue.get()
418 parent_can_continue.set()
419
420 def test_put(self):
421 MAXSIZE = 6
422 queue = self.Queue(maxsize=MAXSIZE)
423 child_can_start = self.Event()
424 parent_can_continue = self.Event()
425
426 proc = self.Process(
427 target=self._test_put,
428 args=(queue, child_can_start, parent_can_continue)
429 )
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000430 proc.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000431 proc.start()
432
433 self.assertEqual(queue_empty(queue), True)
434 self.assertEqual(queue_full(queue, MAXSIZE), False)
435
436 queue.put(1)
437 queue.put(2, True)
438 queue.put(3, True, None)
439 queue.put(4, False)
440 queue.put(5, False, None)
441 queue.put_nowait(6)
442
443 # the values may be in buffer but not yet in pipe so sleep a bit
444 time.sleep(DELTA)
445
446 self.assertEqual(queue_empty(queue), False)
447 self.assertEqual(queue_full(queue, MAXSIZE), True)
448
449 put = TimingWrapper(queue.put)
450 put_nowait = TimingWrapper(queue.put_nowait)
451
452 self.assertRaises(pyqueue.Full, put, 7, False)
453 self.assertTimingAlmostEqual(put.elapsed, 0)
454
455 self.assertRaises(pyqueue.Full, put, 7, False, None)
456 self.assertTimingAlmostEqual(put.elapsed, 0)
457
458 self.assertRaises(pyqueue.Full, put_nowait, 7)
459 self.assertTimingAlmostEqual(put_nowait.elapsed, 0)
460
461 self.assertRaises(pyqueue.Full, put, 7, True, TIMEOUT1)
462 self.assertTimingAlmostEqual(put.elapsed, TIMEOUT1)
463
464 self.assertRaises(pyqueue.Full, put, 7, False, TIMEOUT2)
465 self.assertTimingAlmostEqual(put.elapsed, 0)
466
467 self.assertRaises(pyqueue.Full, put, 7, True, timeout=TIMEOUT3)
468 self.assertTimingAlmostEqual(put.elapsed, TIMEOUT3)
469
470 child_can_start.set()
471 parent_can_continue.wait()
472
473 self.assertEqual(queue_empty(queue), True)
474 self.assertEqual(queue_full(queue, MAXSIZE), False)
475
476 proc.join()
477
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +0000478 @classmethod
479 def _test_get(cls, queue, child_can_start, parent_can_continue):
Benjamin Petersone711caf2008-06-11 16:44:04 +0000480 child_can_start.wait()
Benjamin Petersonb29cbbc2008-06-16 20:57:14 +0000481 #queue.put(1)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000482 queue.put(2)
483 queue.put(3)
484 queue.put(4)
485 queue.put(5)
486 parent_can_continue.set()
487
488 def test_get(self):
489 queue = self.Queue()
490 child_can_start = self.Event()
491 parent_can_continue = self.Event()
492
493 proc = self.Process(
494 target=self._test_get,
495 args=(queue, child_can_start, parent_can_continue)
496 )
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000497 proc.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000498 proc.start()
499
500 self.assertEqual(queue_empty(queue), True)
501
502 child_can_start.set()
503 parent_can_continue.wait()
504
505 time.sleep(DELTA)
506 self.assertEqual(queue_empty(queue), False)
507
Benjamin Petersonb29cbbc2008-06-16 20:57:14 +0000508 # Hangs unexpectedly, remove for now
509 #self.assertEqual(queue.get(), 1)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000510 self.assertEqual(queue.get(True, None), 2)
511 self.assertEqual(queue.get(True), 3)
512 self.assertEqual(queue.get(timeout=1), 4)
513 self.assertEqual(queue.get_nowait(), 5)
514
515 self.assertEqual(queue_empty(queue), True)
516
517 get = TimingWrapper(queue.get)
518 get_nowait = TimingWrapper(queue.get_nowait)
519
520 self.assertRaises(pyqueue.Empty, get, False)
521 self.assertTimingAlmostEqual(get.elapsed, 0)
522
523 self.assertRaises(pyqueue.Empty, get, False, None)
524 self.assertTimingAlmostEqual(get.elapsed, 0)
525
526 self.assertRaises(pyqueue.Empty, get_nowait)
527 self.assertTimingAlmostEqual(get_nowait.elapsed, 0)
528
529 self.assertRaises(pyqueue.Empty, get, True, TIMEOUT1)
530 self.assertTimingAlmostEqual(get.elapsed, TIMEOUT1)
531
532 self.assertRaises(pyqueue.Empty, get, False, TIMEOUT2)
533 self.assertTimingAlmostEqual(get.elapsed, 0)
534
535 self.assertRaises(pyqueue.Empty, get, timeout=TIMEOUT3)
536 self.assertTimingAlmostEqual(get.elapsed, TIMEOUT3)
537
538 proc.join()
539
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +0000540 @classmethod
541 def _test_fork(cls, queue):
Benjamin Petersone711caf2008-06-11 16:44:04 +0000542 for i in range(10, 20):
543 queue.put(i)
544 # note that at this point the items may only be buffered, so the
545 # process cannot shutdown until the feeder thread has finished
546 # pushing items onto the pipe.
547
548 def test_fork(self):
549 # Old versions of Queue would fail to create a new feeder
550 # thread for a forked process if the original process had its
551 # own feeder thread. This test checks that this no longer
552 # happens.
553
554 queue = self.Queue()
555
556 # put items on queue so that main process starts a feeder thread
557 for i in range(10):
558 queue.put(i)
559
560 # wait to make sure thread starts before we fork a new process
561 time.sleep(DELTA)
562
563 # fork process
564 p = self.Process(target=self._test_fork, args=(queue,))
Jesus Cea94f964f2011-09-09 20:26:57 +0200565 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000566 p.start()
567
568 # check that all expected items are in the queue
569 for i in range(20):
570 self.assertEqual(queue.get(), i)
571 self.assertRaises(pyqueue.Empty, queue.get, False)
572
573 p.join()
574
575 def test_qsize(self):
576 q = self.Queue()
577 try:
578 self.assertEqual(q.qsize(), 0)
579 except NotImplementedError:
580 return
581 q.put(1)
582 self.assertEqual(q.qsize(), 1)
583 q.put(5)
584 self.assertEqual(q.qsize(), 2)
585 q.get()
586 self.assertEqual(q.qsize(), 1)
587 q.get()
588 self.assertEqual(q.qsize(), 0)
589
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +0000590 @classmethod
591 def _test_task_done(cls, q):
Benjamin Petersone711caf2008-06-11 16:44:04 +0000592 for obj in iter(q.get, None):
593 time.sleep(DELTA)
594 q.task_done()
595
596 def test_task_done(self):
597 queue = self.JoinableQueue()
598
599 if sys.version_info < (2, 5) and not hasattr(queue, 'task_done'):
Florent Xiclunafd1b0932010-03-28 00:25:02 +0000600 self.skipTest("requires 'queue.task_done()' method")
Benjamin Petersone711caf2008-06-11 16:44:04 +0000601
602 workers = [self.Process(target=self._test_task_done, args=(queue,))
603 for i in range(4)]
604
605 for p in workers:
Jesus Cea94f964f2011-09-09 20:26:57 +0200606 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000607 p.start()
608
609 for i in range(10):
610 queue.put(i)
611
612 queue.join()
613
614 for p in workers:
615 queue.put(None)
616
617 for p in workers:
618 p.join()
619
620#
621#
622#
623
624class _TestLock(BaseTestCase):
625
626 def test_lock(self):
627 lock = self.Lock()
628 self.assertEqual(lock.acquire(), True)
629 self.assertEqual(lock.acquire(False), False)
630 self.assertEqual(lock.release(), None)
631 self.assertRaises((ValueError, threading.ThreadError), lock.release)
632
633 def test_rlock(self):
634 lock = self.RLock()
635 self.assertEqual(lock.acquire(), True)
636 self.assertEqual(lock.acquire(), True)
637 self.assertEqual(lock.acquire(), True)
638 self.assertEqual(lock.release(), None)
639 self.assertEqual(lock.release(), None)
640 self.assertEqual(lock.release(), None)
641 self.assertRaises((AssertionError, RuntimeError), lock.release)
642
Jesse Nollerf8d00852009-03-31 03:25:07 +0000643 def test_lock_context(self):
644 with self.Lock():
645 pass
646
Benjamin Petersone711caf2008-06-11 16:44:04 +0000647
648class _TestSemaphore(BaseTestCase):
649
650 def _test_semaphore(self, sem):
651 self.assertReturnsIfImplemented(2, get_value, sem)
652 self.assertEqual(sem.acquire(), True)
653 self.assertReturnsIfImplemented(1, get_value, sem)
654 self.assertEqual(sem.acquire(), True)
655 self.assertReturnsIfImplemented(0, get_value, sem)
656 self.assertEqual(sem.acquire(False), False)
657 self.assertReturnsIfImplemented(0, get_value, sem)
658 self.assertEqual(sem.release(), None)
659 self.assertReturnsIfImplemented(1, get_value, sem)
660 self.assertEqual(sem.release(), None)
661 self.assertReturnsIfImplemented(2, get_value, sem)
662
663 def test_semaphore(self):
664 sem = self.Semaphore(2)
665 self._test_semaphore(sem)
666 self.assertEqual(sem.release(), None)
667 self.assertReturnsIfImplemented(3, get_value, sem)
668 self.assertEqual(sem.release(), None)
669 self.assertReturnsIfImplemented(4, get_value, sem)
670
671 def test_bounded_semaphore(self):
672 sem = self.BoundedSemaphore(2)
673 self._test_semaphore(sem)
674 # Currently fails on OS/X
675 #if HAVE_GETVALUE:
676 # self.assertRaises(ValueError, sem.release)
677 # self.assertReturnsIfImplemented(2, get_value, sem)
678
679 def test_timeout(self):
680 if self.TYPE != 'processes':
681 return
682
683 sem = self.Semaphore(0)
684 acquire = TimingWrapper(sem.acquire)
685
686 self.assertEqual(acquire(False), False)
687 self.assertTimingAlmostEqual(acquire.elapsed, 0.0)
688
689 self.assertEqual(acquire(False, None), False)
690 self.assertTimingAlmostEqual(acquire.elapsed, 0.0)
691
692 self.assertEqual(acquire(False, TIMEOUT1), False)
693 self.assertTimingAlmostEqual(acquire.elapsed, 0)
694
695 self.assertEqual(acquire(True, TIMEOUT2), False)
696 self.assertTimingAlmostEqual(acquire.elapsed, TIMEOUT2)
697
698 self.assertEqual(acquire(timeout=TIMEOUT3), False)
699 self.assertTimingAlmostEqual(acquire.elapsed, TIMEOUT3)
700
701
702class _TestCondition(BaseTestCase):
703
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +0000704 @classmethod
705 def f(cls, cond, sleeping, woken, timeout=None):
Benjamin Petersone711caf2008-06-11 16:44:04 +0000706 cond.acquire()
707 sleeping.release()
708 cond.wait(timeout)
709 woken.release()
710 cond.release()
711
712 def check_invariant(self, cond):
713 # this is only supposed to succeed when there are no sleepers
714 if self.TYPE == 'processes':
715 try:
716 sleepers = (cond._sleeping_count.get_value() -
717 cond._woken_count.get_value())
718 self.assertEqual(sleepers, 0)
719 self.assertEqual(cond._wait_semaphore.get_value(), 0)
720 except NotImplementedError:
721 pass
722
723 def test_notify(self):
724 cond = self.Condition()
725 sleeping = self.Semaphore(0)
726 woken = self.Semaphore(0)
727
728 p = self.Process(target=self.f, args=(cond, sleeping, woken))
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000729 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000730 p.start()
731
732 p = threading.Thread(target=self.f, args=(cond, sleeping, woken))
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000733 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000734 p.start()
735
736 # wait for both children to start sleeping
737 sleeping.acquire()
738 sleeping.acquire()
739
740 # check no process/thread has woken up
741 time.sleep(DELTA)
742 self.assertReturnsIfImplemented(0, get_value, woken)
743
744 # wake up one process/thread
745 cond.acquire()
746 cond.notify()
747 cond.release()
748
749 # check one process/thread has woken up
750 time.sleep(DELTA)
751 self.assertReturnsIfImplemented(1, get_value, woken)
752
753 # wake up another
754 cond.acquire()
755 cond.notify()
756 cond.release()
757
758 # check other has woken up
759 time.sleep(DELTA)
760 self.assertReturnsIfImplemented(2, get_value, woken)
761
762 # check state is not mucked up
763 self.check_invariant(cond)
764 p.join()
765
766 def test_notify_all(self):
767 cond = self.Condition()
768 sleeping = self.Semaphore(0)
769 woken = self.Semaphore(0)
770
771 # start some threads/processes which will timeout
772 for i in range(3):
773 p = self.Process(target=self.f,
774 args=(cond, sleeping, woken, TIMEOUT1))
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000775 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000776 p.start()
777
778 t = threading.Thread(target=self.f,
779 args=(cond, sleeping, woken, TIMEOUT1))
Benjamin Peterson72753702008-08-18 18:09:21 +0000780 t.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000781 t.start()
782
783 # wait for them all to sleep
784 for i in range(6):
785 sleeping.acquire()
786
787 # check they have all timed out
788 for i in range(6):
789 woken.acquire()
790 self.assertReturnsIfImplemented(0, get_value, woken)
791
792 # check state is not mucked up
793 self.check_invariant(cond)
794
795 # start some more threads/processes
796 for i in range(3):
797 p = self.Process(target=self.f, args=(cond, sleeping, woken))
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000798 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000799 p.start()
800
801 t = threading.Thread(target=self.f, args=(cond, sleeping, woken))
Benjamin Peterson72753702008-08-18 18:09:21 +0000802 t.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000803 t.start()
804
805 # wait for them to all sleep
806 for i in range(6):
807 sleeping.acquire()
808
809 # check no process/thread has woken up
810 time.sleep(DELTA)
811 self.assertReturnsIfImplemented(0, get_value, woken)
812
813 # wake them all up
814 cond.acquire()
815 cond.notify_all()
816 cond.release()
817
818 # check they have all woken
Antoine Pitrouf25a8de2011-04-16 21:02:01 +0200819 for i in range(10):
820 try:
821 if get_value(woken) == 6:
822 break
823 except NotImplementedError:
824 break
825 time.sleep(DELTA)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000826 self.assertReturnsIfImplemented(6, get_value, woken)
827
828 # check state is not mucked up
829 self.check_invariant(cond)
830
831 def test_timeout(self):
832 cond = self.Condition()
833 wait = TimingWrapper(cond.wait)
834 cond.acquire()
835 res = wait(TIMEOUT1)
836 cond.release()
Georg Brandl65ffae02010-10-28 09:24:56 +0000837 self.assertEqual(res, False)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000838 self.assertTimingAlmostEqual(wait.elapsed, TIMEOUT1)
839
840
841class _TestEvent(BaseTestCase):
842
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +0000843 @classmethod
844 def _test_event(cls, event):
Benjamin Petersone711caf2008-06-11 16:44:04 +0000845 time.sleep(TIMEOUT2)
846 event.set()
847
848 def test_event(self):
849 event = self.Event()
850 wait = TimingWrapper(event.wait)
851
Ezio Melotti13925002011-03-16 11:05:33 +0200852 # Removed temporarily, due to API shear, this does not
Benjamin Petersone711caf2008-06-11 16:44:04 +0000853 # work with threading._Event objects. is_set == isSet
Benjamin Peterson965ce872009-04-05 21:24:58 +0000854 self.assertEqual(event.is_set(), False)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000855
Benjamin Peterson965ce872009-04-05 21:24:58 +0000856 # Removed, threading.Event.wait() will return the value of the __flag
857 # instead of None. API Shear with the semaphore backed mp.Event
858 self.assertEqual(wait(0.0), False)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000859 self.assertTimingAlmostEqual(wait.elapsed, 0.0)
Benjamin Peterson965ce872009-04-05 21:24:58 +0000860 self.assertEqual(wait(TIMEOUT1), False)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000861 self.assertTimingAlmostEqual(wait.elapsed, TIMEOUT1)
862
863 event.set()
864
865 # See note above on the API differences
Benjamin Peterson965ce872009-04-05 21:24:58 +0000866 self.assertEqual(event.is_set(), True)
867 self.assertEqual(wait(), True)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000868 self.assertTimingAlmostEqual(wait.elapsed, 0.0)
Benjamin Peterson965ce872009-04-05 21:24:58 +0000869 self.assertEqual(wait(TIMEOUT1), True)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000870 self.assertTimingAlmostEqual(wait.elapsed, 0.0)
871 # self.assertEqual(event.is_set(), True)
872
873 event.clear()
874
875 #self.assertEqual(event.is_set(), False)
876
Jesus Cea94f964f2011-09-09 20:26:57 +0200877 p = self.Process(target=self._test_event, args=(event,))
878 p.daemon = True
879 p.start()
Benjamin Peterson965ce872009-04-05 21:24:58 +0000880 self.assertEqual(wait(), True)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000881
882#
883#
884#
885
886class _TestValue(BaseTestCase):
887
Florent Xiclunafd1b0932010-03-28 00:25:02 +0000888 ALLOWED_TYPES = ('processes',)
889
Benjamin Petersone711caf2008-06-11 16:44:04 +0000890 codes_values = [
891 ('i', 4343, 24234),
892 ('d', 3.625, -4.25),
893 ('h', -232, 234),
894 ('c', latin('x'), latin('y'))
895 ]
896
Antoine Pitrou7744e2a2010-11-22 16:26:21 +0000897 def setUp(self):
898 if not HAS_SHAREDCTYPES:
899 self.skipTest("requires multiprocessing.sharedctypes")
900
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +0000901 @classmethod
902 def _test(cls, values):
903 for sv, cv in zip(values, cls.codes_values):
Benjamin Petersone711caf2008-06-11 16:44:04 +0000904 sv.value = cv[2]
905
906
907 def test_value(self, raw=False):
Benjamin Petersone711caf2008-06-11 16:44:04 +0000908 if raw:
909 values = [self.RawValue(code, value)
910 for code, value, _ in self.codes_values]
911 else:
912 values = [self.Value(code, value)
913 for code, value, _ in self.codes_values]
914
915 for sv, cv in zip(values, self.codes_values):
916 self.assertEqual(sv.value, cv[1])
917
918 proc = self.Process(target=self._test, args=(values,))
Jesus Cea94f964f2011-09-09 20:26:57 +0200919 proc.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000920 proc.start()
921 proc.join()
922
923 for sv, cv in zip(values, self.codes_values):
924 self.assertEqual(sv.value, cv[2])
925
926 def test_rawvalue(self):
927 self.test_value(raw=True)
928
929 def test_getobj_getlock(self):
Benjamin Petersone711caf2008-06-11 16:44:04 +0000930 val1 = self.Value('i', 5)
931 lock1 = val1.get_lock()
932 obj1 = val1.get_obj()
933
934 val2 = self.Value('i', 5, lock=None)
935 lock2 = val2.get_lock()
936 obj2 = val2.get_obj()
937
938 lock = self.Lock()
939 val3 = self.Value('i', 5, lock=lock)
940 lock3 = val3.get_lock()
941 obj3 = val3.get_obj()
942 self.assertEqual(lock, lock3)
943
Jesse Nollerb0516a62009-01-18 03:11:38 +0000944 arr4 = self.Value('i', 5, lock=False)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000945 self.assertFalse(hasattr(arr4, 'get_lock'))
946 self.assertFalse(hasattr(arr4, 'get_obj'))
947
Jesse Nollerb0516a62009-01-18 03:11:38 +0000948 self.assertRaises(AttributeError, self.Value, 'i', 5, lock='navalue')
949
950 arr5 = self.RawValue('i', 5)
951 self.assertFalse(hasattr(arr5, 'get_lock'))
952 self.assertFalse(hasattr(arr5, 'get_obj'))
953
Benjamin Petersone711caf2008-06-11 16:44:04 +0000954
955class _TestArray(BaseTestCase):
956
Florent Xiclunafd1b0932010-03-28 00:25:02 +0000957 ALLOWED_TYPES = ('processes',)
958
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +0000959 @classmethod
960 def f(cls, seq):
Benjamin Petersone711caf2008-06-11 16:44:04 +0000961 for i in range(1, len(seq)):
962 seq[i] += seq[i-1]
963
Florent Xiclunafd1b0932010-03-28 00:25:02 +0000964 @unittest.skipIf(c_int is None, "requires _ctypes")
Benjamin Petersone711caf2008-06-11 16:44:04 +0000965 def test_array(self, raw=False):
Benjamin Petersone711caf2008-06-11 16:44:04 +0000966 seq = [680, 626, 934, 821, 150, 233, 548, 982, 714, 831]
967 if raw:
968 arr = self.RawArray('i', seq)
969 else:
970 arr = self.Array('i', seq)
971
972 self.assertEqual(len(arr), len(seq))
973 self.assertEqual(arr[3], seq[3])
974 self.assertEqual(list(arr[2:7]), list(seq[2:7]))
975
976 arr[4:8] = seq[4:8] = array.array('i', [1, 2, 3, 4])
977
978 self.assertEqual(list(arr[:]), seq)
979
980 self.f(seq)
981
982 p = self.Process(target=self.f, args=(arr,))
Jesus Cea94f964f2011-09-09 20:26:57 +0200983 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000984 p.start()
985 p.join()
986
987 self.assertEqual(list(arr[:]), seq)
988
Florent Xiclunafd1b0932010-03-28 00:25:02 +0000989 @unittest.skipIf(c_int is None, "requires _ctypes")
Mark Dickinson89461ef2011-03-26 10:19:03 +0000990 def test_array_from_size(self):
991 size = 10
992 # Test for zeroing (see issue #11675).
993 # The repetition below strengthens the test by increasing the chances
994 # of previously allocated non-zero memory being used for the new array
995 # on the 2nd and 3rd loops.
996 for _ in range(3):
997 arr = self.Array('i', size)
998 self.assertEqual(len(arr), size)
999 self.assertEqual(list(arr), [0] * size)
1000 arr[:] = range(10)
1001 self.assertEqual(list(arr), list(range(10)))
1002 del arr
1003
1004 @unittest.skipIf(c_int is None, "requires _ctypes")
Benjamin Petersone711caf2008-06-11 16:44:04 +00001005 def test_rawarray(self):
1006 self.test_array(raw=True)
1007
Florent Xiclunafd1b0932010-03-28 00:25:02 +00001008 @unittest.skipIf(c_int is None, "requires _ctypes")
Benjamin Petersone711caf2008-06-11 16:44:04 +00001009 def test_getobj_getlock_obj(self):
Benjamin Petersone711caf2008-06-11 16:44:04 +00001010 arr1 = self.Array('i', list(range(10)))
1011 lock1 = arr1.get_lock()
1012 obj1 = arr1.get_obj()
1013
1014 arr2 = self.Array('i', list(range(10)), lock=None)
1015 lock2 = arr2.get_lock()
1016 obj2 = arr2.get_obj()
1017
1018 lock = self.Lock()
1019 arr3 = self.Array('i', list(range(10)), lock=lock)
1020 lock3 = arr3.get_lock()
1021 obj3 = arr3.get_obj()
1022 self.assertEqual(lock, lock3)
1023
Jesse Nollerb0516a62009-01-18 03:11:38 +00001024 arr4 = self.Array('i', range(10), lock=False)
Benjamin Petersone711caf2008-06-11 16:44:04 +00001025 self.assertFalse(hasattr(arr4, 'get_lock'))
1026 self.assertFalse(hasattr(arr4, 'get_obj'))
Jesse Nollerb0516a62009-01-18 03:11:38 +00001027 self.assertRaises(AttributeError,
1028 self.Array, 'i', range(10), lock='notalock')
1029
1030 arr5 = self.RawArray('i', range(10))
1031 self.assertFalse(hasattr(arr5, 'get_lock'))
1032 self.assertFalse(hasattr(arr5, 'get_obj'))
Benjamin Petersone711caf2008-06-11 16:44:04 +00001033
1034#
1035#
1036#
1037
1038class _TestContainers(BaseTestCase):
1039
1040 ALLOWED_TYPES = ('manager',)
1041
1042 def test_list(self):
1043 a = self.list(list(range(10)))
1044 self.assertEqual(a[:], list(range(10)))
1045
1046 b = self.list()
1047 self.assertEqual(b[:], [])
1048
1049 b.extend(list(range(5)))
1050 self.assertEqual(b[:], list(range(5)))
1051
1052 self.assertEqual(b[2], 2)
1053 self.assertEqual(b[2:10], [2,3,4])
1054
1055 b *= 2
1056 self.assertEqual(b[:], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4])
1057
1058 self.assertEqual(b + [5, 6], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4, 5, 6])
1059
1060 self.assertEqual(a[:], list(range(10)))
1061
1062 d = [a, b]
1063 e = self.list(d)
1064 self.assertEqual(
1065 e[:],
1066 [[0, 1, 2, 3, 4, 5, 6, 7, 8, 9], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4]]
1067 )
1068
1069 f = self.list([a])
1070 a.append('hello')
1071 self.assertEqual(f[:], [[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 'hello']])
1072
1073 def test_dict(self):
1074 d = self.dict()
1075 indices = list(range(65, 70))
1076 for i in indices:
1077 d[i] = chr(i)
1078 self.assertEqual(d.copy(), dict((i, chr(i)) for i in indices))
1079 self.assertEqual(sorted(d.keys()), indices)
1080 self.assertEqual(sorted(d.values()), [chr(i) for i in indices])
1081 self.assertEqual(sorted(d.items()), [(i, chr(i)) for i in indices])
1082
1083 def test_namespace(self):
1084 n = self.Namespace()
1085 n.name = 'Bob'
1086 n.job = 'Builder'
1087 n._hidden = 'hidden'
1088 self.assertEqual((n.name, n.job), ('Bob', 'Builder'))
1089 del n.job
1090 self.assertEqual(str(n), "Namespace(name='Bob')")
1091 self.assertTrue(hasattr(n, 'name'))
1092 self.assertTrue(not hasattr(n, 'job'))
1093
1094#
1095#
1096#
1097
1098def sqr(x, wait=0.0):
1099 time.sleep(wait)
1100 return x*x
Ask Solem2afcbf22010-11-09 20:55:52 +00001101
Benjamin Petersone711caf2008-06-11 16:44:04 +00001102class _TestPool(BaseTestCase):
1103
1104 def test_apply(self):
1105 papply = self.pool.apply
1106 self.assertEqual(papply(sqr, (5,)), sqr(5))
1107 self.assertEqual(papply(sqr, (), {'x':3}), sqr(x=3))
1108
1109 def test_map(self):
1110 pmap = self.pool.map
1111 self.assertEqual(pmap(sqr, list(range(10))), list(map(sqr, list(range(10)))))
1112 self.assertEqual(pmap(sqr, list(range(100)), chunksize=20),
1113 list(map(sqr, list(range(100)))))
1114
Alexandre Vassalottie52e3782009-07-17 09:18:18 +00001115 def test_map_chunksize(self):
1116 try:
1117 self.pool.map_async(sqr, [], chunksize=1).get(timeout=TIMEOUT1)
1118 except multiprocessing.TimeoutError:
1119 self.fail("pool.map_async with chunksize stalled on null list")
1120
Benjamin Petersone711caf2008-06-11 16:44:04 +00001121 def test_async(self):
1122 res = self.pool.apply_async(sqr, (7, TIMEOUT1,))
1123 get = TimingWrapper(res.get)
1124 self.assertEqual(get(), 49)
1125 self.assertTimingAlmostEqual(get.elapsed, TIMEOUT1)
1126
1127 def test_async_timeout(self):
1128 res = self.pool.apply_async(sqr, (6, TIMEOUT2 + 0.2))
1129 get = TimingWrapper(res.get)
1130 self.assertRaises(multiprocessing.TimeoutError, get, timeout=TIMEOUT2)
1131 self.assertTimingAlmostEqual(get.elapsed, TIMEOUT2)
1132
1133 def test_imap(self):
1134 it = self.pool.imap(sqr, list(range(10)))
1135 self.assertEqual(list(it), list(map(sqr, list(range(10)))))
1136
1137 it = self.pool.imap(sqr, list(range(10)))
1138 for i in range(10):
1139 self.assertEqual(next(it), i*i)
1140 self.assertRaises(StopIteration, it.__next__)
1141
1142 it = self.pool.imap(sqr, list(range(1000)), chunksize=100)
1143 for i in range(1000):
1144 self.assertEqual(next(it), i*i)
1145 self.assertRaises(StopIteration, it.__next__)
1146
1147 def test_imap_unordered(self):
1148 it = self.pool.imap_unordered(sqr, list(range(1000)))
1149 self.assertEqual(sorted(it), list(map(sqr, list(range(1000)))))
1150
1151 it = self.pool.imap_unordered(sqr, list(range(1000)), chunksize=53)
1152 self.assertEqual(sorted(it), list(map(sqr, list(range(1000)))))
1153
1154 def test_make_pool(self):
Victor Stinner2fae27b2011-06-20 17:53:35 +02001155 self.assertRaises(ValueError, multiprocessing.Pool, -1)
1156 self.assertRaises(ValueError, multiprocessing.Pool, 0)
1157
Benjamin Petersone711caf2008-06-11 16:44:04 +00001158 p = multiprocessing.Pool(3)
1159 self.assertEqual(3, len(p._pool))
1160 p.close()
1161 p.join()
1162
1163 def test_terminate(self):
1164 if self.TYPE == 'manager':
1165 # On Unix a forked process increfs each shared object to
1166 # which its parent process held a reference. If the
1167 # forked process gets terminated then there is likely to
1168 # be a reference leak. So to prevent
1169 # _TestZZZNumberOfObjects from failing we skip this test
1170 # when using a manager.
1171 return
1172
1173 result = self.pool.map_async(
1174 time.sleep, [0.1 for i in range(10000)], chunksize=1
1175 )
1176 self.pool.terminate()
1177 join = TimingWrapper(self.pool.join)
1178 join()
Victor Stinner900189b2011-03-24 16:39:07 +01001179 self.assertLess(join.elapsed, 0.5)
Jesse Noller1f0b6582010-01-27 03:36:01 +00001180
Richard Oudkerke41682b2012-06-06 19:04:57 +01001181 def test_empty_iterable(self):
1182 # See Issue 12157
1183 p = self.Pool(1)
1184
1185 self.assertEqual(p.map(sqr, []), [])
1186 self.assertEqual(list(p.imap(sqr, [])), [])
1187 self.assertEqual(list(p.imap_unordered(sqr, [])), [])
1188 self.assertEqual(p.map_async(sqr, []).get(), [])
1189
1190 p.close()
1191 p.join()
1192
Ask Solem2afcbf22010-11-09 20:55:52 +00001193def raising():
1194 raise KeyError("key")
Jesse Noller1f0b6582010-01-27 03:36:01 +00001195
Ask Solem2afcbf22010-11-09 20:55:52 +00001196def unpickleable_result():
1197 return lambda: 42
1198
1199class _TestPoolWorkerErrors(BaseTestCase):
Jesse Noller1f0b6582010-01-27 03:36:01 +00001200 ALLOWED_TYPES = ('processes', )
Ask Solem2afcbf22010-11-09 20:55:52 +00001201
1202 def test_async_error_callback(self):
1203 p = multiprocessing.Pool(2)
1204
1205 scratchpad = [None]
1206 def errback(exc):
1207 scratchpad[0] = exc
1208
1209 res = p.apply_async(raising, error_callback=errback)
1210 self.assertRaises(KeyError, res.get)
1211 self.assertTrue(scratchpad[0])
1212 self.assertIsInstance(scratchpad[0], KeyError)
1213
1214 p.close()
1215 p.join()
1216
1217 def test_unpickleable_result(self):
1218 from multiprocessing.pool import MaybeEncodingError
1219 p = multiprocessing.Pool(2)
1220
1221 # Make sure we don't lose pool processes because of encoding errors.
1222 for iteration in range(20):
1223
1224 scratchpad = [None]
1225 def errback(exc):
1226 scratchpad[0] = exc
1227
1228 res = p.apply_async(unpickleable_result, error_callback=errback)
1229 self.assertRaises(MaybeEncodingError, res.get)
1230 wrapped = scratchpad[0]
1231 self.assertTrue(wrapped)
1232 self.assertIsInstance(scratchpad[0], MaybeEncodingError)
1233 self.assertIsNotNone(wrapped.exc)
1234 self.assertIsNotNone(wrapped.value)
1235
1236 p.close()
1237 p.join()
1238
1239class _TestPoolWorkerLifetime(BaseTestCase):
1240 ALLOWED_TYPES = ('processes', )
1241
Jesse Noller1f0b6582010-01-27 03:36:01 +00001242 def test_pool_worker_lifetime(self):
1243 p = multiprocessing.Pool(3, maxtasksperchild=10)
1244 self.assertEqual(3, len(p._pool))
1245 origworkerpids = [w.pid for w in p._pool]
1246 # Run many tasks so each worker gets replaced (hopefully)
1247 results = []
1248 for i in range(100):
1249 results.append(p.apply_async(sqr, (i, )))
1250 # Fetch the results and verify we got the right answers,
1251 # also ensuring all the tasks have completed.
1252 for (j, res) in enumerate(results):
1253 self.assertEqual(res.get(), sqr(j))
1254 # Refill the pool
1255 p._repopulate_pool()
Florent Xiclunafb190f62010-03-04 16:10:10 +00001256 # Wait until all workers are alive
Antoine Pitrou540ab062011-04-06 22:51:17 +02001257 # (countdown * DELTA = 5 seconds max startup process time)
1258 countdown = 50
Florent Xiclunafb190f62010-03-04 16:10:10 +00001259 while countdown and not all(w.is_alive() for w in p._pool):
1260 countdown -= 1
1261 time.sleep(DELTA)
Jesse Noller1f0b6582010-01-27 03:36:01 +00001262 finalworkerpids = [w.pid for w in p._pool]
Florent Xiclunafb190f62010-03-04 16:10:10 +00001263 # All pids should be assigned. See issue #7805.
1264 self.assertNotIn(None, origworkerpids)
1265 self.assertNotIn(None, finalworkerpids)
1266 # Finally, check that the worker pids have changed
Jesse Noller1f0b6582010-01-27 03:36:01 +00001267 self.assertNotEqual(sorted(origworkerpids), sorted(finalworkerpids))
1268 p.close()
1269 p.join()
1270
Charles-François Natalif8859e12011-10-24 18:45:29 +02001271 def test_pool_worker_lifetime_early_close(self):
1272 # Issue #10332: closing a pool whose workers have limited lifetimes
1273 # before all the tasks completed would make join() hang.
1274 p = multiprocessing.Pool(3, maxtasksperchild=1)
1275 results = []
1276 for i in range(6):
1277 results.append(p.apply_async(sqr, (i, 0.3)))
1278 p.close()
1279 p.join()
1280 # check the results
1281 for (j, res) in enumerate(results):
1282 self.assertEqual(res.get(), sqr(j))
1283
1284
Benjamin Petersone711caf2008-06-11 16:44:04 +00001285#
1286# Test that manager has expected number of shared objects left
1287#
1288
1289class _TestZZZNumberOfObjects(BaseTestCase):
1290 # Because test cases are sorted alphabetically, this one will get
1291 # run after all the other tests for the manager. It tests that
1292 # there have been no "reference leaks" for the manager's shared
1293 # objects. Note the comment in _TestPool.test_terminate().
1294 ALLOWED_TYPES = ('manager',)
1295
1296 def test_number_of_objects(self):
1297 EXPECTED_NUMBER = 1 # the pool object is still alive
1298 multiprocessing.active_children() # discard dead process objs
1299 gc.collect() # do garbage collection
1300 refs = self.manager._number_of_objects()
Jesse Noller63b3a972009-01-21 02:15:48 +00001301 debug_info = self.manager._debug_info()
Benjamin Petersone711caf2008-06-11 16:44:04 +00001302 if refs != EXPECTED_NUMBER:
Georg Brandl3dbca812008-07-23 16:10:53 +00001303 print(self.manager._debug_info())
Jesse Noller63b3a972009-01-21 02:15:48 +00001304 print(debug_info)
Benjamin Petersone711caf2008-06-11 16:44:04 +00001305
1306 self.assertEqual(refs, EXPECTED_NUMBER)
1307
1308#
1309# Test of creating a customized manager class
1310#
1311
1312from multiprocessing.managers import BaseManager, BaseProxy, RemoteError
1313
1314class FooBar(object):
1315 def f(self):
1316 return 'f()'
1317 def g(self):
1318 raise ValueError
1319 def _h(self):
1320 return '_h()'
1321
1322def baz():
1323 for i in range(10):
1324 yield i*i
1325
1326class IteratorProxy(BaseProxy):
Florent Xiclunaaa171062010-08-14 15:56:42 +00001327 _exposed_ = ('__next__',)
Benjamin Petersone711caf2008-06-11 16:44:04 +00001328 def __iter__(self):
1329 return self
1330 def __next__(self):
Benjamin Petersone711caf2008-06-11 16:44:04 +00001331 return self._callmethod('__next__')
1332
1333class MyManager(BaseManager):
1334 pass
1335
1336MyManager.register('Foo', callable=FooBar)
1337MyManager.register('Bar', callable=FooBar, exposed=('f', '_h'))
1338MyManager.register('baz', callable=baz, proxytype=IteratorProxy)
1339
1340
1341class _TestMyManager(BaseTestCase):
1342
1343 ALLOWED_TYPES = ('manager',)
1344
1345 def test_mymanager(self):
1346 manager = MyManager()
1347 manager.start()
1348
1349 foo = manager.Foo()
1350 bar = manager.Bar()
1351 baz = manager.baz()
1352
1353 foo_methods = [name for name in ('f', 'g', '_h') if hasattr(foo, name)]
1354 bar_methods = [name for name in ('f', 'g', '_h') if hasattr(bar, name)]
1355
1356 self.assertEqual(foo_methods, ['f', 'g'])
1357 self.assertEqual(bar_methods, ['f', '_h'])
1358
1359 self.assertEqual(foo.f(), 'f()')
1360 self.assertRaises(ValueError, foo.g)
1361 self.assertEqual(foo._callmethod('f'), 'f()')
1362 self.assertRaises(RemoteError, foo._callmethod, '_h')
1363
1364 self.assertEqual(bar.f(), 'f()')
1365 self.assertEqual(bar._h(), '_h()')
1366 self.assertEqual(bar._callmethod('f'), 'f()')
1367 self.assertEqual(bar._callmethod('_h'), '_h()')
1368
1369 self.assertEqual(list(baz), [i*i for i in range(10)])
1370
1371 manager.shutdown()
1372
1373#
1374# Test of connecting to a remote server and using xmlrpclib for serialization
1375#
1376
1377_queue = pyqueue.Queue()
1378def get_queue():
1379 return _queue
1380
1381class QueueManager(BaseManager):
1382 '''manager class used by server process'''
1383QueueManager.register('get_queue', callable=get_queue)
1384
1385class QueueManager2(BaseManager):
1386 '''manager class which specifies the same interface as QueueManager'''
1387QueueManager2.register('get_queue')
1388
1389
1390SERIALIZER = 'xmlrpclib'
1391
1392class _TestRemoteManager(BaseTestCase):
1393
1394 ALLOWED_TYPES = ('manager',)
1395
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +00001396 @classmethod
1397 def _putter(cls, address, authkey):
Benjamin Petersone711caf2008-06-11 16:44:04 +00001398 manager = QueueManager2(
1399 address=address, authkey=authkey, serializer=SERIALIZER
1400 )
1401 manager.connect()
1402 queue = manager.get_queue()
1403 queue.put(('hello world', None, True, 2.25))
1404
1405 def test_remote(self):
1406 authkey = os.urandom(32)
1407
1408 manager = QueueManager(
1409 address=('localhost', 0), authkey=authkey, serializer=SERIALIZER
1410 )
1411 manager.start()
1412
1413 p = self.Process(target=self._putter, args=(manager.address, authkey))
Jesus Cea94f964f2011-09-09 20:26:57 +02001414 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +00001415 p.start()
1416
1417 manager2 = QueueManager2(
1418 address=manager.address, authkey=authkey, serializer=SERIALIZER
1419 )
1420 manager2.connect()
1421 queue = manager2.get_queue()
1422
1423 # Note that xmlrpclib will deserialize object as a list not a tuple
1424 self.assertEqual(queue.get(), ['hello world', None, True, 2.25])
1425
1426 # Because we are using xmlrpclib for serialization instead of
1427 # pickle this will cause a serialization error.
1428 self.assertRaises(Exception, queue.put, time.sleep)
1429
1430 # Make queue finalizer run before the server is stopped
1431 del queue
1432 manager.shutdown()
1433
Jesse Nollerc5d28a02009-03-30 16:37:36 +00001434class _TestManagerRestart(BaseTestCase):
1435
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +00001436 @classmethod
1437 def _putter(cls, address, authkey):
Jesse Nollerc5d28a02009-03-30 16:37:36 +00001438 manager = QueueManager(
1439 address=address, authkey=authkey, serializer=SERIALIZER)
1440 manager.connect()
1441 queue = manager.get_queue()
1442 queue.put('hello world')
1443
1444 def test_rapid_restart(self):
1445 authkey = os.urandom(32)
1446 manager = QueueManager(
Antoine Pitrou043bad02010-04-30 23:20:15 +00001447 address=('localhost', 0), authkey=authkey, serializer=SERIALIZER)
Brian Curtin50be1ca2010-11-01 05:10:44 +00001448 srvr = manager.get_server()
1449 addr = srvr.address
1450 # Close the connection.Listener socket which gets opened as a part
1451 # of manager.get_server(). It's not needed for the test.
1452 srvr.listener.close()
Jesse Nollerc5d28a02009-03-30 16:37:36 +00001453 manager.start()
1454
1455 p = self.Process(target=self._putter, args=(manager.address, authkey))
Jesus Cea94f964f2011-09-09 20:26:57 +02001456 p.daemon = True
Jesse Nollerc5d28a02009-03-30 16:37:36 +00001457 p.start()
1458 queue = manager.get_queue()
1459 self.assertEqual(queue.get(), 'hello world')
Jesse Noller35d1f002009-03-30 22:59:27 +00001460 del queue
Jesse Nollerc5d28a02009-03-30 16:37:36 +00001461 manager.shutdown()
1462 manager = QueueManager(
Antoine Pitrou043bad02010-04-30 23:20:15 +00001463 address=addr, authkey=authkey, serializer=SERIALIZER)
Antoine Pitrouc824e9a2011-04-05 18:11:33 +02001464 try:
1465 manager.start()
1466 except IOError as e:
1467 if e.errno != errno.EADDRINUSE:
1468 raise
1469 # Retry after some time, in case the old socket was lingering
1470 # (sporadic failure on buildbots)
1471 time.sleep(1.0)
1472 manager = QueueManager(
1473 address=addr, authkey=authkey, serializer=SERIALIZER)
Jesse Noller35d1f002009-03-30 22:59:27 +00001474 manager.shutdown()
Jesse Nollerc5d28a02009-03-30 16:37:36 +00001475
Benjamin Petersone711caf2008-06-11 16:44:04 +00001476#
1477#
1478#
1479
1480SENTINEL = latin('')
1481
1482class _TestConnection(BaseTestCase):
1483
1484 ALLOWED_TYPES = ('processes', 'threads')
1485
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +00001486 @classmethod
1487 def _echo(cls, conn):
Benjamin Petersone711caf2008-06-11 16:44:04 +00001488 for msg in iter(conn.recv_bytes, SENTINEL):
1489 conn.send_bytes(msg)
1490 conn.close()
1491
1492 def test_connection(self):
1493 conn, child_conn = self.Pipe()
1494
1495 p = self.Process(target=self._echo, args=(child_conn,))
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +00001496 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +00001497 p.start()
1498
1499 seq = [1, 2.25, None]
1500 msg = latin('hello world')
1501 longmsg = msg * 10
1502 arr = array.array('i', list(range(4)))
1503
1504 if self.TYPE == 'processes':
1505 self.assertEqual(type(conn.fileno()), int)
1506
1507 self.assertEqual(conn.send(seq), None)
1508 self.assertEqual(conn.recv(), seq)
1509
1510 self.assertEqual(conn.send_bytes(msg), None)
1511 self.assertEqual(conn.recv_bytes(), msg)
1512
1513 if self.TYPE == 'processes':
1514 buffer = array.array('i', [0]*10)
1515 expected = list(arr) + [0] * (10 - len(arr))
1516 self.assertEqual(conn.send_bytes(arr), None)
1517 self.assertEqual(conn.recv_bytes_into(buffer),
1518 len(arr) * buffer.itemsize)
1519 self.assertEqual(list(buffer), expected)
1520
1521 buffer = array.array('i', [0]*10)
1522 expected = [0] * 3 + list(arr) + [0] * (10 - 3 - len(arr))
1523 self.assertEqual(conn.send_bytes(arr), None)
1524 self.assertEqual(conn.recv_bytes_into(buffer, 3 * buffer.itemsize),
1525 len(arr) * buffer.itemsize)
1526 self.assertEqual(list(buffer), expected)
1527
1528 buffer = bytearray(latin(' ' * 40))
1529 self.assertEqual(conn.send_bytes(longmsg), None)
1530 try:
1531 res = conn.recv_bytes_into(buffer)
1532 except multiprocessing.BufferTooShort as e:
1533 self.assertEqual(e.args, (longmsg,))
1534 else:
1535 self.fail('expected BufferTooShort, got %s' % res)
1536
1537 poll = TimingWrapper(conn.poll)
1538
1539 self.assertEqual(poll(), False)
1540 self.assertTimingAlmostEqual(poll.elapsed, 0)
1541
1542 self.assertEqual(poll(TIMEOUT1), False)
1543 self.assertTimingAlmostEqual(poll.elapsed, TIMEOUT1)
1544
1545 conn.send(None)
1546
1547 self.assertEqual(poll(TIMEOUT1), True)
1548 self.assertTimingAlmostEqual(poll.elapsed, 0)
1549
1550 self.assertEqual(conn.recv(), None)
1551
1552 really_big_msg = latin('X') * (1024 * 1024 * 16) # 16Mb
1553 conn.send_bytes(really_big_msg)
1554 self.assertEqual(conn.recv_bytes(), really_big_msg)
1555
1556 conn.send_bytes(SENTINEL) # tell child to quit
1557 child_conn.close()
1558
1559 if self.TYPE == 'processes':
1560 self.assertEqual(conn.readable, True)
1561 self.assertEqual(conn.writable, True)
1562 self.assertRaises(EOFError, conn.recv)
1563 self.assertRaises(EOFError, conn.recv_bytes)
1564
1565 p.join()
1566
1567 def test_duplex_false(self):
1568 reader, writer = self.Pipe(duplex=False)
1569 self.assertEqual(writer.send(1), None)
1570 self.assertEqual(reader.recv(), 1)
1571 if self.TYPE == 'processes':
1572 self.assertEqual(reader.readable, True)
1573 self.assertEqual(reader.writable, False)
1574 self.assertEqual(writer.readable, False)
1575 self.assertEqual(writer.writable, True)
1576 self.assertRaises(IOError, reader.send, 2)
1577 self.assertRaises(IOError, writer.recv)
1578 self.assertRaises(IOError, writer.poll)
1579
1580 def test_spawn_close(self):
1581 # We test that a pipe connection can be closed by parent
1582 # process immediately after child is spawned. On Windows this
1583 # would have sometimes failed on old versions because
1584 # child_conn would be closed before the child got a chance to
1585 # duplicate it.
1586 conn, child_conn = self.Pipe()
1587
1588 p = self.Process(target=self._echo, args=(child_conn,))
Jesus Cea94f964f2011-09-09 20:26:57 +02001589 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +00001590 p.start()
1591 child_conn.close() # this might complete before child initializes
1592
1593 msg = latin('hello')
1594 conn.send_bytes(msg)
1595 self.assertEqual(conn.recv_bytes(), msg)
1596
1597 conn.send_bytes(SENTINEL)
1598 conn.close()
1599 p.join()
1600
1601 def test_sendbytes(self):
1602 if self.TYPE != 'processes':
1603 return
1604
1605 msg = latin('abcdefghijklmnopqrstuvwxyz')
1606 a, b = self.Pipe()
1607
1608 a.send_bytes(msg)
1609 self.assertEqual(b.recv_bytes(), msg)
1610
1611 a.send_bytes(msg, 5)
1612 self.assertEqual(b.recv_bytes(), msg[5:])
1613
1614 a.send_bytes(msg, 7, 8)
1615 self.assertEqual(b.recv_bytes(), msg[7:7+8])
1616
1617 a.send_bytes(msg, 26)
1618 self.assertEqual(b.recv_bytes(), latin(''))
1619
1620 a.send_bytes(msg, 26, 0)
1621 self.assertEqual(b.recv_bytes(), latin(''))
1622
1623 self.assertRaises(ValueError, a.send_bytes, msg, 27)
1624
1625 self.assertRaises(ValueError, a.send_bytes, msg, 22, 5)
1626
1627 self.assertRaises(ValueError, a.send_bytes, msg, 26, 1)
1628
1629 self.assertRaises(ValueError, a.send_bytes, msg, -1)
1630
1631 self.assertRaises(ValueError, a.send_bytes, msg, 4, -1)
1632
Antoine Pitroubcb39d42011-08-23 19:46:22 +02001633 @classmethod
1634 def _is_fd_assigned(cls, fd):
1635 try:
1636 os.fstat(fd)
1637 except OSError as e:
1638 if e.errno == errno.EBADF:
1639 return False
1640 raise
1641 else:
1642 return True
1643
1644 @classmethod
1645 def _writefd(cls, conn, data, create_dummy_fds=False):
1646 if create_dummy_fds:
1647 for i in range(0, 256):
1648 if not cls._is_fd_assigned(i):
1649 os.dup2(conn.fileno(), i)
1650 fd = reduction.recv_handle(conn)
1651 if msvcrt:
1652 fd = msvcrt.open_osfhandle(fd, os.O_WRONLY)
1653 os.write(fd, data)
1654 os.close(fd)
1655
Charles-François Natalie51c8da2011-09-21 18:48:21 +02001656 @unittest.skipUnless(HAS_REDUCTION, "test needs multiprocessing.reduction")
Antoine Pitroubcb39d42011-08-23 19:46:22 +02001657 def test_fd_transfer(self):
1658 if self.TYPE != 'processes':
1659 self.skipTest("only makes sense with processes")
1660 conn, child_conn = self.Pipe(duplex=True)
1661
1662 p = self.Process(target=self._writefd, args=(child_conn, b"foo"))
Jesus Cea94f964f2011-09-09 20:26:57 +02001663 p.daemon = True
Antoine Pitroubcb39d42011-08-23 19:46:22 +02001664 p.start()
Victor Stinnerd0b10a62011-09-21 01:10:29 +02001665 self.addCleanup(test.support.unlink, test.support.TESTFN)
Antoine Pitroubcb39d42011-08-23 19:46:22 +02001666 with open(test.support.TESTFN, "wb") as f:
1667 fd = f.fileno()
1668 if msvcrt:
1669 fd = msvcrt.get_osfhandle(fd)
1670 reduction.send_handle(conn, fd, p.pid)
1671 p.join()
1672 with open(test.support.TESTFN, "rb") as f:
1673 self.assertEqual(f.read(), b"foo")
1674
Charles-François Natalie51c8da2011-09-21 18:48:21 +02001675 @unittest.skipUnless(HAS_REDUCTION, "test needs multiprocessing.reduction")
Antoine Pitroubcb39d42011-08-23 19:46:22 +02001676 @unittest.skipIf(sys.platform == "win32",
1677 "test semantics don't make sense on Windows")
1678 @unittest.skipIf(MAXFD <= 256,
1679 "largest assignable fd number is too small")
1680 @unittest.skipUnless(hasattr(os, "dup2"),
1681 "test needs os.dup2()")
1682 def test_large_fd_transfer(self):
1683 # With fd > 256 (issue #11657)
1684 if self.TYPE != 'processes':
1685 self.skipTest("only makes sense with processes")
1686 conn, child_conn = self.Pipe(duplex=True)
1687
1688 p = self.Process(target=self._writefd, args=(child_conn, b"bar", True))
Jesus Cea94f964f2011-09-09 20:26:57 +02001689 p.daemon = True
Antoine Pitroubcb39d42011-08-23 19:46:22 +02001690 p.start()
Victor Stinnerd0b10a62011-09-21 01:10:29 +02001691 self.addCleanup(test.support.unlink, test.support.TESTFN)
Antoine Pitroubcb39d42011-08-23 19:46:22 +02001692 with open(test.support.TESTFN, "wb") as f:
1693 fd = f.fileno()
1694 for newfd in range(256, MAXFD):
1695 if not self._is_fd_assigned(newfd):
1696 break
1697 else:
1698 self.fail("could not find an unassigned large file descriptor")
1699 os.dup2(fd, newfd)
1700 try:
1701 reduction.send_handle(conn, newfd, p.pid)
1702 finally:
1703 os.close(newfd)
1704 p.join()
1705 with open(test.support.TESTFN, "rb") as f:
1706 self.assertEqual(f.read(), b"bar")
1707
Jesus Cea4507e642011-09-21 03:53:25 +02001708 @classmethod
1709 def _send_data_without_fd(self, conn):
1710 os.write(conn.fileno(), b"\0")
1711
Charles-François Natalie51c8da2011-09-21 18:48:21 +02001712 @unittest.skipUnless(HAS_REDUCTION, "test needs multiprocessing.reduction")
Jesus Cea4507e642011-09-21 03:53:25 +02001713 @unittest.skipIf(sys.platform == "win32", "doesn't make sense on Windows")
1714 def test_missing_fd_transfer(self):
1715 # Check that exception is raised when received data is not
1716 # accompanied by a file descriptor in ancillary data.
1717 if self.TYPE != 'processes':
1718 self.skipTest("only makes sense with processes")
1719 conn, child_conn = self.Pipe(duplex=True)
1720
1721 p = self.Process(target=self._send_data_without_fd, args=(child_conn,))
1722 p.daemon = True
1723 p.start()
1724 self.assertRaises(RuntimeError, reduction.recv_handle, conn)
1725 p.join()
Antoine Pitroubcb39d42011-08-23 19:46:22 +02001726
Benjamin Petersone711caf2008-06-11 16:44:04 +00001727class _TestListenerClient(BaseTestCase):
1728
1729 ALLOWED_TYPES = ('processes', 'threads')
1730
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +00001731 @classmethod
1732 def _test(cls, address):
1733 conn = cls.connection.Client(address)
Benjamin Petersone711caf2008-06-11 16:44:04 +00001734 conn.send('hello')
1735 conn.close()
1736
1737 def test_listener_client(self):
1738 for family in self.connection.families:
1739 l = self.connection.Listener(family=family)
1740 p = self.Process(target=self._test, args=(l.address,))
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +00001741 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +00001742 p.start()
1743 conn = l.accept()
1744 self.assertEqual(conn.recv(), 'hello')
1745 p.join()
1746 l.close()
Richard Oudkerk7ef909c2012-05-05 20:41:23 +01001747
1748 def test_issue14725(self):
1749 l = self.connection.Listener()
1750 p = self.Process(target=self._test, args=(l.address,))
1751 p.daemon = True
1752 p.start()
1753 time.sleep(1)
1754 # On Windows the client process should by now have connected,
1755 # written data and closed the pipe handle by now. This causes
1756 # ConnectNamdedPipe() to fail with ERROR_NO_DATA. See Issue
1757 # 14725.
1758 conn = l.accept()
1759 self.assertEqual(conn.recv(), 'hello')
1760 conn.close()
1761 p.join()
1762 l.close()
1763
Benjamin Petersone711caf2008-06-11 16:44:04 +00001764#
1765# Test of sending connection and socket objects between processes
1766#
Benjamin Petersonb29cbbc2008-06-16 20:57:14 +00001767"""
Benjamin Petersone711caf2008-06-11 16:44:04 +00001768class _TestPicklingConnections(BaseTestCase):
1769
1770 ALLOWED_TYPES = ('processes',)
1771
1772 def _listener(self, conn, families):
1773 for fam in families:
1774 l = self.connection.Listener(family=fam)
1775 conn.send(l.address)
1776 new_conn = l.accept()
1777 conn.send(new_conn)
1778
1779 if self.TYPE == 'processes':
1780 l = socket.socket()
1781 l.bind(('localhost', 0))
1782 conn.send(l.getsockname())
1783 l.listen(1)
1784 new_conn, addr = l.accept()
1785 conn.send(new_conn)
1786
1787 conn.recv()
1788
1789 def _remote(self, conn):
1790 for (address, msg) in iter(conn.recv, None):
1791 client = self.connection.Client(address)
1792 client.send(msg.upper())
1793 client.close()
1794
1795 if self.TYPE == 'processes':
1796 address, msg = conn.recv()
1797 client = socket.socket()
1798 client.connect(address)
1799 client.sendall(msg.upper())
1800 client.close()
1801
1802 conn.close()
1803
1804 def test_pickling(self):
1805 try:
1806 multiprocessing.allow_connection_pickling()
1807 except ImportError:
1808 return
1809
1810 families = self.connection.families
1811
1812 lconn, lconn0 = self.Pipe()
1813 lp = self.Process(target=self._listener, args=(lconn0, families))
Jesus Cea94f964f2011-09-09 20:26:57 +02001814 lp.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +00001815 lp.start()
1816 lconn0.close()
1817
1818 rconn, rconn0 = self.Pipe()
1819 rp = self.Process(target=self._remote, args=(rconn0,))
Jesus Cea94f964f2011-09-09 20:26:57 +02001820 rp.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +00001821 rp.start()
1822 rconn0.close()
1823
1824 for fam in families:
1825 msg = ('This connection uses family %s' % fam).encode('ascii')
1826 address = lconn.recv()
1827 rconn.send((address, msg))
1828 new_conn = lconn.recv()
1829 self.assertEqual(new_conn.recv(), msg.upper())
1830
1831 rconn.send(None)
1832
1833 if self.TYPE == 'processes':
1834 msg = latin('This connection uses a normal socket')
1835 address = lconn.recv()
1836 rconn.send((address, msg))
1837 if hasattr(socket, 'fromfd'):
1838 new_conn = lconn.recv()
1839 self.assertEqual(new_conn.recv(100), msg.upper())
1840 else:
1841 # XXX On Windows with Py2.6 need to backport fromfd()
1842 discard = lconn.recv_bytes()
1843
1844 lconn.send(None)
1845
1846 rconn.close()
1847 lconn.close()
1848
1849 lp.join()
1850 rp.join()
Benjamin Petersonb29cbbc2008-06-16 20:57:14 +00001851"""
Benjamin Petersone711caf2008-06-11 16:44:04 +00001852#
1853#
1854#
1855
1856class _TestHeap(BaseTestCase):
1857
1858 ALLOWED_TYPES = ('processes',)
1859
1860 def test_heap(self):
1861 iterations = 5000
1862 maxblocks = 50
1863 blocks = []
1864
1865 # create and destroy lots of blocks of different sizes
1866 for i in range(iterations):
1867 size = int(random.lognormvariate(0, 1) * 1000)
1868 b = multiprocessing.heap.BufferWrapper(size)
1869 blocks.append(b)
1870 if len(blocks) > maxblocks:
1871 i = random.randrange(maxblocks)
1872 del blocks[i]
1873
1874 # get the heap object
1875 heap = multiprocessing.heap.BufferWrapper._heap
1876
1877 # verify the state of the heap
1878 all = []
1879 occupied = 0
Charles-François Natali778db492011-07-02 14:35:49 +02001880 heap._lock.acquire()
1881 self.addCleanup(heap._lock.release)
Benjamin Petersone711caf2008-06-11 16:44:04 +00001882 for L in list(heap._len_to_seq.values()):
1883 for arena, start, stop in L:
1884 all.append((heap._arenas.index(arena), start, stop,
1885 stop-start, 'free'))
1886 for arena, start, stop in heap._allocated_blocks:
1887 all.append((heap._arenas.index(arena), start, stop,
1888 stop-start, 'occupied'))
1889 occupied += (stop-start)
1890
1891 all.sort()
1892
1893 for i in range(len(all)-1):
1894 (arena, start, stop) = all[i][:3]
1895 (narena, nstart, nstop) = all[i+1][:3]
1896 self.assertTrue((arena != narena and nstart == 0) or
1897 (stop == nstart))
1898
Charles-François Natali778db492011-07-02 14:35:49 +02001899 def test_free_from_gc(self):
1900 # Check that freeing of blocks by the garbage collector doesn't deadlock
1901 # (issue #12352).
1902 # Make sure the GC is enabled, and set lower collection thresholds to
1903 # make collections more frequent (and increase the probability of
1904 # deadlock).
1905 if not gc.isenabled():
1906 gc.enable()
1907 self.addCleanup(gc.disable)
1908 thresholds = gc.get_threshold()
1909 self.addCleanup(gc.set_threshold, *thresholds)
1910 gc.set_threshold(10)
1911
1912 # perform numerous block allocations, with cyclic references to make
1913 # sure objects are collected asynchronously by the gc
1914 for i in range(5000):
1915 a = multiprocessing.heap.BufferWrapper(1)
1916 b = multiprocessing.heap.BufferWrapper(1)
1917 # circular references
1918 a.buddy = b
1919 b.buddy = a
1920
Benjamin Petersone711caf2008-06-11 16:44:04 +00001921#
1922#
1923#
1924
Benjamin Petersone711caf2008-06-11 16:44:04 +00001925class _Foo(Structure):
1926 _fields_ = [
1927 ('x', c_int),
1928 ('y', c_double)
1929 ]
1930
1931class _TestSharedCTypes(BaseTestCase):
1932
1933 ALLOWED_TYPES = ('processes',)
1934
Antoine Pitrou7744e2a2010-11-22 16:26:21 +00001935 def setUp(self):
1936 if not HAS_SHAREDCTYPES:
1937 self.skipTest("requires multiprocessing.sharedctypes")
1938
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +00001939 @classmethod
1940 def _double(cls, x, y, foo, arr, string):
Benjamin Petersone711caf2008-06-11 16:44:04 +00001941 x.value *= 2
1942 y.value *= 2
1943 foo.x *= 2
1944 foo.y *= 2
1945 string.value *= 2
1946 for i in range(len(arr)):
1947 arr[i] *= 2
1948
1949 def test_sharedctypes(self, lock=False):
Benjamin Petersone711caf2008-06-11 16:44:04 +00001950 x = Value('i', 7, lock=lock)
Georg Brandl89fad142010-03-14 10:23:39 +00001951 y = Value(c_double, 1.0/3.0, lock=lock)
Benjamin Petersone711caf2008-06-11 16:44:04 +00001952 foo = Value(_Foo, 3, 2, lock=lock)
Georg Brandl89fad142010-03-14 10:23:39 +00001953 arr = self.Array('d', list(range(10)), lock=lock)
1954 string = self.Array('c', 20, lock=lock)
Brian Curtinafa88b52010-10-07 01:12:19 +00001955 string.value = latin('hello')
Benjamin Petersone711caf2008-06-11 16:44:04 +00001956
1957 p = self.Process(target=self._double, args=(x, y, foo, arr, string))
Jesus Cea94f964f2011-09-09 20:26:57 +02001958 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +00001959 p.start()
1960 p.join()
1961
1962 self.assertEqual(x.value, 14)
1963 self.assertAlmostEqual(y.value, 2.0/3.0)
1964 self.assertEqual(foo.x, 6)
1965 self.assertAlmostEqual(foo.y, 4.0)
1966 for i in range(10):
1967 self.assertAlmostEqual(arr[i], i*2)
1968 self.assertEqual(string.value, latin('hellohello'))
1969
1970 def test_synchronize(self):
1971 self.test_sharedctypes(lock=True)
1972
1973 def test_copy(self):
Benjamin Petersone711caf2008-06-11 16:44:04 +00001974 foo = _Foo(2, 5.0)
Brian Curtinafa88b52010-10-07 01:12:19 +00001975 bar = copy(foo)
Benjamin Petersone711caf2008-06-11 16:44:04 +00001976 foo.x = 0
1977 foo.y = 0
1978 self.assertEqual(bar.x, 2)
1979 self.assertAlmostEqual(bar.y, 5.0)
1980
1981#
1982#
1983#
1984
1985class _TestFinalize(BaseTestCase):
1986
1987 ALLOWED_TYPES = ('processes',)
1988
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +00001989 @classmethod
1990 def _test_finalize(cls, conn):
Benjamin Petersone711caf2008-06-11 16:44:04 +00001991 class Foo(object):
1992 pass
1993
1994 a = Foo()
1995 util.Finalize(a, conn.send, args=('a',))
1996 del a # triggers callback for a
1997
1998 b = Foo()
1999 close_b = util.Finalize(b, conn.send, args=('b',))
2000 close_b() # triggers callback for b
2001 close_b() # does nothing because callback has already been called
2002 del b # does nothing because callback has already been called
2003
2004 c = Foo()
2005 util.Finalize(c, conn.send, args=('c',))
2006
2007 d10 = Foo()
2008 util.Finalize(d10, conn.send, args=('d10',), exitpriority=1)
2009
2010 d01 = Foo()
2011 util.Finalize(d01, conn.send, args=('d01',), exitpriority=0)
2012 d02 = Foo()
2013 util.Finalize(d02, conn.send, args=('d02',), exitpriority=0)
2014 d03 = Foo()
2015 util.Finalize(d03, conn.send, args=('d03',), exitpriority=0)
2016
2017 util.Finalize(None, conn.send, args=('e',), exitpriority=-10)
2018
2019 util.Finalize(None, conn.send, args=('STOP',), exitpriority=-100)
2020
Ezio Melotti13925002011-03-16 11:05:33 +02002021 # call multiprocessing's cleanup function then exit process without
Benjamin Petersone711caf2008-06-11 16:44:04 +00002022 # garbage collecting locals
2023 util._exit_function()
2024 conn.close()
2025 os._exit(0)
2026
2027 def test_finalize(self):
2028 conn, child_conn = self.Pipe()
2029
2030 p = self.Process(target=self._test_finalize, args=(child_conn,))
Jesus Cea94f964f2011-09-09 20:26:57 +02002031 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +00002032 p.start()
2033 p.join()
2034
2035 result = [obj for obj in iter(conn.recv, 'STOP')]
2036 self.assertEqual(result, ['a', 'b', 'd10', 'd03', 'd02', 'd01', 'e'])
2037
2038#
2039# Test that from ... import * works for each module
2040#
2041
2042class _TestImportStar(BaseTestCase):
2043
2044 ALLOWED_TYPES = ('processes',)
2045
2046 def test_import(self):
Florent Xiclunafd1b0932010-03-28 00:25:02 +00002047 modules = [
Benjamin Petersone711caf2008-06-11 16:44:04 +00002048 'multiprocessing', 'multiprocessing.connection',
2049 'multiprocessing.heap', 'multiprocessing.managers',
2050 'multiprocessing.pool', 'multiprocessing.process',
Benjamin Petersone711caf2008-06-11 16:44:04 +00002051 'multiprocessing.synchronize', 'multiprocessing.util'
Florent Xiclunafd1b0932010-03-28 00:25:02 +00002052 ]
2053
Charles-François Natalie51c8da2011-09-21 18:48:21 +02002054 if HAS_REDUCTION:
2055 modules.append('multiprocessing.reduction')
2056
Florent Xiclunafd1b0932010-03-28 00:25:02 +00002057 if c_int is not None:
2058 # This module requires _ctypes
2059 modules.append('multiprocessing.sharedctypes')
Benjamin Petersone711caf2008-06-11 16:44:04 +00002060
2061 for name in modules:
2062 __import__(name)
2063 mod = sys.modules[name]
2064
2065 for attr in getattr(mod, '__all__', ()):
2066 self.assertTrue(
2067 hasattr(mod, attr),
2068 '%r does not have attribute %r' % (mod, attr)
2069 )
2070
2071#
2072# Quick test that logging works -- does not test logging output
2073#
2074
2075class _TestLogging(BaseTestCase):
2076
2077 ALLOWED_TYPES = ('processes',)
2078
2079 def test_enable_logging(self):
2080 logger = multiprocessing.get_logger()
2081 logger.setLevel(util.SUBWARNING)
2082 self.assertTrue(logger is not None)
2083 logger.debug('this will not be printed')
2084 logger.info('nor will this')
2085 logger.setLevel(LOG_LEVEL)
2086
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +00002087 @classmethod
2088 def _test_level(cls, conn):
Benjamin Petersone711caf2008-06-11 16:44:04 +00002089 logger = multiprocessing.get_logger()
2090 conn.send(logger.getEffectiveLevel())
2091
2092 def test_level(self):
2093 LEVEL1 = 32
2094 LEVEL2 = 37
2095
2096 logger = multiprocessing.get_logger()
2097 root_logger = logging.getLogger()
2098 root_level = root_logger.level
2099
2100 reader, writer = multiprocessing.Pipe(duplex=False)
2101
2102 logger.setLevel(LEVEL1)
Jesus Cea94f964f2011-09-09 20:26:57 +02002103 p = self.Process(target=self._test_level, args=(writer,))
2104 p.daemon = True
2105 p.start()
Benjamin Petersone711caf2008-06-11 16:44:04 +00002106 self.assertEqual(LEVEL1, reader.recv())
2107
2108 logger.setLevel(logging.NOTSET)
2109 root_logger.setLevel(LEVEL2)
Jesus Cea94f964f2011-09-09 20:26:57 +02002110 p = self.Process(target=self._test_level, args=(writer,))
2111 p.daemon = True
2112 p.start()
Benjamin Petersone711caf2008-06-11 16:44:04 +00002113 self.assertEqual(LEVEL2, reader.recv())
2114
2115 root_logger.setLevel(root_level)
2116 logger.setLevel(level=LOG_LEVEL)
2117
Jesse Nollerb9a49b72009-11-21 18:09:38 +00002118
Jesse Nollerf4ae35f2009-11-24 14:22:24 +00002119# class _TestLoggingProcessName(BaseTestCase):
2120#
2121# def handle(self, record):
2122# assert record.processName == multiprocessing.current_process().name
2123# self.__handled = True
2124#
2125# def test_logging(self):
2126# handler = logging.Handler()
2127# handler.handle = self.handle
2128# self.__handled = False
2129# # Bypass getLogger() and side-effects
2130# logger = logging.getLoggerClass()(
2131# 'multiprocessing.test.TestLoggingProcessName')
2132# logger.addHandler(handler)
2133# logger.propagate = False
2134#
2135# logger.warn('foo')
2136# assert self.__handled
Jesse Nollerb9a49b72009-11-21 18:09:38 +00002137
Benjamin Petersone711caf2008-06-11 16:44:04 +00002138#
Jesse Noller6214edd2009-01-19 16:23:53 +00002139# Test to verify handle verification, see issue 3321
2140#
2141
2142class TestInvalidHandle(unittest.TestCase):
2143
Florent Xiclunafd1b0932010-03-28 00:25:02 +00002144 @unittest.skipIf(WIN32, "skipped on Windows")
Jesse Noller6214edd2009-01-19 16:23:53 +00002145 def test_invalid_handles(self):
Jesse Noller6214edd2009-01-19 16:23:53 +00002146 conn = _multiprocessing.Connection(44977608)
2147 self.assertRaises(IOError, conn.poll)
2148 self.assertRaises(IOError, _multiprocessing.Connection, -1)
Florent Xiclunafd1b0932010-03-28 00:25:02 +00002149
Jesse Noller6214edd2009-01-19 16:23:53 +00002150#
Benjamin Petersone711caf2008-06-11 16:44:04 +00002151# Functions used to create test cases from the base ones in this module
2152#
2153
2154def get_attributes(Source, names):
2155 d = {}
2156 for name in names:
2157 obj = getattr(Source, name)
2158 if type(obj) == type(get_attributes):
2159 obj = staticmethod(obj)
2160 d[name] = obj
2161 return d
2162
2163def create_test_cases(Mixin, type):
2164 result = {}
2165 glob = globals()
Florent Xiclunafd1b0932010-03-28 00:25:02 +00002166 Type = type.capitalize()
Benjamin Petersone711caf2008-06-11 16:44:04 +00002167
2168 for name in list(glob.keys()):
2169 if name.startswith('_Test'):
2170 base = glob[name]
2171 if type in base.ALLOWED_TYPES:
2172 newname = 'With' + Type + name[1:]
2173 class Temp(base, unittest.TestCase, Mixin):
2174 pass
2175 result[newname] = Temp
2176 Temp.__name__ = newname
2177 Temp.__module__ = Mixin.__module__
2178 return result
2179
2180#
2181# Create test cases
2182#
2183
2184class ProcessesMixin(object):
2185 TYPE = 'processes'
2186 Process = multiprocessing.Process
2187 locals().update(get_attributes(multiprocessing, (
2188 'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
2189 'Condition', 'Event', 'Value', 'Array', 'RawValue',
2190 'RawArray', 'current_process', 'active_children', 'Pipe',
Richard Oudkerke41682b2012-06-06 19:04:57 +01002191 'connection', 'JoinableQueue', 'Pool'
Benjamin Petersone711caf2008-06-11 16:44:04 +00002192 )))
2193
2194testcases_processes = create_test_cases(ProcessesMixin, type='processes')
2195globals().update(testcases_processes)
2196
2197
2198class ManagerMixin(object):
2199 TYPE = 'manager'
2200 Process = multiprocessing.Process
2201 manager = object.__new__(multiprocessing.managers.SyncManager)
2202 locals().update(get_attributes(manager, (
2203 'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
2204 'Condition', 'Event', 'Value', 'Array', 'list', 'dict',
Richard Oudkerke41682b2012-06-06 19:04:57 +01002205 'Namespace', 'JoinableQueue', 'Pool'
Benjamin Petersone711caf2008-06-11 16:44:04 +00002206 )))
2207
2208testcases_manager = create_test_cases(ManagerMixin, type='manager')
2209globals().update(testcases_manager)
2210
2211
2212class ThreadsMixin(object):
2213 TYPE = 'threads'
2214 Process = multiprocessing.dummy.Process
2215 locals().update(get_attributes(multiprocessing.dummy, (
2216 'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
2217 'Condition', 'Event', 'Value', 'Array', 'current_process',
2218 'active_children', 'Pipe', 'connection', 'dict', 'list',
Richard Oudkerke41682b2012-06-06 19:04:57 +01002219 'Namespace', 'JoinableQueue', 'Pool'
Benjamin Petersone711caf2008-06-11 16:44:04 +00002220 )))
2221
2222testcases_threads = create_test_cases(ThreadsMixin, type='threads')
2223globals().update(testcases_threads)
2224
Neal Norwitz5d6415e2008-08-25 01:53:32 +00002225class OtherTest(unittest.TestCase):
2226 # TODO: add more tests for deliver/answer challenge.
2227 def test_deliver_challenge_auth_failure(self):
2228 class _FakeConnection(object):
2229 def recv_bytes(self, size):
Neal Norwitzec105ad2008-08-25 03:05:54 +00002230 return b'something bogus'
Neal Norwitz5d6415e2008-08-25 01:53:32 +00002231 def send_bytes(self, data):
2232 pass
2233 self.assertRaises(multiprocessing.AuthenticationError,
2234 multiprocessing.connection.deliver_challenge,
2235 _FakeConnection(), b'abc')
2236
2237 def test_answer_challenge_auth_failure(self):
2238 class _FakeConnection(object):
2239 def __init__(self):
2240 self.count = 0
2241 def recv_bytes(self, size):
2242 self.count += 1
2243 if self.count == 1:
2244 return multiprocessing.connection.CHALLENGE
2245 elif self.count == 2:
Neal Norwitzec105ad2008-08-25 03:05:54 +00002246 return b'something bogus'
2247 return b''
Neal Norwitz5d6415e2008-08-25 01:53:32 +00002248 def send_bytes(self, data):
2249 pass
2250 self.assertRaises(multiprocessing.AuthenticationError,
2251 multiprocessing.connection.answer_challenge,
2252 _FakeConnection(), b'abc')
2253
Benjamin Petersonf47ed4a2009-04-11 20:45:40 +00002254#
2255# Test Manager.start()/Pool.__init__() initializer feature - see issue 5585
2256#
2257
2258def initializer(ns):
2259 ns.test += 1
2260
2261class TestInitializers(unittest.TestCase):
2262 def setUp(self):
2263 self.mgr = multiprocessing.Manager()
2264 self.ns = self.mgr.Namespace()
2265 self.ns.test = 0
2266
2267 def tearDown(self):
2268 self.mgr.shutdown()
2269
2270 def test_manager_initializer(self):
2271 m = multiprocessing.managers.SyncManager()
2272 self.assertRaises(TypeError, m.start, 1)
2273 m.start(initializer, (self.ns,))
2274 self.assertEqual(self.ns.test, 1)
2275 m.shutdown()
2276
2277 def test_pool_initializer(self):
2278 self.assertRaises(TypeError, multiprocessing.Pool, initializer=1)
2279 p = multiprocessing.Pool(1, initializer, (self.ns,))
2280 p.close()
2281 p.join()
2282 self.assertEqual(self.ns.test, 1)
2283
Alexandre Vassalottic57a84f2009-07-17 12:07:01 +00002284#
2285# Issue 5155, 5313, 5331: Test process in processes
2286# Verifies os.close(sys.stdin.fileno) vs. sys.stdin.close() behavior
2287#
2288
2289def _ThisSubProcess(q):
2290 try:
2291 item = q.get(block=False)
2292 except pyqueue.Empty:
2293 pass
2294
2295def _TestProcess(q):
2296 queue = multiprocessing.Queue()
2297 subProc = multiprocessing.Process(target=_ThisSubProcess, args=(queue,))
Jesus Cea94f964f2011-09-09 20:26:57 +02002298 subProc.daemon = True
Alexandre Vassalottic57a84f2009-07-17 12:07:01 +00002299 subProc.start()
2300 subProc.join()
2301
2302def _afunc(x):
2303 return x*x
2304
2305def pool_in_process():
2306 pool = multiprocessing.Pool(processes=4)
2307 x = pool.map(_afunc, [1, 2, 3, 4, 5, 6, 7])
2308
2309class _file_like(object):
2310 def __init__(self, delegate):
2311 self._delegate = delegate
2312 self._pid = None
2313
2314 @property
2315 def cache(self):
2316 pid = os.getpid()
2317 # There are no race conditions since fork keeps only the running thread
2318 if pid != self._pid:
2319 self._pid = pid
2320 self._cache = []
2321 return self._cache
2322
2323 def write(self, data):
2324 self.cache.append(data)
2325
2326 def flush(self):
2327 self._delegate.write(''.join(self.cache))
2328 self._cache = []
2329
2330class TestStdinBadfiledescriptor(unittest.TestCase):
2331
2332 def test_queue_in_process(self):
2333 queue = multiprocessing.Queue()
2334 proc = multiprocessing.Process(target=_TestProcess, args=(queue,))
2335 proc.start()
2336 proc.join()
2337
2338 def test_pool_in_process(self):
2339 p = multiprocessing.Process(target=pool_in_process)
2340 p.start()
2341 p.join()
2342
2343 def test_flushing(self):
2344 sio = io.StringIO()
2345 flike = _file_like(sio)
2346 flike.write('foo')
2347 proc = multiprocessing.Process(target=lambda: flike.flush())
2348 flike.flush()
2349 assert sio.getvalue() == 'foo'
2350
Antoine Pitrou709176f2012-04-01 17:19:09 +02002351
2352#
2353# Issue 14151: Test invalid family on invalid environment
2354#
2355
2356class TestInvalidFamily(unittest.TestCase):
2357
2358 @unittest.skipIf(WIN32, "skipped on Windows")
2359 def test_invalid_family(self):
2360 with self.assertRaises(ValueError):
2361 multiprocessing.connection.Listener(r'\\.\test')
2362
Antoine Pitrou6d20cba2012-04-03 20:12:23 +02002363 @unittest.skipUnless(WIN32, "skipped on non-Windows platforms")
2364 def test_invalid_family_win32(self):
2365 with self.assertRaises(ValueError):
2366 multiprocessing.connection.Listener('/var/test.pipe')
2367
2368
Alexandre Vassalottic57a84f2009-07-17 12:07:01 +00002369testcases_other = [OtherTest, TestInvalidHandle, TestInitializers,
Antoine Pitrou709176f2012-04-01 17:19:09 +02002370 TestStdinBadfiledescriptor, TestInvalidFamily]
Neal Norwitz5d6415e2008-08-25 01:53:32 +00002371
Benjamin Petersone711caf2008-06-11 16:44:04 +00002372#
2373#
2374#
2375
2376def test_main(run=None):
Jesse Nollerd00df3c2008-06-18 14:22:48 +00002377 if sys.platform.startswith("linux"):
2378 try:
2379 lock = multiprocessing.RLock()
2380 except OSError:
Benjamin Petersone549ead2009-03-28 21:42:05 +00002381 raise unittest.SkipTest("OSError raises on RLock creation, see issue 3111!")
Benjamin Peterson3c0dd062008-06-17 22:43:48 +00002382
Charles-François Natali3be00952011-11-22 18:36:39 +01002383 check_enough_semaphores()
2384
Benjamin Petersone711caf2008-06-11 16:44:04 +00002385 if run is None:
2386 from test.support import run_unittest as run
2387
2388 util.get_temp_dir() # creates temp directory for use by all processes
2389
2390 multiprocessing.get_logger().setLevel(LOG_LEVEL)
2391
Benjamin Peterson41181742008-07-02 20:22:54 +00002392 ProcessesMixin.pool = multiprocessing.Pool(4)
2393 ThreadsMixin.pool = multiprocessing.dummy.Pool(4)
2394 ManagerMixin.manager.__init__()
2395 ManagerMixin.manager.start()
2396 ManagerMixin.pool = ManagerMixin.manager.Pool(4)
Benjamin Petersone711caf2008-06-11 16:44:04 +00002397
2398 testcases = (
Benjamin Peterson41181742008-07-02 20:22:54 +00002399 sorted(testcases_processes.values(), key=lambda tc:tc.__name__) +
2400 sorted(testcases_threads.values(), key=lambda tc:tc.__name__) +
Neal Norwitz5d6415e2008-08-25 01:53:32 +00002401 sorted(testcases_manager.values(), key=lambda tc:tc.__name__) +
2402 testcases_other
Benjamin Petersone711caf2008-06-11 16:44:04 +00002403 )
2404
2405 loadTestsFromTestCase = unittest.defaultTestLoader.loadTestsFromTestCase
2406 suite = unittest.TestSuite(loadTestsFromTestCase(tc) for tc in testcases)
2407 run(suite)
2408
Benjamin Peterson41181742008-07-02 20:22:54 +00002409 ThreadsMixin.pool.terminate()
2410 ProcessesMixin.pool.terminate()
2411 ManagerMixin.pool.terminate()
2412 ManagerMixin.manager.shutdown()
Benjamin Petersone711caf2008-06-11 16:44:04 +00002413
Benjamin Peterson41181742008-07-02 20:22:54 +00002414 del ProcessesMixin.pool, ThreadsMixin.pool, ManagerMixin.pool
Benjamin Petersone711caf2008-06-11 16:44:04 +00002415
2416def main():
2417 test_main(unittest.TextTestRunner(verbosity=2).run)
2418
2419if __name__ == '__main__':
2420 main()