blob: 78cbdacd8f6b6570b122d9a7ad1133d3cc760ac6 [file] [log] [blame]
Benjamin Peterson90f5ba52010-03-11 22:53:45 +00001#!/usr/bin/env python3
Georg Brandl86b2fb92008-07-16 03:43:04 +00002
Benjamin Petersone711caf2008-06-11 16:44:04 +00003#
4# Unit tests for the multiprocessing package
5#
6
7import unittest
Benjamin Petersone711caf2008-06-11 16:44:04 +00008import queue as pyqueue
9import time
Alexandre Vassalottic57a84f2009-07-17 12:07:01 +000010import io
Antoine Pitroude911b22011-12-21 11:03:24 +010011import itertools
Benjamin Petersone711caf2008-06-11 16:44:04 +000012import sys
13import os
14import gc
Antoine Pitrouc824e9a2011-04-05 18:11:33 +020015import errno
Benjamin Petersone711caf2008-06-11 16:44:04 +000016import signal
17import array
Benjamin Petersone711caf2008-06-11 16:44:04 +000018import socket
19import random
20import logging
R. David Murraya21e4ca2009-03-31 23:16:50 +000021import test.support
Benjamin Petersone711caf2008-06-11 16:44:04 +000022
Benjamin Petersone5384b02008-10-04 22:00:42 +000023
R. David Murraya21e4ca2009-03-31 23:16:50 +000024# Skip tests if _multiprocessing wasn't built.
25_multiprocessing = test.support.import_module('_multiprocessing')
26# Skip tests if sem_open implementation is broken.
27test.support.import_module('multiprocessing.synchronize')
# Import threading after _multiprocessing so that a missing extension raises
# the more relevant error message: "No module named _multiprocessing".
# _multiprocessing is not compiled without thread support.
31import threading
Benjamin Petersone5384b02008-10-04 22:00:42 +000032
Benjamin Petersone711caf2008-06-11 16:44:04 +000033import multiprocessing.dummy
34import multiprocessing.connection
35import multiprocessing.managers
36import multiprocessing.heap
Benjamin Petersone711caf2008-06-11 16:44:04 +000037import multiprocessing.pool
Benjamin Petersone711caf2008-06-11 16:44:04 +000038
Charles-François Natalibc8f0822011-09-20 20:36:51 +020039from multiprocessing import util
40
41try:
42 from multiprocessing import reduction
43 HAS_REDUCTION = True
44except ImportError:
45 HAS_REDUCTION = False
Benjamin Petersone711caf2008-06-11 16:44:04 +000046
Brian Curtinafa88b52010-10-07 01:12:19 +000047try:
48 from multiprocessing.sharedctypes import Value, copy
49 HAS_SHAREDCTYPES = True
50except ImportError:
51 HAS_SHAREDCTYPES = False
52
Antoine Pitroubcb39d42011-08-23 19:46:22 +020053try:
54 import msvcrt
55except ImportError:
56 msvcrt = None
57
Benjamin Petersone711caf2008-06-11 16:44:04 +000058#
59#
60#
61
def latin(s):
    """Return the string *s* encoded to bytes using the 'latin' codec."""
    encoded = s.encode('latin')
    return encoded
Benjamin Petersone711caf2008-06-11 16:44:04 +000064
Benjamin Petersone711caf2008-06-11 16:44:04 +000065#
66# Constants
67#
68
# Logging level for the test run; swap in the commented line for verbose output.
LOG_LEVEL = util.SUBWARNING
#LOG_LEVEL = logging.DEBUG

# Short sleep interval (seconds) used by the tests.
DELTA = 0.1

# Making this true makes tests take a lot longer and can sometimes cause
# some non-serious failures because some calls block a bit longer than
# expected.
CHECK_TIMINGS = False

# Timeouts (seconds): distinct values when timings are checked, otherwise a
# uniformly short value.
(TIMEOUT1, TIMEOUT2, TIMEOUT3) = (
    (0.82, 0.35, 1.4) if CHECK_TIMINGS else (0.1, 0.1, 0.1)
)

# False when the extension module flags sem_getvalue() as broken.
_BROKEN_GETVALUE = getattr(_multiprocessing, 'HAVE_BROKEN_SEM_GETVALUE', False)
HAVE_GETVALUE = not _BROKEN_GETVALUE

WIN32 = sys.platform == "win32"
Antoine Pitrou176f07d2011-06-06 19:35:31 +020086
Richard Oudkerk59d54042012-05-10 16:11:12 +010087from multiprocessing.connection import wait
Antoine Pitrou176f07d2011-06-06 19:35:31 +020088
def wait_for_handle(handle, timeout):
    """Wait on a single *handle* using ``wait`` and return its result.

    A negative *timeout* is treated like ``None`` and passed to ``wait``
    as ``None``.
    """
    no_limit = timeout is None or timeout < 0.0
    return wait([handle], None if no_limit else timeout)
Jesse Noller6214edd2009-01-19 16:23:53 +000093
# Maximum number of open file descriptors reported by the OS, with a
# fallback of 256 when the limit cannot be queried.
try:
    MAXFD = os.sysconf("SC_OPEN_MAX")
except (AttributeError, ValueError, OSError):
    # os.sysconf may be missing on this platform (AttributeError), the
    # configuration name may be unknown (ValueError), or the host may not
    # support the query (OSError).  A bare ``except:`` would also swallow
    # KeyboardInterrupt and SystemExit, so only these are caught.
    MAXFD = 256
98
Benjamin Petersone711caf2008-06-11 16:44:04 +000099#
Florent Xiclunafd1b0932010-03-28 00:25:02 +0000100# Some tests require ctypes
101#
102
103try:
Florent Xiclunaaa171062010-08-14 15:56:42 +0000104 from ctypes import Structure, c_int, c_double
Florent Xiclunafd1b0932010-03-28 00:25:02 +0000105except ImportError:
106 Structure = object
107 c_int = c_double = None
108
Charles-François Natali221ef672011-11-22 18:55:22 +0100109
def check_enough_semaphores():
    """Check that the system supports enough semaphores to run the test.

    Raises unittest.SkipTest when the OS reports a semaphore limit below
    the POSIX minimum; returns None otherwise.
    """
    # minimum number of semaphores available according to POSIX
    nsems_min = 256
    try:
        nsems = os.sysconf("SC_SEM_NSEMS_MAX")
    except (AttributeError, ValueError):
        # sysconf not available or setting not available
        return
    # -1 is accepted without skipping, as is any limit at or above the minimum.
    if nsems != -1 and nsems < nsems_min:
        raise unittest.SkipTest("The OS doesn't support enough semaphores "
                                "to run the test (required: %d)." % nsems_min)
123
124
Florent Xiclunafd1b0932010-03-28 00:25:02 +0000125#
Benjamin Petersone711caf2008-06-11 16:44:04 +0000126# Creates a wrapper for a function which records the time it takes to finish
127#
128
class TimingWrapper(object):
    """Callable wrapper that records how long each call to *func* takes.

    After every call (whether it returns or raises) ``elapsed`` holds the
    duration of that call in seconds; it is ``None`` before the first call.
    """

    def __init__(self, func):
        self.func = func
        self.elapsed = None

    def __call__(self, *args, **kwds):
        # Use a monotonic clock: wall-clock time can jump (NTP, manual
        # adjustment) and would then yield bogus or negative durations.
        start = time.monotonic()
        try:
            return self.func(*args, **kwds)
        finally:
            # Recorded in ``finally`` so a raising call is timed as well.
            self.elapsed = time.monotonic() - start
141
142#
143# Base class for test cases
144#
145
class BaseTestCase(object):
    """Mixin with assertion helpers shared by the test-case classes below.

    It calls ``assertAlmostEqual`` and ``assertEqual`` on ``self``, so it
    must be combined with a class that provides them.
    """

    ALLOWED_TYPES = ('processes', 'manager', 'threads')

    def assertTimingAlmostEqual(self, a, b):
        """Compare two durations to one decimal place when CHECK_TIMINGS is set."""
        if not CHECK_TIMINGS:
            return
        self.assertAlmostEqual(a, b, 1)

    def assertReturnsIfImplemented(self, value, func, *args):
        """Assert ``func(*args) == value`` unless it raises NotImplementedError."""
        try:
            outcome = func(*args)
        except NotImplementedError:
            return None
        return self.assertEqual(value, outcome)

    # For the sanity of Windows users, rather than crashing or freezing in
    # multiple ways.
    def __reduce__(self, *args):
        raise NotImplementedError("shouldn't try to pickle a test case")

    __reduce_ex__ = __reduce__
168
Benjamin Petersone711caf2008-06-11 16:44:04 +0000169#
170# Return the value of a semaphore
171#
172
def get_value(self):
    """Return the current value of the semaphore-like object *self*.

    Tries ``self.get_value()`` first, then the attributes
    ``_Semaphore__value`` and ``_value``; raises NotImplementedError when
    none of them is available.
    """
    try:
        return self.get_value()
    except AttributeError:
        pass
    for attr_name in ('_Semaphore__value', '_value'):
        try:
            return getattr(self, attr_name)
        except AttributeError:
            continue
    raise NotImplementedError
184
185#
186# Testcases
187#
188
class _TestProcess(BaseTestCase):
    """Tests of the basic Process API: attributes, start/join, terminate.

    The methods use ``self.TYPE``, ``self.Process``, ``self.Queue``,
    ``self.Pipe``, ``self.Event``, ``self.current_process`` and
    ``self.active_children``; these are expected to be supplied by the
    concrete test class (not defined in this class).
    """

    ALLOWED_TYPES = ('processes', 'threads')

    def test_current(self):
        """The current process exposes sane authkey/daemon/ident/exitcode."""
        if self.TYPE == 'threads':
            # Report a skip rather than silently passing.
            self.skipTest('test not appropriate for {}'.format(self.TYPE))

        current = self.current_process()
        authkey = current.authkey

        self.assertTrue(current.is_alive())
        self.assertFalse(current.daemon)
        self.assertIsInstance(authkey, bytes)
        self.assertGreater(len(authkey), 0)
        self.assertEqual(current.ident, os.getpid())
        self.assertEqual(current.exitcode, None)

    def test_daemon_argument(self):
        """The ``daemon`` constructor argument overrides the inherited flag."""
        if self.TYPE == "threads":
            self.skipTest('test not appropriate for {}'.format(self.TYPE))

        # By default uses the current process's daemon flag.
        proc0 = self.Process(target=self._test)
        self.assertEqual(proc0.daemon, self.current_process().daemon)
        proc1 = self.Process(target=self._test, daemon=True)
        self.assertTrue(proc1.daemon)
        proc2 = self.Process(target=self._test, daemon=False)
        self.assertFalse(proc2.daemon)

    @classmethod
    def _test(cls, q, *args, **kwds):
        """Child body: report args, kwargs, name (and authkey/pid) through *q*."""
        current = cls.current_process()
        q.put(args)
        q.put(kwds)
        q.put(current.name)
        if cls.TYPE != 'threads':
            q.put(bytes(current.authkey))
            q.put(current.pid)

    def test_process(self):
        """Check process state before start, while running and after join."""
        q = self.Queue(1)
        args = (q, 1, 2)
        kwargs = {'hello': 23, 'bye': 2.54}
        name = 'SomeProcess'
        p = self.Process(
            target=self._test, args=args, kwargs=kwargs, name=name
            )
        p.daemon = True
        current = self.current_process()

        if self.TYPE != 'threads':
            self.assertEqual(p.authkey, current.authkey)
        self.assertEqual(p.is_alive(), False)
        self.assertEqual(p.daemon, True)
        self.assertNotIn(p, self.active_children())
        self.assertIsInstance(self.active_children(), list)
        self.assertEqual(p.exitcode, None)

        p.start()

        self.assertEqual(p.exitcode, None)
        self.assertEqual(p.is_alive(), True)
        self.assertIn(p, self.active_children())

        # The child echoes back everything after the queue argument.
        self.assertEqual(q.get(), args[1:])
        self.assertEqual(q.get(), kwargs)
        self.assertEqual(q.get(), p.name)
        if self.TYPE != 'threads':
            self.assertEqual(q.get(), current.authkey)
            self.assertEqual(q.get(), p.pid)

        p.join()

        self.assertEqual(p.exitcode, 0)
        self.assertEqual(p.is_alive(), False)
        self.assertNotIn(p, self.active_children())

    @classmethod
    def _test_terminate(cls):
        """Child body: sleep long enough that only terminate() ends it."""
        time.sleep(1000)

    def test_terminate(self):
        """join() with a zero/negative timeout returns at once; terminate() ends the child."""
        if self.TYPE == 'threads':
            self.skipTest('test not appropriate for {}'.format(self.TYPE))

        p = self.Process(target=self._test_terminate)
        p.daemon = True
        p.start()

        self.assertEqual(p.is_alive(), True)
        self.assertIn(p, self.active_children())
        self.assertEqual(p.exitcode, None)

        join = TimingWrapper(p.join)

        self.assertEqual(join(0), None)
        self.assertTimingAlmostEqual(join.elapsed, 0.0)
        self.assertEqual(p.is_alive(), True)

        self.assertEqual(join(-1), None)
        self.assertTimingAlmostEqual(join.elapsed, 0.0)
        self.assertEqual(p.is_alive(), True)

        p.terminate()

        self.assertEqual(join(), None)
        self.assertTimingAlmostEqual(join.elapsed, 0.0)

        self.assertEqual(p.is_alive(), False)
        self.assertNotIn(p, self.active_children())

        p.join()

        # XXX sometimes get p.exitcode == 0 on Windows ...
        #self.assertEqual(p.exitcode, -signal.SIGTERM)

    def test_cpu_count(self):
        """cpu_count() yields a positive int (or is not implemented)."""
        try:
            cpus = multiprocessing.cpu_count()
        except NotImplementedError:
            cpus = 1
        self.assertIsInstance(cpus, int)
        self.assertGreaterEqual(cpus, 1)

    def test_active_children(self):
        """A process is listed in active_children() only between start and join."""
        self.assertEqual(type(self.active_children()), list)

        p = self.Process(target=time.sleep, args=(DELTA,))
        self.assertNotIn(p, self.active_children())

        p.daemon = True
        p.start()
        self.assertIn(p, self.active_children())

        p.join()
        self.assertNotIn(p, self.active_children())

    @classmethod
    def _test_recursion(cls, wconn, id):
        """Send *id*, then start and join two children for ids shorter than two.

        The unused ``from multiprocessing import forking`` import that used
        to sit here was dropped: nothing in the function referenced it.
        """
        wconn.send(id)
        if len(id) < 2:
            for i in range(2):
                p = cls.Process(
                    target=cls._test_recursion, args=(wconn, id+[i])
                    )
                p.start()
                p.join()

    def test_recursion(self):
        """Nested process creation reports ids in depth-first order."""
        rconn, wconn = self.Pipe(duplex=False)
        self._test_recursion(wconn, [])

        time.sleep(DELTA)
        result = []
        while rconn.poll():
            result.append(rconn.recv())

        expected = [
            [],
              [0],
                [0, 0],
                [0, 1],
              [1],
                [1, 0],
                [1, 1]
            ]
        self.assertEqual(result, expected)

    @classmethod
    def _test_sentinel(cls, event):
        """Child body: wait up to ten seconds for *event*."""
        event.wait(10.0)

    def test_sentinel(self):
        """Process.sentinel becomes ready only once the child has exited."""
        if self.TYPE == "threads":
            self.skipTest('test not appropriate for {}'.format(self.TYPE))
        event = self.Event()
        p = self.Process(target=self._test_sentinel, args=(event,))
        # The sentinel is not available before the process is started.
        with self.assertRaises(ValueError):
            p.sentinel
        p.start()
        self.addCleanup(p.join)
        sentinel = p.sentinel
        self.assertIsInstance(sentinel, int)
        self.assertFalse(wait_for_handle(sentinel, timeout=0.0))
        event.set()
        p.join()
        self.assertTrue(wait_for_handle(sentinel, timeout=DELTA))
379
Benjamin Petersone711caf2008-06-11 16:44:04 +0000380#
381#
382#
383
class _UpperCaser(multiprocessing.Process):
    """Process subclass that upper-cases strings sent to it over a pipe."""

    def __init__(self):
        multiprocessing.Process.__init__(self)
        child_end, parent_end = multiprocessing.Pipe()
        self.child_conn = child_end
        self.parent_conn = parent_end

    def run(self):
        """Reply to each received string with its upper-cased form until None arrives."""
        self.parent_conn.close()
        while True:
            text = self.child_conn.recv()
            if text is None:
                break
            self.child_conn.send(text.upper())
        self.child_conn.close()

    def submit(self, s):
        """Send *s* through the parent connection and return the reply."""
        assert type(s) is str
        self.parent_conn.send(s)
        return self.parent_conn.recv()

    def stop(self):
        """Send the None sentinel, then close both connection ends."""
        self.parent_conn.send(None)
        self.parent_conn.close()
        self.child_conn.close()
405
class _TestSubclassingProcess(BaseTestCase):
    """Tests involving Process subclasses and child-process shutdown."""

    ALLOWED_TYPES = ('processes',)

    def test_subclassing(self):
        """A Process subclass can serve requests until told to stop."""
        worker = _UpperCaser()
        worker.daemon = True
        worker.start()
        self.assertEqual(worker.submit('hello'), 'HELLO')
        self.assertEqual(worker.submit('world'), 'WORLD')
        worker.stop()
        worker.join()

    def test_stderr_flush(self):
        # sys.stderr is flushed at process shutdown (issue #13812)
        if self.TYPE == "threads":
            return

        path = test.support.TESTFN
        self.addCleanup(test.support.unlink, path)
        child = self.Process(target=self._test_stderr_flush, args=(path,))
        child.start()
        child.join()
        with open(path, 'r') as stream:
            captured = stream.read()
        # The whole traceback was printed
        for fragment in ("ZeroDivisionError",
                         "test_multiprocessing.py",
                         "1/0 # MARKER"):
            self.assertIn(fragment, captured)

    @classmethod
    def _test_stderr_flush(cls, testfn):
        """Child body: redirect stderr to *testfn*, then fail with ZeroDivisionError."""
        sys.stderr = open(testfn, 'w')
        1/0 # MARKER
440
441
Benjamin Petersone711caf2008-06-11 16:44:04 +0000442#
443#
444#
445
def queue_empty(q):
    """Return whether *q* is empty, via ``q.empty()`` or else ``q.qsize()``."""
    return q.empty() if hasattr(q, 'empty') else q.qsize() == 0
451
def queue_full(q, maxsize):
    """Return whether *q* is full, via ``q.full()`` or else ``q.qsize() == maxsize``."""
    return q.full() if hasattr(q, 'full') else q.qsize() == maxsize
457
458
class _TestQueue(BaseTestCase):
    """Tests of Queue put/get/qsize and of JoinableQueue task tracking.

    Uses ``self.Queue``, ``self.JoinableQueue``, ``self.Event`` and
    ``self.Process``, which are expected to come from the concrete test
    class (not defined in this class).
    """

    @classmethod
    def _test_put(cls, queue, child_can_start, parent_can_continue):
        """Child body for test_put: once released, drain six items."""
        child_can_start.wait()
        # Six matches MAXSIZE in test_put, which fills the queue completely.
        for _ in range(6):
            queue.get()
        parent_can_continue.set()

    def test_put(self):
        """put()/put_nowait() succeed until full, then raise Full as documented."""
        MAXSIZE = 6
        queue = self.Queue(maxsize=MAXSIZE)
        child_can_start = self.Event()
        parent_can_continue = self.Event()

        proc = self.Process(
            target=self._test_put,
            args=(queue, child_can_start, parent_can_continue)
            )
        proc.daemon = True
        proc.start()

        self.assertEqual(queue_empty(queue), True)
        self.assertEqual(queue_full(queue, MAXSIZE), False)

        queue.put(1)
        queue.put(2, True)
        queue.put(3, True, None)
        queue.put(4, False)
        queue.put(5, False, None)
        queue.put_nowait(6)

        # the values may be in buffer but not yet in pipe so sleep a bit
        time.sleep(DELTA)

        self.assertEqual(queue_empty(queue), False)
        self.assertEqual(queue_full(queue, MAXSIZE), True)

        put = TimingWrapper(queue.put)
        put_nowait = TimingWrapper(queue.put_nowait)

        self.assertRaises(pyqueue.Full, put, 7, False)
        self.assertTimingAlmostEqual(put.elapsed, 0)

        self.assertRaises(pyqueue.Full, put, 7, False, None)
        self.assertTimingAlmostEqual(put.elapsed, 0)

        self.assertRaises(pyqueue.Full, put_nowait, 7)
        self.assertTimingAlmostEqual(put_nowait.elapsed, 0)

        self.assertRaises(pyqueue.Full, put, 7, True, TIMEOUT1)
        self.assertTimingAlmostEqual(put.elapsed, TIMEOUT1)

        self.assertRaises(pyqueue.Full, put, 7, False, TIMEOUT2)
        self.assertTimingAlmostEqual(put.elapsed, 0)

        self.assertRaises(pyqueue.Full, put, 7, True, timeout=TIMEOUT3)
        self.assertTimingAlmostEqual(put.elapsed, TIMEOUT3)

        child_can_start.set()
        parent_can_continue.wait()

        self.assertEqual(queue_empty(queue), True)
        self.assertEqual(queue_full(queue, MAXSIZE), False)

        proc.join()

    @classmethod
    def _test_get(cls, queue, child_can_start, parent_can_continue):
        """Child body for test_get: once released, put the items 2..5."""
        child_can_start.wait()
        #queue.put(1)
        queue.put(2)
        queue.put(3)
        queue.put(4)
        queue.put(5)
        parent_can_continue.set()

    def test_get(self):
        """get()/get_nowait() return items in order, then raise Empty as documented."""
        queue = self.Queue()
        child_can_start = self.Event()
        parent_can_continue = self.Event()

        proc = self.Process(
            target=self._test_get,
            args=(queue, child_can_start, parent_can_continue)
            )
        proc.daemon = True
        proc.start()

        self.assertEqual(queue_empty(queue), True)

        child_can_start.set()
        parent_can_continue.wait()

        time.sleep(DELTA)
        self.assertEqual(queue_empty(queue), False)

        # Hangs unexpectedly, remove for now
        #self.assertEqual(queue.get(), 1)
        self.assertEqual(queue.get(True, None), 2)
        self.assertEqual(queue.get(True), 3)
        self.assertEqual(queue.get(timeout=1), 4)
        self.assertEqual(queue.get_nowait(), 5)

        self.assertEqual(queue_empty(queue), True)

        get = TimingWrapper(queue.get)
        get_nowait = TimingWrapper(queue.get_nowait)

        self.assertRaises(pyqueue.Empty, get, False)
        self.assertTimingAlmostEqual(get.elapsed, 0)

        self.assertRaises(pyqueue.Empty, get, False, None)
        self.assertTimingAlmostEqual(get.elapsed, 0)

        self.assertRaises(pyqueue.Empty, get_nowait)
        self.assertTimingAlmostEqual(get_nowait.elapsed, 0)

        self.assertRaises(pyqueue.Empty, get, True, TIMEOUT1)
        self.assertTimingAlmostEqual(get.elapsed, TIMEOUT1)

        self.assertRaises(pyqueue.Empty, get, False, TIMEOUT2)
        self.assertTimingAlmostEqual(get.elapsed, 0)

        self.assertRaises(pyqueue.Empty, get, timeout=TIMEOUT3)
        self.assertTimingAlmostEqual(get.elapsed, TIMEOUT3)

        proc.join()

    @classmethod
    def _test_fork(cls, queue):
        """Child body for test_fork: put the integers 10..19."""
        for i in range(10, 20):
            queue.put(i)
        # note that at this point the items may only be buffered, so the
        # process cannot shutdown until the feeder thread has finished
        # pushing items onto the pipe.

    def test_fork(self):
        # Old versions of Queue would fail to create a new feeder
        # thread for a forked process if the original process had its
        # own feeder thread.  This test checks that this no longer
        # happens.

        queue = self.Queue()

        # put items on queue so that main process starts a feeder thread
        for i in range(10):
            queue.put(i)

        # wait to make sure thread starts before we fork a new process
        time.sleep(DELTA)

        # fork process
        p = self.Process(target=self._test_fork, args=(queue,))
        p.daemon = True
        p.start()

        # check that all expected items are in the queue
        for i in range(20):
            self.assertEqual(queue.get(), i)
        self.assertRaises(pyqueue.Empty, queue.get, False)

        p.join()

    def test_qsize(self):
        """qsize() tracks puts and gets; skipped where it is not implemented."""
        q = self.Queue()
        try:
            self.assertEqual(q.qsize(), 0)
        except NotImplementedError:
            # Report a skip instead of silently passing.
            self.skipTest('qsize method not implemented')
        q.put(1)
        self.assertEqual(q.qsize(), 1)
        q.put(5)
        self.assertEqual(q.qsize(), 2)
        q.get()
        self.assertEqual(q.qsize(), 1)
        q.get()
        self.assertEqual(q.qsize(), 0)

    @classmethod
    def _test_task_done(cls, q):
        """Worker body: acknowledge each item until the None sentinel arrives."""
        for _ in iter(q.get, None):
            time.sleep(DELTA)
            q.task_done()

    def test_task_done(self):
        """JoinableQueue.join() returns once every item has been acknowledged."""
        queue = self.JoinableQueue()

        workers = [self.Process(target=self._test_task_done, args=(queue,))
                   for _ in range(4)]

        for p in workers:
            p.daemon = True
            p.start()

        for i in range(10):
            queue.put(i)

        queue.join()

        # One None sentinel per worker so each worker loop terminates.
        for _ in workers:
            queue.put(None)

        for p in workers:
            p.join()
668
669#
670#
671#
672
class _TestLock(BaseTestCase):
    """Tests of Lock and RLock acquire/release semantics."""

    def test_lock(self):
        """A held Lock refuses a non-blocking acquire and cannot be released twice."""
        lock = self.Lock()
        self.assertEqual(lock.acquire(), True)
        self.assertEqual(lock.acquire(False), False)
        self.assertEqual(lock.release(), None)
        self.assertRaises((ValueError, threading.ThreadError), lock.release)

    def test_rlock(self):
        """An RLock may be re-acquired and must be released equally often."""
        lock = self.RLock()
        depth = 3
        for _ in range(depth):
            self.assertEqual(lock.acquire(), True)
        for _ in range(depth):
            self.assertEqual(lock.release(), None)
        # One release too many must fail.
        self.assertRaises((AssertionError, RuntimeError), lock.release)

    def test_lock_context(self):
        """A Lock can be used as a context manager."""
        with self.Lock():
            pass
695
Benjamin Petersone711caf2008-06-11 16:44:04 +0000696
class _TestSemaphore(BaseTestCase):
    """Tests of Semaphore and BoundedSemaphore counting and timeouts."""

    def _test_semaphore(self, sem):
        """Take *sem* (initial value 2) down to zero and back up to two.

        The value checks are skipped where reading the value raises
        NotImplementedError.
        """
        self.assertReturnsIfImplemented(2, get_value, sem)
        self.assertEqual(sem.acquire(), True)
        self.assertReturnsIfImplemented(1, get_value, sem)
        self.assertEqual(sem.acquire(), True)
        self.assertReturnsIfImplemented(0, get_value, sem)
        # Exhausted: a non-blocking acquire fails and leaves the value alone.
        self.assertEqual(sem.acquire(False), False)
        self.assertReturnsIfImplemented(0, get_value, sem)
        self.assertEqual(sem.release(), None)
        self.assertReturnsIfImplemented(1, get_value, sem)
        self.assertEqual(sem.release(), None)
        self.assertReturnsIfImplemented(2, get_value, sem)

    def test_semaphore(self):
        """A plain Semaphore can be released beyond its initial value."""
        sem = self.Semaphore(2)
        self._test_semaphore(sem)
        self.assertEqual(sem.release(), None)
        self.assertReturnsIfImplemented(3, get_value, sem)
        self.assertEqual(sem.release(), None)
        self.assertReturnsIfImplemented(4, get_value, sem)

    def test_bounded_semaphore(self):
        """A BoundedSemaphore counts like a Semaphore within its bound."""
        sem = self.BoundedSemaphore(2)
        self._test_semaphore(sem)
        # Currently fails on OS/X
        #if HAVE_GETVALUE:
        #    self.assertRaises(ValueError, sem.release)
        #    self.assertReturnsIfImplemented(2, get_value, sem)

    def test_timeout(self):
        """acquire() on an empty semaphore honours the block/timeout arguments."""
        if self.TYPE != 'processes':
            # Report a skip rather than silently passing.
            self.skipTest('test not appropriate for {}'.format(self.TYPE))

        sem = self.Semaphore(0)
        acquire = TimingWrapper(sem.acquire)

        self.assertEqual(acquire(False), False)
        self.assertTimingAlmostEqual(acquire.elapsed, 0.0)

        self.assertEqual(acquire(False, None), False)
        self.assertTimingAlmostEqual(acquire.elapsed, 0.0)

        self.assertEqual(acquire(False, TIMEOUT1), False)
        self.assertTimingAlmostEqual(acquire.elapsed, 0)

        self.assertEqual(acquire(True, TIMEOUT2), False)
        self.assertTimingAlmostEqual(acquire.elapsed, TIMEOUT2)

        self.assertEqual(acquire(timeout=TIMEOUT3), False)
        self.assertTimingAlmostEqual(acquire.elapsed, TIMEOUT3)
749
750
751class _TestCondition(BaseTestCase):
752
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +0000753 @classmethod
754 def f(cls, cond, sleeping, woken, timeout=None):
Benjamin Petersone711caf2008-06-11 16:44:04 +0000755 cond.acquire()
756 sleeping.release()
757 cond.wait(timeout)
758 woken.release()
759 cond.release()
760
761 def check_invariant(self, cond):
762 # this is only supposed to succeed when there are no sleepers
763 if self.TYPE == 'processes':
764 try:
765 sleepers = (cond._sleeping_count.get_value() -
766 cond._woken_count.get_value())
767 self.assertEqual(sleepers, 0)
768 self.assertEqual(cond._wait_semaphore.get_value(), 0)
769 except NotImplementedError:
770 pass
771
772 def test_notify(self):
773 cond = self.Condition()
774 sleeping = self.Semaphore(0)
775 woken = self.Semaphore(0)
776
777 p = self.Process(target=self.f, args=(cond, sleeping, woken))
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000778 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000779 p.start()
780
781 p = threading.Thread(target=self.f, args=(cond, sleeping, woken))
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000782 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000783 p.start()
784
785 # wait for both children to start sleeping
786 sleeping.acquire()
787 sleeping.acquire()
788
789 # check no process/thread has woken up
790 time.sleep(DELTA)
791 self.assertReturnsIfImplemented(0, get_value, woken)
792
793 # wake up one process/thread
794 cond.acquire()
795 cond.notify()
796 cond.release()
797
798 # check one process/thread has woken up
799 time.sleep(DELTA)
800 self.assertReturnsIfImplemented(1, get_value, woken)
801
802 # wake up another
803 cond.acquire()
804 cond.notify()
805 cond.release()
806
807 # check other has woken up
808 time.sleep(DELTA)
809 self.assertReturnsIfImplemented(2, get_value, woken)
810
811 # check state is not mucked up
812 self.check_invariant(cond)
813 p.join()
814
815 def test_notify_all(self):
816 cond = self.Condition()
817 sleeping = self.Semaphore(0)
818 woken = self.Semaphore(0)
819
820 # start some threads/processes which will timeout
821 for i in range(3):
822 p = self.Process(target=self.f,
823 args=(cond, sleeping, woken, TIMEOUT1))
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000824 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000825 p.start()
826
827 t = threading.Thread(target=self.f,
828 args=(cond, sleeping, woken, TIMEOUT1))
Benjamin Peterson72753702008-08-18 18:09:21 +0000829 t.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000830 t.start()
831
832 # wait for them all to sleep
833 for i in range(6):
834 sleeping.acquire()
835
836 # check they have all timed out
837 for i in range(6):
838 woken.acquire()
839 self.assertReturnsIfImplemented(0, get_value, woken)
840
841 # check state is not mucked up
842 self.check_invariant(cond)
843
844 # start some more threads/processes
845 for i in range(3):
846 p = self.Process(target=self.f, args=(cond, sleeping, woken))
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000847 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000848 p.start()
849
850 t = threading.Thread(target=self.f, args=(cond, sleeping, woken))
Benjamin Peterson72753702008-08-18 18:09:21 +0000851 t.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000852 t.start()
853
854 # wait for them to all sleep
855 for i in range(6):
856 sleeping.acquire()
857
858 # check no process/thread has woken up
859 time.sleep(DELTA)
860 self.assertReturnsIfImplemented(0, get_value, woken)
861
862 # wake them all up
863 cond.acquire()
864 cond.notify_all()
865 cond.release()
866
867 # check they have all woken
Antoine Pitrouf25a8de2011-04-16 21:02:01 +0200868 for i in range(10):
869 try:
870 if get_value(woken) == 6:
871 break
872 except NotImplementedError:
873 break
874 time.sleep(DELTA)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000875 self.assertReturnsIfImplemented(6, get_value, woken)
876
877 # check state is not mucked up
878 self.check_invariant(cond)
879
880 def test_timeout(self):
881 cond = self.Condition()
882 wait = TimingWrapper(cond.wait)
883 cond.acquire()
884 res = wait(TIMEOUT1)
885 cond.release()
Georg Brandl65ffae02010-10-28 09:24:56 +0000886 self.assertEqual(res, False)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000887 self.assertTimingAlmostEqual(wait.elapsed, TIMEOUT1)
888
Charles-François Natalic8ce7152012-04-17 18:45:57 +0200889 @classmethod
890 def _test_waitfor_f(cls, cond, state):
891 with cond:
892 state.value = 0
893 cond.notify()
894 result = cond.wait_for(lambda : state.value==4)
895 if not result or state.value != 4:
896 sys.exit(1)
897
898 @unittest.skipUnless(HAS_SHAREDCTYPES, 'needs sharedctypes')
899 def test_waitfor(self):
900 # based on test in test/lock_tests.py
901 cond = self.Condition()
902 state = self.Value('i', -1)
903
904 p = self.Process(target=self._test_waitfor_f, args=(cond, state))
905 p.daemon = True
906 p.start()
907
908 with cond:
909 result = cond.wait_for(lambda : state.value==0)
910 self.assertTrue(result)
911 self.assertEqual(state.value, 0)
912
913 for i in range(4):
914 time.sleep(0.01)
915 with cond:
916 state.value += 1
917 cond.notify()
918
919 p.join(5)
920 self.assertFalse(p.is_alive())
921 self.assertEqual(p.exitcode, 0)
922
923 @classmethod
Richard Oudkerk6dbca362012-05-06 16:46:36 +0100924 def _test_waitfor_timeout_f(cls, cond, state, success, sem):
925 sem.release()
Charles-François Natalic8ce7152012-04-17 18:45:57 +0200926 with cond:
927 expected = 0.1
928 dt = time.time()
929 result = cond.wait_for(lambda : state.value==4, timeout=expected)
930 dt = time.time() - dt
931 # borrow logic in assertTimeout() from test/lock_tests.py
932 if not result and expected * 0.6 < dt < expected * 10.0:
933 success.value = True
934
@unittest.skipUnless(HAS_SHAREDCTYPES, 'needs sharedctypes')
def test_waitfor_timeout(self):
    """wait_for() with a timeout gives up when the predicate never holds."""
    # based on test in test/lock_tests.py
    cond = self.Condition()
    state = self.Value('i', 0)
    success = self.Value('i', False)
    sem = self.Semaphore(0)

    child_args = (cond, state, success, sem)
    child = self.Process(target=self._test_waitfor_timeout_f,
                         args=child_args)
    child.daemon = True
    child.start()
    # The child releases the semaphore as its first action.
    self.assertTrue(sem.acquire(timeout=10))

    # Only increment 3 times, so state == 4 is never reached.
    for _ in range(3):
        time.sleep(0.01)
        with cond:
            state.value += 1
            cond.notify()

    child.join(5)
    self.assertTrue(success.value)
958
@classmethod
def _test_wait_result(cls, c, pid):
    """Child side of test_wait_result.

    Notifies the condition, sleeps one second and then, when a pid was
    supplied, sends SIGINT to that process.
    """
    with c:
        c.notify()
    time.sleep(1)
    if pid is None:
        return
    os.kill(pid, signal.SIGINT)
966
def test_wait_result(self):
    """Check the return value of Condition.wait() and its interruption.

    wait() must return False on timeout and True once notified.  When
    running with real processes on a non-Windows platform, the child
    additionally sends SIGINT to this process and a pending wait() is
    expected to raise KeyboardInterrupt.
    """
    # Only hand our pid to the child when it is meant to signal us.
    if isinstance(self, ProcessesMixin) and sys.platform != 'win32':
        pid = os.getpid()
    else:
        pid = None

    c = self.Condition()
    with c:
        # Nobody notifies yet, so both waits time out.
        self.assertFalse(c.wait(0))
        self.assertFalse(c.wait(0.1))

        p = self.Process(target=self._test_wait_result, args=(c, pid))
        p.start()

        # The child notifies the condition as soon as it can take the lock.
        self.assertTrue(c.wait(10))
        if pid is not None:
            # The child sends SIGINT about one second after notifying.
            self.assertRaises(KeyboardInterrupt, c.wait, 10)

        p.join()
986
Benjamin Petersone711caf2008-06-11 16:44:04 +0000987
class _TestEvent(BaseTestCase):
    """Tests for Event: is_set(), wait() with and without timeout, clear()."""

    @classmethod
    def _test_event(cls, event):
        """Child side: set the event after TIMEOUT2 seconds."""
        time.sleep(TIMEOUT2)
        event.set()

    def test_event(self):
        event = self.Event()
        wait = TimingWrapper(event.wait)

        # A fresh event is unset, so waiting times out and returns False.
        self.assertEqual(event.is_set(), False)
        self.assertEqual(wait(0.0), False)
        self.assertTimingAlmostEqual(wait.elapsed, 0.0)
        self.assertEqual(wait(TIMEOUT1), False)
        self.assertTimingAlmostEqual(wait.elapsed, TIMEOUT1)

        event.set()

        # Once set, every wait returns True immediately and the event
        # stays set.
        self.assertEqual(event.is_set(), True)
        self.assertEqual(wait(), True)
        self.assertTimingAlmostEqual(wait.elapsed, 0.0)
        self.assertEqual(wait(TIMEOUT1), True)
        self.assertTimingAlmostEqual(wait.elapsed, 0.0)
        self.assertEqual(event.is_set(), True)

        event.clear()

        self.assertEqual(event.is_set(), False)

        p = self.Process(target=self._test_event, args=(event,))
        p.daemon = True
        p.start()
        self.assertEqual(wait(), True)
        # The child has set the event and is about to exit; reap it so the
        # test does not leave a process behind.
        p.join()
Benjamin Petersone711caf2008-06-11 16:44:04 +00001028
1029#
1030#
1031#
1032
class _TestValue(BaseTestCase):
    """Tests for shared scalar values (Value and RawValue)."""

    ALLOWED_TYPES = ('processes',)

    # (typecode, initial value, value written by the child process)
    codes_values = [
        ('i', 4343, 24234),
        ('d', 3.625, -4.25),
        ('h', -232, 234),
        ('c', latin('x'), latin('y'))
        ]

    def setUp(self):
        if HAS_SHAREDCTYPES:
            return
        self.skipTest("requires multiprocessing.sharedctypes")

    @classmethod
    def _test(cls, values):
        """Child side: overwrite each shared value with its final value."""
        for shared, spec in zip(values, cls.codes_values):
            shared.value = spec[2]

    def test_value(self, raw=False):
        """Values written by a child process are visible in the parent."""
        make = self.RawValue if raw else self.Value
        values = [make(code, initial)
                  for code, initial, _ in self.codes_values]

        for shared, spec in zip(values, self.codes_values):
            self.assertEqual(shared.value, spec[1])

        proc = self.Process(target=self._test, args=(values,))
        proc.daemon = True
        proc.start()
        proc.join()

        for shared, spec in zip(values, self.codes_values):
            self.assertEqual(shared.value, spec[2])

    def test_rawvalue(self):
        self.test_value(raw=True)

    def test_getobj_getlock(self):
        """get_lock()/get_obj() exist only on lock-wrapped values."""
        with_default_lock = self.Value('i', 5)
        with_default_lock.get_lock()
        with_default_lock.get_obj()

        with_none_lock = self.Value('i', 5, lock=None)
        with_none_lock.get_lock()
        with_none_lock.get_obj()

        lock = self.Lock()
        with_given_lock = self.Value('i', 5, lock=lock)
        returned_lock = with_given_lock.get_lock()
        with_given_lock.get_obj()
        self.assertEqual(lock, returned_lock)

        unlocked = self.Value('i', 5, lock=False)
        self.assertFalse(hasattr(unlocked, 'get_lock'))
        self.assertFalse(hasattr(unlocked, 'get_obj'))

        self.assertRaises(AttributeError, self.Value, 'i', 5, lock='navalue')

        raw = self.RawValue('i', 5)
        self.assertFalse(hasattr(raw, 'get_lock'))
        self.assertFalse(hasattr(raw, 'get_obj'))
1100
Benjamin Petersone711caf2008-06-11 16:44:04 +00001101
class _TestArray(BaseTestCase):
    """Tests for shared arrays (Array and RawArray)."""

    ALLOWED_TYPES = ('processes',)

    @classmethod
    def f(cls, seq):
        """Replace `seq` in place with its running (prefix) sums."""
        for i in range(1, len(seq)):
            seq[i] += seq[i-1]

    @unittest.skipIf(c_int is None, "requires _ctypes")
    def test_array(self, raw=False):
        """Indexing, slicing and child-process mutation of a shared array."""
        seq = [680, 626, 934, 821, 150, 233, 548, 982, 714, 831]
        if raw:
            arr = self.RawArray('i', seq)
        else:
            arr = self.Array('i', seq)

        self.assertEqual(len(arr), len(seq))
        self.assertEqual(arr[3], seq[3])
        self.assertEqual(list(arr[2:7]), list(seq[2:7]))

        arr[4:8] = seq[4:8] = array.array('i', [1, 2, 3, 4])

        self.assertEqual(list(arr[:]), seq)

        # Compute the expected result locally, then let a child process
        # apply the same transformation to the shared array.
        self.f(seq)

        p = self.Process(target=self.f, args=(arr,))
        p.daemon = True
        p.start()
        p.join()

        self.assertEqual(list(arr[:]), seq)

    @unittest.skipIf(c_int is None, "requires _ctypes")
    def test_array_from_size(self):
        size = 10
        # Test for zeroing (see issue #11675).
        # The repetition below strengthens the test by increasing the chances
        # of previously allocated non-zero memory being used for the new array
        # on the 2nd and 3rd loops.
        for _ in range(3):
            arr = self.Array('i', size)
            self.assertEqual(len(arr), size)
            self.assertEqual(list(arr), [0] * size)
            # Fill with non-zero data sized by `size`, not a literal, so
            # the test stays valid if `size` changes.
            arr[:] = range(size)
            self.assertEqual(list(arr), list(range(size)))
            del arr

    @unittest.skipIf(c_int is None, "requires _ctypes")
    def test_rawarray(self):
        self.test_array(raw=True)

    @unittest.skipIf(c_int is None, "requires _ctypes")
    def test_getobj_getlock_obj(self):
        """get_lock()/get_obj() exist only on lock-wrapped arrays."""
        arr1 = self.Array('i', list(range(10)))
        arr1.get_lock()
        arr1.get_obj()

        arr2 = self.Array('i', list(range(10)), lock=None)
        arr2.get_lock()
        arr2.get_obj()

        lock = self.Lock()
        arr3 = self.Array('i', list(range(10)), lock=lock)
        lock3 = arr3.get_lock()
        arr3.get_obj()
        self.assertEqual(lock, lock3)

        arr4 = self.Array('i', range(10), lock=False)
        self.assertFalse(hasattr(arr4, 'get_lock'))
        self.assertFalse(hasattr(arr4, 'get_obj'))
        self.assertRaises(AttributeError,
                          self.Array, 'i', range(10), lock='notalock')

        arr5 = self.RawArray('i', range(10))
        self.assertFalse(hasattr(arr5, 'get_lock'))
        self.assertFalse(hasattr(arr5, 'get_obj'))
Benjamin Petersone711caf2008-06-11 16:44:04 +00001180
1181#
1182#
1183#
1184
class _TestContainers(BaseTestCase):
    """Tests for the manager-backed list, dict and Namespace proxies."""

    ALLOWED_TYPES = ('manager',)

    def test_list(self):
        a = self.list(list(range(10)))
        self.assertEqual(a[:], list(range(10)))

        b = self.list()
        self.assertEqual(b[:], [])

        b.extend(list(range(5)))
        self.assertEqual(b[:], list(range(5)))

        self.assertEqual(b[2], 2)
        self.assertEqual(b[2:10], [2,3,4])

        b *= 2
        self.assertEqual(b[:], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4])

        self.assertEqual(b + [5, 6], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4, 5, 6])

        self.assertEqual(a[:], list(range(10)))

        # A shared list built from other shared lists.
        d = [a, b]
        e = self.list(d)
        self.assertEqual(
            e[:],
            [[0, 1, 2, 3, 4, 5, 6, 7, 8, 9], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4]]
            )

        # Appending to `a` must be visible through the list that holds it.
        f = self.list([a])
        a.append('hello')
        self.assertEqual(f[:], [[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 'hello']])

    def test_dict(self):
        d = self.dict()
        indices = list(range(65, 70))
        for i in indices:
            d[i] = chr(i)
        self.assertEqual(d.copy(), {i: chr(i) for i in indices})
        self.assertEqual(sorted(d.keys()), indices)
        self.assertEqual(sorted(d.values()), [chr(i) for i in indices])
        self.assertEqual(sorted(d.items()), [(i, chr(i)) for i in indices])

    def test_namespace(self):
        n = self.Namespace()
        n.name = 'Bob'
        n.job = 'Builder'
        n._hidden = 'hidden'
        self.assertEqual((n.name, n.job), ('Bob', 'Builder'))
        del n.job
        # The expected repr lists only `name`: `job` was deleted and
        # `_hidden` is not shown.
        self.assertEqual(str(n), "Namespace(name='Bob')")
        self.assertTrue(hasattr(n, 'name'))
        self.assertFalse(hasattr(n, 'job'))
1240
1241#
1242#
1243#
1244
def sqr(x, wait=0.0):
    """Sleep for `wait` seconds, then return `x` multiplied by itself."""
    time.sleep(wait)
    squared = x * x
    return squared
Ask Solem2afcbf22010-11-09 20:55:52 +00001248
def mul(x, y):
    """Return the product of `x` and `y`."""
    product = x * y
    return product
1251
class _TestPool(BaseTestCase):
    """Tests for the worker pool API exposed as self.pool."""

    def test_apply(self):
        papply = self.pool.apply
        self.assertEqual(papply(sqr, (5,)), sqr(5))
        self.assertEqual(papply(sqr, (), {'x':3}), sqr(x=3))

    def test_map(self):
        pmap = self.pool.map
        self.assertEqual(pmap(sqr, list(range(10))),
                         list(map(sqr, list(range(10)))))
        self.assertEqual(pmap(sqr, list(range(100)), chunksize=20),
                         list(map(sqr, list(range(100)))))

    def test_starmap(self):
        psmap = self.pool.starmap
        tuples = list(zip(range(10), range(9,-1, -1)))
        self.assertEqual(psmap(mul, tuples),
                         list(itertools.starmap(mul, tuples)))
        tuples = list(zip(range(100), range(99,-1, -1)))
        self.assertEqual(psmap(mul, tuples, chunksize=20),
                         list(itertools.starmap(mul, tuples)))

    def test_starmap_async(self):
        tuples = list(zip(range(100), range(99,-1, -1)))
        self.assertEqual(self.pool.starmap_async(mul, tuples).get(),
                         list(itertools.starmap(mul, tuples)))

    def test_map_chunksize(self):
        try:
            self.pool.map_async(sqr, [], chunksize=1).get(timeout=TIMEOUT1)
        except multiprocessing.TimeoutError:
            self.fail("pool.map_async with chunksize stalled on null list")

    def test_async(self):
        res = self.pool.apply_async(sqr, (7, TIMEOUT1,))
        get = TimingWrapper(res.get)
        self.assertEqual(get(), 49)
        self.assertTimingAlmostEqual(get.elapsed, TIMEOUT1)

    def test_async_timeout(self):
        # The task sleeps longer than the get() timeout, so get() must
        # raise after roughly TIMEOUT2 seconds.
        res = self.pool.apply_async(sqr, (6, TIMEOUT2 + 0.2))
        get = TimingWrapper(res.get)
        self.assertRaises(multiprocessing.TimeoutError, get, timeout=TIMEOUT2)
        self.assertTimingAlmostEqual(get.elapsed, TIMEOUT2)

    def test_imap(self):
        it = self.pool.imap(sqr, list(range(10)))
        self.assertEqual(list(it), list(map(sqr, list(range(10)))))

        it = self.pool.imap(sqr, list(range(10)))
        for i in range(10):
            self.assertEqual(next(it), i*i)
        self.assertRaises(StopIteration, it.__next__)

        it = self.pool.imap(sqr, list(range(1000)), chunksize=100)
        for i in range(1000):
            self.assertEqual(next(it), i*i)
        self.assertRaises(StopIteration, it.__next__)

    def test_imap_unordered(self):
        it = self.pool.imap_unordered(sqr, list(range(1000)))
        self.assertEqual(sorted(it), list(map(sqr, list(range(1000)))))

        it = self.pool.imap_unordered(sqr, list(range(1000)), chunksize=53)
        self.assertEqual(sorted(it), list(map(sqr, list(range(1000)))))

    def test_make_pool(self):
        self.assertRaises(ValueError, multiprocessing.Pool, -1)
        self.assertRaises(ValueError, multiprocessing.Pool, 0)

        p = multiprocessing.Pool(3)
        try:
            self.assertEqual(3, len(p._pool))
        finally:
            # Shut the pool down even when the assertion fails so no worker
            # processes are left behind.
            p.close()
            p.join()

    def test_terminate(self):
        if self.TYPE == 'manager':
            # On Unix a forked process increfs each shared object to
            # which its parent process held a reference.  If the
            # forked process gets terminated then there is likely to
            # be a reference leak.  So to prevent
            # _TestZZZNumberOfObjects from failing we skip this test
            # when using a manager.
            self.skipTest("terminate() would leak manager references")

        result = self.pool.map_async(
            time.sleep, [0.1 for i in range(10000)], chunksize=1
            )
        self.pool.terminate()
        join = TimingWrapper(self.pool.join)
        join()
        self.assertLess(join.elapsed, 0.5)
Jesse Noller1f0b6582010-01-27 03:36:01 +00001344
def raising():
    """Always raise KeyError("key"); used as a failing pool task."""
    error = KeyError("key")
    raise error
Jesse Noller1f0b6582010-01-27 03:36:01 +00001347
def unpickleable_result():
    """Return a lambda, i.e. a result that cannot be pickled."""
    answer = lambda: 42
    return answer
1350
class _TestPoolWorkerErrors(BaseTestCase):
    """Tests for how a Pool reports errors raised by or around tasks."""

    ALLOWED_TYPES = ('processes', )

    def test_async_error_callback(self):
        """error_callback receives the exception raised by the task."""
        p = multiprocessing.Pool(2)
        try:
            scratchpad = [None]
            def errback(exc):
                scratchpad[0] = exc

            res = p.apply_async(raising, error_callback=errback)
            self.assertRaises(KeyError, res.get)
            self.assertTrue(scratchpad[0])
            self.assertIsInstance(scratchpad[0], KeyError)
        finally:
            # Always shut the pool down so a failed assertion does not
            # leak worker processes.
            p.close()
            p.join()

    def test_unpickleable_result(self):
        """An unpicklable result is reported as MaybeEncodingError."""
        from multiprocessing.pool import MaybeEncodingError
        p = multiprocessing.Pool(2)
        try:
            # Make sure we don't lose pool processes because of encoding
            # errors.
            for iteration in range(20):

                scratchpad = [None]
                def errback(exc):
                    scratchpad[0] = exc

                res = p.apply_async(unpickleable_result,
                                    error_callback=errback)
                self.assertRaises(MaybeEncodingError, res.get)
                wrapped = scratchpad[0]
                self.assertTrue(wrapped)
                self.assertIsInstance(wrapped, MaybeEncodingError)
                self.assertIsNotNone(wrapped.exc)
                self.assertIsNotNone(wrapped.value)
        finally:
            p.close()
            p.join()
1390
class _TestPoolWorkerLifetime(BaseTestCase):
    """Tests for pools created with maxtasksperchild."""

    ALLOWED_TYPES = ('processes', )

    def test_pool_worker_lifetime(self):
        pool = multiprocessing.Pool(3, maxtasksperchild=10)
        self.assertEqual(3, len(pool._pool))
        origworkerpids = [worker.pid for worker in pool._pool]
        # Run many tasks so each worker gets replaced (hopefully)
        results = [pool.apply_async(sqr, (i, )) for i in range(100)]
        # Fetch the results and verify we got the right answers,
        # also ensuring all the tasks have completed.
        for arg, res in enumerate(results):
            self.assertEqual(res.get(), sqr(arg))
        # Refill the pool
        pool._repopulate_pool()
        # Wait until all workers are alive
        # (50 attempts * DELTA = 5 seconds max startup process time)
        for _ in range(50):
            if all(worker.is_alive() for worker in pool._pool):
                break
            time.sleep(DELTA)
        finalworkerpids = [worker.pid for worker in pool._pool]
        # All pids should be assigned.  See issue #7805.
        self.assertNotIn(None, origworkerpids)
        self.assertNotIn(None, finalworkerpids)
        # Finally, check that the worker pids have changed
        self.assertNotEqual(sorted(origworkerpids), sorted(finalworkerpids))
        pool.close()
        pool.join()

    def test_pool_worker_lifetime_early_close(self):
        # Issue #10332: closing a pool whose workers have limited lifetimes
        # before all the tasks completed would make join() hang.
        pool = multiprocessing.Pool(3, maxtasksperchild=1)
        results = [pool.apply_async(sqr, (i, 0.3)) for i in range(6)]
        pool.close()
        pool.join()
        # check the results
        for arg, res in enumerate(results):
            self.assertEqual(res.get(), sqr(arg))
1435
1436
Benjamin Petersone711caf2008-06-11 16:44:04 +00001437#
1438# Test that manager has expected number of shared objects left
1439#
1440
class _TestZZZNumberOfObjects(BaseTestCase):
    # Because test cases are sorted alphabetically, this one will get
    # run after all the other tests for the manager.  It tests that
    # there have been no "reference leaks" for the manager's shared
    # objects.  Note the comment in _TestPool.test_terminate().
    ALLOWED_TYPES = ('manager',)

    def test_number_of_objects(self):
        EXPECTED_NUMBER = 1                # the pool object is still alive
        multiprocessing.active_children()  # discard dead process objs
        gc.collect()                       # do garbage collection
        refs = self.manager._number_of_objects()
        # Capture the debug info right after the count so both describe
        # the same manager state.
        debug_info = self.manager._debug_info()
        if refs != EXPECTED_NUMBER:
            # Print the captured snapshot once.
            print(debug_info)

        self.assertEqual(refs, EXPECTED_NUMBER)
1459
1460#
1461# Test of creating a customized manager class
1462#
1463
1464from multiprocessing.managers import BaseManager, BaseProxy, RemoteError
1465
class FooBar(object):
    """Plain object served through MyManager in the custom-manager test."""

    def f(self):
        """Return the literal string 'f()'."""
        return 'f()'

    def g(self):
        """Always raise ValueError."""
        raise ValueError

    def _h(self):
        """Return the literal string '_h()'."""
        return '_h()'
1473
def baz():
    """Generate the squares of 0 through 9 in order."""
    n = 0
    while n < 10:
        yield n*n
        n += 1
1477
class IteratorProxy(BaseProxy):
    """Proxy that makes a manager-side iterator iterable in the client."""

    # Only __next__ is forwarded to the referent.
    _exposed_ = ('__next__',)

    def __iter__(self):
        return self

    def __next__(self):
        method_name = '__next__'
        return self._callmethod(method_name)
1484
class MyManager(BaseManager):
    """Customized manager class used by _TestMyManager."""
1487
# Shared types served by MyManager:
#   'Foo' - a FooBar registered without an explicit `exposed` list
#   'Bar' - a FooBar exposing exactly f() and _h()
#   'baz' - the baz() generator, accessed through IteratorProxy
MyManager.register('Foo', callable=FooBar)
MyManager.register('Bar', callable=FooBar, exposed=('f', '_h'))
MyManager.register('baz', callable=baz, proxytype=IteratorProxy)
1491
1492
class _TestMyManager(BaseTestCase):
    """Tests for a customized manager class (MyManager)."""

    ALLOWED_TYPES = ('manager',)

    def test_mymanager(self):
        manager = MyManager()
        manager.start()
        try:
            self._check_mymanager(manager)
        finally:
            # Stop the server process even when an assertion fails.
            manager.shutdown()

    def _check_mymanager(self, manager):
        """Check method exposure and calls on the proxies of `manager`."""
        foo = manager.Foo()
        bar = manager.Bar()
        baz = manager.baz()

        foo_methods = [name for name in ('f', 'g', '_h') if hasattr(foo, name)]
        bar_methods = [name for name in ('f', 'g', '_h') if hasattr(bar, name)]

        self.assertEqual(foo_methods, ['f', 'g'])
        self.assertEqual(bar_methods, ['f', '_h'])

        self.assertEqual(foo.f(), 'f()')
        self.assertRaises(ValueError, foo.g)
        self.assertEqual(foo._callmethod('f'), 'f()')
        # _h is not exposed on Foo, so calling it remotely must fail.
        self.assertRaises(RemoteError, foo._callmethod, '_h')

        self.assertEqual(bar.f(), 'f()')
        self.assertEqual(bar._h(), '_h()')
        self.assertEqual(bar._callmethod('f'), 'f()')
        self.assertEqual(bar._callmethod('_h'), '_h()')

        self.assertEqual(list(baz), [i*i for i in range(10)])
1524
1525#
1526# Test of connecting to a remote server and using xmlrpclib for serialization
1527#
1528
# Single module-level queue handed out by get_queue().
_queue = pyqueue.Queue()


def get_queue():
    """Return the module-level queue (the same object on every call)."""
    shared = _queue
    return shared
1532
class QueueManager(BaseManager):
    '''manager class used by server process'''
# Serve the module-level queue under the typeid 'get_queue'.
QueueManager.register('get_queue', callable=get_queue)
1536
class QueueManager2(BaseManager):
    '''manager class which specifies the same interface as QueueManager'''
# Registered without a callable: this class only declares the typeid.
QueueManager2.register('get_queue')
1540
1541
# Serializer name passed to the QueueManager / QueueManager2 constructors
# in the remote-manager and manager-restart tests.
SERIALIZER = 'xmlrpclib'
1543
class _TestRemoteManager(BaseTestCase):
    """Test connecting to a manager server by address from other managers."""

    ALLOWED_TYPES = ('manager',)

    @classmethod
    def _putter(cls, address, authkey):
        """Child side: connect to the server and put one tuple on its queue."""
        manager = QueueManager2(
            address=address, authkey=authkey, serializer=SERIALIZER
            )
        manager.connect()
        queue = manager.get_queue()
        queue.put(('hello world', None, True, 2.25))

    def test_remote(self):
        authkey = os.urandom(32)

        # Port 0 is requested here; the actual address is read back from
        # manager.address below.
        manager = QueueManager(
            address=('localhost', 0), authkey=authkey, serializer=SERIALIZER
            )
        manager.start()

        p = self.Process(target=self._putter, args=(manager.address, authkey))
        p.daemon = True
        p.start()

        # Second client connection to the same server, from this process.
        manager2 = QueueManager2(
            address=manager.address, authkey=authkey, serializer=SERIALIZER
            )
        manager2.connect()
        queue = manager2.get_queue()

        # Note that xmlrpclib will deserialize object as a list not a tuple
        self.assertEqual(queue.get(), ['hello world', None, True, 2.25])

        # Because we are using xmlrpclib for serialization instead of
        # pickle this will cause a serialization error.
        self.assertRaises(Exception, queue.put, time.sleep)

        # Make queue finalizer run before the server is stopped
        del queue
        manager.shutdown()
1585
class _TestManagerRestart(BaseTestCase):
    """Test that a manager can be restarted on the address it just used."""

    @classmethod
    def _putter(cls, address, authkey):
        """Child side: connect to the server and put one string on its queue."""
        manager = QueueManager(
            address=address, authkey=authkey, serializer=SERIALIZER)
        manager.connect()
        queue = manager.get_queue()
        queue.put('hello world')

    def test_rapid_restart(self):
        authkey = os.urandom(32)
        manager = QueueManager(
            address=('localhost', 0), authkey=authkey, serializer=SERIALIZER)
        srvr = manager.get_server()
        addr = srvr.address
        # Close the connection.Listener socket which gets opened as a part
        # of manager.get_server(). It's not needed for the test.
        srvr.listener.close()
        manager.start()

        p = self.Process(target=self._putter, args=(manager.address, authkey))
        p.daemon = True
        p.start()
        queue = manager.get_queue()
        self.assertEqual(queue.get(), 'hello world')
        del queue
        manager.shutdown()
        # Start a second manager on the address recorded above.
        manager = QueueManager(
            address=addr, authkey=authkey, serializer=SERIALIZER)
        try:
            manager.start()
        except IOError as e:
            if e.errno != errno.EADDRINUSE:
                raise
            # Retry after some time, in case the old socket was lingering
            # (sporadic failure on buildbots)
            time.sleep(1.0)
            manager = QueueManager(
                address=addr, authkey=authkey, serializer=SERIALIZER)
            # Actually perform the retry: without this the shutdown()
            # below would be called on a manager that was never started.
            manager.start()
        manager.shutdown()
Jesse Nollerc5d28a02009-03-30 16:37:36 +00001627
Benjamin Petersone711caf2008-06-11 16:44:04 +00001628#
1629#
1630#
1631
# Empty message: _TestConnection._echo stops echoing when it receives this.
SENTINEL = latin('')
1633
1634class _TestConnection(BaseTestCase):
1635
1636 ALLOWED_TYPES = ('processes', 'threads')
1637
@classmethod
def _echo(cls, conn):
    """Echo every byte message received on `conn` back to the sender.

    Stops when a message equal to SENTINEL arrives, then closes `conn`.
    """
    while True:
        payload = conn.recv_bytes()
        if payload == SENTINEL:
            break
        conn.send_bytes(payload)
    conn.close()
1643
def test_connection(self):
    """Exercise send/recv, the bytes variants and poll() over a pipe.

    A child running _echo sends back every byte message it receives, so
    each send below is followed by the matching receive.
    """
    conn, child_conn = self.Pipe()

    p = self.Process(target=self._echo, args=(child_conn,))
    p.daemon = True
    p.start()

    seq = [1, 2.25, None]
    msg = latin('hello world')
    longmsg = msg * 10
    arr = array.array('i', list(range(4)))

    if self.TYPE == 'processes':
        self.assertEqual(type(conn.fileno()), int)

    # Object round trip.
    self.assertEqual(conn.send(seq), None)
    self.assertEqual(conn.recv(), seq)

    # Raw bytes round trip.
    self.assertEqual(conn.send_bytes(msg), None)
    self.assertEqual(conn.recv_bytes(), msg)

    if self.TYPE == 'processes':
        # Receive into the start of a zeroed buffer.
        buffer = array.array('i', [0]*10)
        expected = list(arr) + [0] * (10 - len(arr))
        self.assertEqual(conn.send_bytes(arr), None)
        self.assertEqual(conn.recv_bytes_into(buffer),
                         len(arr) * buffer.itemsize)
        self.assertEqual(list(buffer), expected)

        # Receive at an offset of three items into the buffer.
        buffer = array.array('i', [0]*10)
        expected = [0] * 3 + list(arr) + [0] * (10 - 3 - len(arr))
        self.assertEqual(conn.send_bytes(arr), None)
        self.assertEqual(conn.recv_bytes_into(buffer, 3 * buffer.itemsize),
                         len(arr) * buffer.itemsize)
        self.assertEqual(list(buffer), expected)

        # A 40-byte buffer is too small for longmsg; the exception is
        # expected to carry the complete message.
        buffer = bytearray(latin(' ' * 40))
        self.assertEqual(conn.send_bytes(longmsg), None)
        try:
            res = conn.recv_bytes_into(buffer)
        except multiprocessing.BufferTooShort as e:
            self.assertEqual(e.args, (longmsg,))
        else:
            self.fail('expected BufferTooShort, got %s' % res)

    poll = TimingWrapper(conn.poll)

    # Nothing is pending, so poll() reports False for each timeout form.
    self.assertEqual(poll(), False)
    self.assertTimingAlmostEqual(poll.elapsed, 0)

    self.assertEqual(poll(-1), False)
    self.assertTimingAlmostEqual(poll.elapsed, 0)

    self.assertEqual(poll(TIMEOUT1), False)
    self.assertTimingAlmostEqual(poll.elapsed, TIMEOUT1)

    conn.send(None)

    # The echoed message makes poll() return True without waiting.
    self.assertEqual(poll(TIMEOUT1), True)
    self.assertTimingAlmostEqual(poll.elapsed, 0)

    self.assertEqual(conn.recv(), None)

    really_big_msg = latin('X') * (1024 * 1024 * 16)   # 16Mb
    conn.send_bytes(really_big_msg)
    self.assertEqual(conn.recv_bytes(), really_big_msg)

    conn.send_bytes(SENTINEL)                          # tell child to quit
    child_conn.close()

    if self.TYPE == 'processes':
        self.assertEqual(conn.readable, True)
        self.assertEqual(conn.writable, True)
        # Once the other end is closed, receiving must raise EOFError.
        self.assertRaises(EOFError, conn.recv)
        self.assertRaises(EOFError, conn.recv_bytes)

    p.join()
1721
def test_duplex_false(self):
    """A non-duplex pipe has a read-only end and a write-only end."""
    reader, writer = self.Pipe(duplex=False)
    self.assertEqual(writer.send(1), None)
    self.assertEqual(reader.recv(), 1)
    if self.TYPE != 'processes':
        return
    # Direction flags of each end.
    self.assertEqual(reader.readable, True)
    self.assertEqual(reader.writable, False)
    self.assertEqual(writer.readable, False)
    self.assertEqual(writer.writable, True)
    # Using an end in the wrong direction must fail.
    self.assertRaises(IOError, reader.send, 2)
    self.assertRaises(IOError, writer.recv)
    self.assertRaises(IOError, writer.poll)
1734
def test_spawn_close(self):
    """The parent may close the child's pipe end right after start()."""
    # We test that a pipe connection can be closed by parent
    # process immediately after child is spawned.  On Windows this
    # would have sometimes failed on old versions because
    # child_conn would be closed before the child got a chance to
    # duplicate it.
    conn, child_conn = self.Pipe()

    echo_proc = self.Process(target=self._echo, args=(child_conn,))
    echo_proc.daemon = True
    echo_proc.start()
    child_conn.close()    # this might complete before child initializes

    greeting = latin('hello')
    conn.send_bytes(greeting)
    echoed = conn.recv_bytes()
    self.assertEqual(echoed, greeting)

    conn.send_bytes(SENTINEL)
    conn.close()
    echo_proc.join()
1755
def test_sendbytes(self):
    """send_bytes() honours its offset and size arguments."""
    if self.TYPE != 'processes':
        # Report a skip rather than a silent pass, as test_fd_transfer
        # does.
        self.skipTest("only makes sense with processes")

    msg = latin('abcdefghijklmnopqrstuvwxyz')
    a, b = self.Pipe()

    a.send_bytes(msg)
    self.assertEqual(b.recv_bytes(), msg)

    a.send_bytes(msg, 5)
    self.assertEqual(b.recv_bytes(), msg[5:])

    a.send_bytes(msg, 7, 8)
    self.assertEqual(b.recv_bytes(), msg[7:7+8])

    # An offset equal to the message length sends an empty message.
    a.send_bytes(msg, 26)
    self.assertEqual(b.recv_bytes(), latin(''))

    a.send_bytes(msg, 26, 0)
    self.assertEqual(b.recv_bytes(), latin(''))

    # Out-of-range or negative offset/size combinations are rejected.
    self.assertRaises(ValueError, a.send_bytes, msg, 27)

    self.assertRaises(ValueError, a.send_bytes, msg, 22, 5)

    self.assertRaises(ValueError, a.send_bytes, msg, 26, 1)

    self.assertRaises(ValueError, a.send_bytes, msg, -1)

    self.assertRaises(ValueError, a.send_bytes, msg, 4, -1)
1787
@classmethod
def _is_fd_assigned(cls, fd):
    """Return True if os.fstat() succeeds on `fd`, False on EBADF.

    Any other OSError is re-raised.
    """
    try:
        os.fstat(fd)
    except OSError as exc:
        if exc.errno != errno.EBADF:
            raise
        return False
    return True
1798
@classmethod
def _writefd(cls, conn, data, create_dummy_fds=False):
    """Child side: receive a handle over `conn` and write `data` to it.

    With `create_dummy_fds` set, every unassigned descriptor below 256
    is first filled with a duplicate of the connection's descriptor.
    """
    if create_dummy_fds:
        for candidate in range(256):
            if cls._is_fd_assigned(candidate):
                continue
            os.dup2(conn.fileno(), candidate)
    handle = reduction.recv_handle(conn)
    if msvcrt:
        handle = msvcrt.open_osfhandle(handle, os.O_WRONLY)
    os.write(handle, data)
    os.close(handle)
1810
@unittest.skipUnless(HAS_REDUCTION, "test needs multiprocessing.reduction")
def test_fd_transfer(self):
    """A file handle sent to a child process can be written through."""
    if self.TYPE != 'processes':
        self.skipTest("only makes sense with processes")
    conn, child_conn = self.Pipe(duplex=True)

    writer = self.Process(target=self._writefd, args=(child_conn, b"foo"))
    writer.daemon = True
    writer.start()
    self.addCleanup(test.support.unlink, test.support.TESTFN)
    with open(test.support.TESTFN, "wb") as out:
        handle = out.fileno()
        if msvcrt:
            handle = msvcrt.get_osfhandle(handle)
        reduction.send_handle(conn, handle, writer.pid)
    writer.join()
    # The child's write must have landed in the file.
    with open(test.support.TESTFN, "rb") as written:
        contents = written.read()
        self.assertEqual(contents, b"foo")
1829
@unittest.skipUnless(HAS_REDUCTION, "test needs multiprocessing.reduction")
@unittest.skipIf(sys.platform == "win32",
                 "test semantics don't make sense on Windows")
@unittest.skipIf(MAXFD <= 256,
                 "largest assignable fd number is too small")
@unittest.skipUnless(hasattr(os, "dup2"),
                     "test needs os.dup2()")
def test_large_fd_transfer(self):
    # With fd > 256 (issue #11657)
    if self.TYPE != 'processes':
        self.skipTest("only makes sense with processes")
    conn, child_conn = self.Pipe(duplex=True)

    child = self.Process(target=self._writefd,
                         args=(child_conn, b"bar", True))
    child.daemon = True
    child.start()
    self.addCleanup(test.support.unlink, test.support.TESTFN)
    with open(test.support.TESTFN, "wb") as out:
        small_fd = out.fileno()
        # Pick the first unassigned descriptor number in [256, MAXFD).
        unassigned = (number for number in range(256, MAXFD)
                      if not self._is_fd_assigned(number))
        newfd = next(unassigned, None)
        if newfd is None:
            self.fail("could not find an unassigned large file descriptor")
        os.dup2(small_fd, newfd)
        try:
            reduction.send_handle(conn, newfd, child.pid)
        finally:
            os.close(newfd)
    child.join()
    with open(test.support.TESTFN, "rb") as written:
        contents = written.read()
    self.assertEqual(contents, b"bar")
1862
@classmethod
def _send_data_without_fd(cls, conn):
    # Write a single NUL byte straight to the connection's descriptor,
    # i.e. plain data with no accompanying handle.
    # (The first parameter is the class, hence "cls" like the sibling
    # classmethods.)
    os.write(conn.fileno(), b"\0")
1866
@unittest.skipUnless(HAS_REDUCTION, "test needs multiprocessing.reduction")
@unittest.skipIf(sys.platform == "win32", "doesn't make sense on Windows")
def test_missing_fd_transfer(self):
    # recv_handle() must raise RuntimeError when the data it receives is
    # not accompanied by a file descriptor in ancillary data.
    if self.TYPE != 'processes':
        self.skipTest("only makes sense with processes")
    conn, child_conn = self.Pipe(duplex=True)

    child = self.Process(target=self._send_data_without_fd,
                         args=(child_conn,))
    child.daemon = True
    child.start()
    with self.assertRaises(RuntimeError):
        reduction.recv_handle(conn)
    child.join()
Antoine Pitroubcb39d42011-08-23 19:46:22 +02001881
class _TestListener(BaseTestCase):

    # Must be a one-element tuple.  Without the trailing comma this is
    # just the string 'processes', and a "type in ALLOWED_TYPES" check
    # silently turns into a substring test.
    ALLOWED_TYPES = ('processes',)

    def test_multiple_bind(self):
        # Creating a second Listener on an address that is already bound
        # must raise OSError, for every supported address family.
        for family in self.connection.families:
            l = self.connection.Listener(family=family)
            self.addCleanup(l.close)
            self.assertRaises(OSError, self.connection.Listener,
                              l.address, family)
1892
class _TestListenerClient(BaseTestCase):

    ALLOWED_TYPES = ('processes', 'threads')

    @classmethod
    def _test(cls, address):
        # Child side: connect to *address*, send one greeting, hang up.
        client = cls.connection.Client(address)
        client.send('hello')
        client.close()

    def test_listener_client(self):
        # One listener/client round trip per supported address family.
        for family in self.connection.families:
            listener = self.connection.Listener(family=family)
            child = self.Process(target=self._test,
                                 args=(listener.address,))
            child.daemon = True
            child.start()
            accepted = listener.accept()
            self.assertEqual(accepted.recv(), 'hello')
            child.join()
            listener.close()

    def test_issue14725(self):
        listener = self.connection.Listener()
        child = self.Process(target=self._test, args=(listener.address,))
        child.daemon = True
        child.start()
        time.sleep(1)
        # On Windows the client process should by now have connected,
        # written its data and closed the pipe handle.  This causes
        # ConnectNamedPipe() to fail with ERROR_NO_DATA.  See Issue
        # 14725.
        accepted = listener.accept()
        self.assertEqual(accepted.recv(), 'hello')
        accepted.close()
        child.join()
        listener.close()
1929
class _TestPoll(unittest.TestCase):

    ALLOWED_TYPES = ('processes', 'threads')

    def test_empty_string(self):
        # An empty message still counts as "data available", and polling
        # repeatedly must not consume it.
        reader, writer = self.Pipe()
        self.assertEqual(reader.poll(), False)
        writer.send_bytes(b'')
        for _ in range(2):
            self.assertEqual(reader.poll(), True)

    @classmethod
    def _child_strings(cls, conn, strings):
        # Send each string after a short pause, then close the connection.
        for item in strings:
            time.sleep(0.1)
            conn.send_bytes(item)
        conn.close()

    def test_strings(self):
        strings = (b'hello', b'', b'a', b'b', b'', b'bye', b'', b'lop')
        reader, writer = self.Pipe()
        child = self.Process(target=self._child_strings,
                             args=(writer, strings))
        child.start()

        for expected in strings:
            # Poll up to 200 times, 10ms each, stopping at the first
            # success; then receive whether or not a poll succeeded.
            any(reader.poll(0.01) for _ in range(200))
            received = reader.recv_bytes()
            self.assertEqual(expected, received)

        child.join()

    @classmethod
    def _child_boundaries(cls, r):
        # Polling may "pull" a message in to the child process, but we
        # don't want it to pull only part of a message, as that would
        # corrupt the pipe for any other processes which might later
        # read from it.
        r.poll(5)

    def test_boundaries(self):
        reader, writer = self.Pipe(False)
        child = self.Process(target=self._child_boundaries, args=(reader,))
        child.start()
        time.sleep(2)
        messages = [b"first", b"second"]
        for message in messages:
            writer.send_bytes(message)
        writer.close()
        child.join()
        # Whatever the parent reads must be one complete message.
        self.assertIn(reader.recv_bytes(), messages)

    @classmethod
    def _child_dont_merge(cls, b):
        # Three separate messages sent back to back.
        for chunk in (b'a', b'b', b'cd'):
            b.send_bytes(chunk)

    def test_dont_merge(self):
        reader, writer = self.Pipe()
        for timeout in (0.0, 0.1):
            self.assertEqual(reader.poll(timeout), False)

        child = self.Process(target=self._child_dont_merge, args=(writer,))
        child.start()

        # Each message must arrive on its own, however often we poll
        # between the receives.
        self.assertEqual(reader.recv_bytes(), b'a')
        for timeout in (1.0, 1.0):
            self.assertEqual(reader.poll(timeout), True)
        self.assertEqual(reader.recv_bytes(), b'b')
        for timeout in (1.0, 1.0, 0.0):
            self.assertEqual(reader.poll(timeout), True)
        self.assertEqual(reader.recv_bytes(), b'cd')

        child.join()
2007
Benjamin Petersone711caf2008-06-11 16:44:04 +00002008#
2009# Test of sending connection and socket objects between processes
2010#
Antoine Pitrou5438ed12012-04-24 22:56:57 +02002011
# Intermittent fails on Mac OS X -- see Issue14669 and Issue12958
@unittest.skipIf(sys.platform == "darwin", "fd passing unreliable on Mac OS X")
@unittest.skipUnless(HAS_REDUCTION, "test needs multiprocessing.reduction")
class _TestPicklingConnections(BaseTestCase):
    # Sends live connection and socket objects from one process to
    # another over a pipe and checks they are usable on the other side.

    ALLOWED_TYPES = ('processes',)

    @classmethod
    def tearDownClass(cls):
        # Stop the resource sharer once all tests of the class have run.
        from multiprocessing.reduction import resource_sharer
        resource_sharer.stop(timeout=5)

    @classmethod
    def _listener(cls, conn, families):
        # Listener-side child.  For each family: publish the listener
        # address over *conn*, accept one client, then send the accepted
        # connection object itself back over *conn*.
        for fam in families:
            l = cls.connection.Listener(family=fam)
            conn.send(l.address)
            new_conn = l.accept()
            conn.send(new_conn)
            new_conn.close()
            l.close()

        # Same exchange again, this time with a plain socket.
        l = socket.socket()
        l.bind(('localhost', 0))
        l.listen(1)
        conn.send(l.getsockname())
        new_conn, addr = l.accept()
        conn.send(new_conn)
        new_conn.close()
        l.close()

        # Block until the parent sends its final message.
        conn.recv()

    @classmethod
    def _remote(cls, conn):
        # Client-side child.  Receive (address, msg) pairs until None,
        # connect to each address and send the upper-cased message.
        for (address, msg) in iter(conn.recv, None):
            client = cls.connection.Client(address)
            client.send(msg.upper())
            client.close()

        # One more pair after the None sentinel, served over a plain
        # socket.
        address, msg = conn.recv()
        client = socket.socket()
        client.connect(address)
        client.sendall(msg.upper())
        client.close()

        conn.close()

    def test_pickling(self):
        families = self.connection.families

        # lconn talks to the listener child, rconn to the client child.
        lconn, lconn0 = self.Pipe()
        lp = self.Process(target=self._listener, args=(lconn0, families))
        lp.daemon = True
        lp.start()
        lconn0.close()

        rconn, rconn0 = self.Pipe()
        rp = self.Process(target=self._remote, args=(rconn0,))
        rp.daemon = True
        rp.start()
        rconn0.close()

        for fam in families:
            msg = ('This connection uses family %s' % fam).encode('ascii')
            # Relay the listener's address to the client child, then
            # read the reply through the connection object sent back by
            # the listener child.
            address = lconn.recv()
            rconn.send((address, msg))
            new_conn = lconn.recv()
            self.assertEqual(new_conn.recv(), msg.upper())

        # Ends the Connection-based loop in _remote().
        rconn.send(None)

        msg = latin('This connection uses a normal socket')
        address = lconn.recv()
        rconn.send((address, msg))
        new_conn = lconn.recv()
        # Read from the received socket until the peer closes it.
        buf = []
        while True:
            s = new_conn.recv(100)
            if not s:
                break
            buf.append(s)
        buf = b''.join(buf)
        self.assertEqual(buf, msg.upper())
        new_conn.close()

        # Releases the final conn.recv() in _listener().
        lconn.send(None)

        rconn.close()
        lconn.close()

        lp.join()
        rp.join()

    @classmethod
    def child_access(cls, conn):
        # First receive a connection and write through it ...
        w = conn.recv()
        w.send('all is well')
        w.close()

        # ... then receive a connection, read one message from it and
        # send that message doubled back over *conn*.
        r = conn.recv()
        msg = r.recv()
        conn.send(msg*2)

        conn.close()

    def test_access(self):
        # On Windows, if we do not specify a destination pid when
        # using DupHandle then we need to be careful to use the
        # correct access flags for DuplicateHandle(), or else
        # DupHandle.detach() will raise PermissionError.  For example,
        # for a read only pipe handle we should use
        # access=FILE_GENERIC_READ.  (Unfortunately
        # DUPLICATE_SAME_ACCESS does not work.)
        conn, child_conn = self.Pipe()
        p = self.Process(target=self.child_access, args=(child_conn,))
        p.daemon = True
        p.start()
        child_conn.close()

        # Send the write end of a one-way pipe to the child.
        r, w = self.Pipe(duplex=False)
        conn.send(w)
        w.close()
        self.assertEqual(r.recv(), 'all is well')
        r.close()

        # Send the read end of a one-way pipe to the child.
        r, w = self.Pipe(duplex=False)
        conn.send(r)
        r.close()
        w.send('foobar')
        w.close()
        self.assertEqual(conn.recv(), 'foobar'*2)
2144
Benjamin Petersone711caf2008-06-11 16:44:04 +00002145#
2146#
2147#
2148
class _TestHeap(BaseTestCase):

    ALLOWED_TYPES = ('processes',)

    def test_heap(self):
        iterations = 5000
        maxblocks = 50
        live_blocks = []

        # create and destroy lots of blocks of different sizes
        for _ in range(iterations):
            size = int(random.lognormvariate(0, 1) * 1000)
            live_blocks.append(multiprocessing.heap.BufferWrapper(size))
            if len(live_blocks) > maxblocks:
                victim = random.randrange(maxblocks)
                del live_blocks[victim]

        # get the heap object
        heap = multiprocessing.heap.BufferWrapper._heap

        # verify the state of the heap: collect every free and every
        # occupied segment as (arena index, start, stop, length, state)
        segments = []
        occupied = 0    # tallied, but not asserted on
        heap._lock.acquire()
        self.addCleanup(heap._lock.release)
        for free_list in list(heap._len_to_seq.values()):
            for arena, start, stop in free_list:
                segments.append((heap._arenas.index(arena), start, stop,
                                 stop-start, 'free'))
        for arena, start, stop in heap._allocated_blocks:
            segments.append((heap._arenas.index(arena), start, stop,
                             stop-start, 'occupied'))
            occupied += (stop-start)

        segments.sort()

        # Consecutive segments must either touch, or the later one must
        # start at offset 0 of a different arena.
        for current, following in zip(segments, segments[1:]):
            (arena, start, stop) = current[:3]
            (narena, nstart, nstop) = following[:3]
            self.assertTrue((arena != narena and nstart == 0) or
                            (stop == nstart))

    def test_free_from_gc(self):
        # Check that freeing of blocks by the garbage collector doesn't deadlock
        # (issue #12352).
        # Make sure the GC is enabled, and set lower collection thresholds to
        # make collections more frequent (and increase the probability of
        # deadlock).
        if not gc.isenabled():
            gc.enable()
            self.addCleanup(gc.disable)
        saved_thresholds = gc.get_threshold()
        self.addCleanup(gc.set_threshold, *saved_thresholds)
        gc.set_threshold(10)

        # perform numerous block allocations, with cyclic references to make
        # sure objects are collected asynchronously by the gc
        for _ in range(5000):
            first = multiprocessing.heap.BufferWrapper(1)
            second = multiprocessing.heap.BufferWrapper(1)
            # circular references
            first.buddy = second
            second.buddy = first
2213
Benjamin Petersone711caf2008-06-11 16:44:04 +00002214#
2215#
2216#
2217
# ctypes structure with an int field 'x' and a double field 'y'.
class _Foo(Structure):
    _fields_ = [
        ('x', c_int),
        ('y', c_double)
        ]
2223
class _TestSharedCTypes(BaseTestCase):

    ALLOWED_TYPES = ('processes',)

    def setUp(self):
        if not HAS_SHAREDCTYPES:
            self.skipTest("requires multiprocessing.sharedctypes")

    @classmethod
    def _double(cls, x, y, foo, arr, string):
        # Child side: double every shared value in place.
        x.value *= 2
        y.value *= 2
        foo.x *= 2
        foo.y *= 2
        string.value *= 2
        for index in range(len(arr)):
            arr[index] *= 2

    def test_sharedctypes(self, lock=False):
        # Modifications made by a child process must be visible in the
        # parent afterwards.
        x = Value('i', 7, lock=lock)
        y = Value(c_double, 1.0/3.0, lock=lock)
        foo = Value(_Foo, 3, 2, lock=lock)
        arr = self.Array('d', list(range(10)), lock=lock)
        string = self.Array('c', 20, lock=lock)
        string.value = latin('hello')

        child = self.Process(target=self._double,
                             args=(x, y, foo, arr, string))
        child.daemon = True
        child.start()
        child.join()

        self.assertEqual(x.value, 14)
        self.assertAlmostEqual(y.value, 2.0/3.0)
        self.assertEqual(foo.x, 6)
        self.assertAlmostEqual(foo.y, 4.0)
        for index in range(10):
            self.assertAlmostEqual(arr[index], index*2)
        self.assertEqual(string.value, latin('hellohello'))

    def test_synchronize(self):
        # Same scenario, with lock=True.
        self.test_sharedctypes(lock=True)

    def test_copy(self):
        # copy() must produce an independent structure.
        original = _Foo(2, 5.0)
        duplicate = copy(original)
        original.x = 0
        original.y = 0
        self.assertEqual(duplicate.x, 2)
        self.assertAlmostEqual(duplicate.y, 5.0)
2273
2274#
2275#
2276#
2277
class _TestFinalize(BaseTestCase):
    # Checks which util.Finalize callbacks run, and in what order.

    ALLOWED_TYPES = ('processes',)

    @classmethod
    def _test_finalize(cls, conn):
        # Child side.  Every finalizer reports by sending its label over
        # *conn*; the parent compares the sequence of labels.
        class Foo(object):
            pass

        a = Foo()
        util.Finalize(a, conn.send, args=('a',))
        del a           # triggers callback for a

        b = Foo()
        close_b = util.Finalize(b, conn.send, args=('b',))
        close_b()       # triggers callback for b
        close_b()       # does nothing because callback has already been called
        del b           # does nothing because callback has already been called

        # Registered without an exitpriority; 'c' is absent from the
        # sequence the parent expects.
        c = Foo()
        util.Finalize(c, conn.send, args=('c',))

        d10 = Foo()
        util.Finalize(d10, conn.send, args=('d10',), exitpriority=1)

        # Three finalizers sharing exitpriority 0; the parent expects
        # them in reverse order of registration.
        d01 = Foo()
        util.Finalize(d01, conn.send, args=('d01',), exitpriority=0)
        d02 = Foo()
        util.Finalize(d02, conn.send, args=('d02',), exitpriority=0)
        d03 = Foo()
        util.Finalize(d03, conn.send, args=('d03',), exitpriority=0)

        util.Finalize(None, conn.send, args=('e',), exitpriority=-10)

        # Lowest priority: 'STOP' is the sentinel the parent reads up to.
        util.Finalize(None, conn.send, args=('STOP',), exitpriority=-100)

        # call multiprocessing's cleanup function then exit process without
        # garbage collecting locals
        util._exit_function()
        conn.close()
        os._exit(0)

    def test_finalize(self):
        conn, child_conn = self.Pipe()

        p = self.Process(target=self._test_finalize, args=(child_conn,))
        p.daemon = True
        p.start()
        p.join()

        # Collect every label received before the 'STOP' sentinel.
        result = [obj for obj in iter(conn.recv, 'STOP')]
        self.assertEqual(result, ['a', 'b', 'd10', 'd03', 'd02', 'd01', 'e'])
2330
2331#
2332# Test that from ... import * works for each module
2333#
2334
class _TestImportStar(BaseTestCase):

    ALLOWED_TYPES = ('processes',)

    def test_import(self):
        # Every name a module lists in __all__ must exist on the module.
        module_names = [
            'multiprocessing', 'multiprocessing.connection',
            'multiprocessing.heap', 'multiprocessing.managers',
            'multiprocessing.pool', 'multiprocessing.process',
            'multiprocessing.synchronize', 'multiprocessing.util'
            ]

        if HAS_REDUCTION:
            module_names += ['multiprocessing.reduction']

        if c_int is not None:
            # This module requires _ctypes
            module_names += ['multiprocessing.sharedctypes']

        for module_name in module_names:
            __import__(module_name)
            module = sys.modules[module_name]

            exported = getattr(module, '__all__', ())
            for attr in exported:
                message = '%r does not have attribute %r' % (module, attr)
                self.assertTrue(hasattr(module, attr), message)
2363
2364#
2365# Quick test that logging works -- does not test logging output
2366#
2367
class _TestLogging(BaseTestCase):

    ALLOWED_TYPES = ('processes',)

    def test_enable_logging(self):
        # Smoke test only: the logger can be obtained and used.  The log
        # output itself is not checked.
        logger = multiprocessing.get_logger()
        self.assertIsNotNone(logger)
        logger.setLevel(util.SUBWARNING)
        try:
            logger.debug('this will not be printed')
            logger.info('nor will this')
        finally:
            # Put the level back even if a logging call raises.
            logger.setLevel(LOG_LEVEL)

    @classmethod
    def _test_level(cls, conn):
        # Child side: report the effective level of the multiprocessing
        # logger as seen in the child process.
        logger = multiprocessing.get_logger()
        conn.send(logger.getEffectiveLevel())

    def test_level(self):
        # The child must see the level set on the multiprocessing logger,
        # or the root logger's level when the former is NOTSET.
        LEVEL1 = 32
        LEVEL2 = 37

        logger = multiprocessing.get_logger()
        root_logger = logging.getLogger()
        root_level = root_logger.level

        reader, writer = multiprocessing.Pipe(duplex=False)

        try:
            logger.setLevel(LEVEL1)
            p = self.Process(target=self._test_level, args=(writer,))
            p.daemon = True
            p.start()
            self.assertEqual(LEVEL1, reader.recv())
            # Reap the child before starting the next one.
            p.join()

            logger.setLevel(logging.NOTSET)
            root_logger.setLevel(LEVEL2)
            p = self.Process(target=self._test_level, args=(writer,))
            p.daemon = True
            p.start()
            self.assertEqual(LEVEL2, reader.recv())
            p.join()
        finally:
            # Restore both loggers even when an assertion above fails, so
            # a failure here does not leak levels into later tests.
            root_logger.setLevel(root_level)
            logger.setLevel(level=LOG_LEVEL)
2410
Jesse Nollerb9a49b72009-11-21 18:09:38 +00002411
Jesse Nollerf4ae35f2009-11-24 14:22:24 +00002412# class _TestLoggingProcessName(BaseTestCase):
2413#
2414# def handle(self, record):
2415# assert record.processName == multiprocessing.current_process().name
2416# self.__handled = True
2417#
2418# def test_logging(self):
2419# handler = logging.Handler()
2420# handler.handle = self.handle
2421# self.__handled = False
2422# # Bypass getLogger() and side-effects
2423# logger = logging.getLoggerClass()(
2424# 'multiprocessing.test.TestLoggingProcessName')
2425# logger.addHandler(handler)
2426# logger.propagate = False
2427#
2428# logger.warn('foo')
2429# assert self.__handled
Jesse Nollerb9a49b72009-11-21 18:09:38 +00002430
Benjamin Petersone711caf2008-06-11 16:44:04 +00002431#
Jesse Noller6214edd2009-01-19 16:23:53 +00002432# Test to verify handle verification, see issue 3321
2433#
2434
class TestInvalidHandle(unittest.TestCase):

    @unittest.skipIf(WIN32, "skipped on Windows")
    def test_invalid_handles(self):
        # Either exception type is accepted for a bad handle.
        rejected = (ValueError, IOError)

        conn = multiprocessing.connection.Connection(44977608)
        try:
            with self.assertRaises(rejected):
                conn.poll()
        finally:
            # Hack private attribute _handle to avoid printing an error
            # in conn.__del__
            conn._handle = None

        with self.assertRaises(rejected):
            multiprocessing.connection.Connection(-1)
Florent Xiclunafd1b0932010-03-28 00:25:02 +00002448
Jesse Noller6214edd2009-01-19 16:23:53 +00002449#
Benjamin Petersone711caf2008-06-11 16:44:04 +00002450# Functions used to create test cases from the base ones in this module
2451#
2452
def get_attributes(Source, names):
    """Return a dict mapping each name in *names* to that attribute of *Source*.

    Attributes that are plain Python functions are wrapped in
    staticmethod(), so that storing the result in a class namespace does
    not turn them into methods bound to an instance.  Every other object
    is returned unchanged.
    """
    # Local import: keeps this helper self-contained.
    from types import FunctionType

    attributes = {}
    for name in names:
        obj = getattr(Source, name)
        # isinstance() is the idiomatic form of the former
        # "type(obj) == type(get_attributes)" comparison.
        if isinstance(obj, FunctionType):
            obj = staticmethod(obj)
        attributes[name] = obj
    return attributes
2461
def create_test_cases(Mixin, type):
    # Build one concrete TestCase class per module-level "_Test*" base
    # whose ALLOWED_TYPES contains *type*.  The result maps the generated
    # class name ('With' + capitalized type + base name minus its leading
    # underscore) to the class.
    prefix = 'With' + type.capitalize()
    cases = {}

    # Iterate over a snapshot of the module namespace.
    for name, base in list(globals().items()):
        if not name.startswith('_Test'):
            continue
        if type not in base.ALLOWED_TYPES:
            continue
        newname = prefix + name[1:]
        class Temp(base, unittest.TestCase, Mixin):
            pass
        Temp.__name__ = newname
        Temp.__module__ = Mixin.__module__
        cases[newname] = Temp
    return cases
2478
2479#
2480# Create test cases
2481#
2482
# Supplies the 'processes' flavour of every primitive used by the
# _Test* base classes, taken from the multiprocessing package itself.
class ProcessesMixin(object):
    TYPE = 'processes'
    Process = multiprocessing.Process
    # Copy the listed attributes of the multiprocessing package into this
    # class body; get_attributes() wraps plain functions in staticmethod().
    locals().update(get_attributes(multiprocessing, (
        'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
        'Condition', 'Event', 'Value', 'Array', 'RawValue',
        'RawArray', 'current_process', 'active_children', 'Pipe',
        'connection', 'JoinableQueue'
        )))

# Generate the concrete test classes for this flavour and publish them
# as module globals.
testcases_processes = create_test_cases(ProcessesMixin, type='processes')
globals().update(testcases_processes)
2495
2496
# Supplies the 'manager' flavour: primitives are looked up on a
# SyncManager object.
class ManagerMixin(object):
    TYPE = 'manager'
    Process = multiprocessing.Process
    # Created with object.__new__, i.e. SyncManager.__init__ is not run
    # here.
    manager = object.__new__(multiprocessing.managers.SyncManager)
    # Copy the listed attributes of that manager object into this class
    # body.
    locals().update(get_attributes(manager, (
        'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
        'Condition', 'Event', 'Value', 'Array', 'list', 'dict',
        'Namespace', 'JoinableQueue'
        )))

# Generate the concrete test classes for this flavour and publish them
# as module globals.
testcases_manager = create_test_cases(ManagerMixin, type='manager')
globals().update(testcases_manager)
2509
2510
# Supplies the 'threads' flavour: primitives come from
# multiprocessing.dummy.
class ThreadsMixin(object):
    TYPE = 'threads'
    Process = multiprocessing.dummy.Process
    # Copy the listed attributes of multiprocessing.dummy into this class
    # body; get_attributes() wraps plain functions in staticmethod().
    locals().update(get_attributes(multiprocessing.dummy, (
        'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
        'Condition', 'Event', 'Value', 'Array', 'current_process',
        'active_children', 'Pipe', 'connection', 'dict', 'list',
        'Namespace', 'JoinableQueue'
        )))

# Generate the concrete test classes for this flavour and publish them
# as module globals.
testcases_threads = create_test_cases(ThreadsMixin, type='threads')
globals().update(testcases_threads)
2523
class OtherTest(unittest.TestCase):
    # TODO: add more tests for deliver/answer challenge.
    def test_deliver_challenge_auth_failure(self):
        # The peer answers the challenge with bytes that are not a valid
        # digest.
        class _BogusPeer(object):
            def recv_bytes(self, size):
                return b'something bogus'
            def send_bytes(self, data):
                pass
        with self.assertRaises(multiprocessing.AuthenticationError):
            multiprocessing.connection.deliver_challenge(_BogusPeer(),
                                                         b'abc')

    def test_answer_challenge_auth_failure(self):
        # The peer sends the challenge marker first, then a bogus reply,
        # then only empty messages.
        class _BogusPeer(object):
            def __init__(self):
                self.count = 0
            def recv_bytes(self, size):
                self.count += 1
                if self.count == 1:
                    return multiprocessing.connection.CHALLENGE
                if self.count == 2:
                    return b'something bogus'
                return b''
            def send_bytes(self, data):
                pass
        with self.assertRaises(multiprocessing.AuthenticationError):
            multiprocessing.connection.answer_challenge(_BogusPeer(),
                                                        b'abc')
2552
Benjamin Petersonf47ed4a2009-04-11 20:45:40 +00002553#
2554# Test Manager.start()/Pool.__init__() initializer feature - see issue 5585
2555#
2556
def initializer(ns):
    # Bump the 'test' counter stored on the given namespace object.
    ns.test = ns.test + 1
2559
class TestInitializers(unittest.TestCase):
    def setUp(self):
        # A managed namespace whose 'test' counter the initializer bumps.
        self.mgr = multiprocessing.Manager()
        self.ns = self.mgr.Namespace()
        self.ns.test = 0

    def tearDown(self):
        self.mgr.shutdown()
        self.mgr.join()

    def test_manager_initializer(self):
        manager = multiprocessing.managers.SyncManager()
        # A non-callable initializer is rejected ...
        with self.assertRaises(TypeError):
            manager.start(1)
        # ... a callable one has run once start() has returned.
        manager.start(initializer, (self.ns,))
        self.assertEqual(self.ns.test, 1)
        manager.shutdown()
        manager.join()

    def test_pool_initializer(self):
        with self.assertRaises(TypeError):
            multiprocessing.Pool(initializer=1)
        pool = multiprocessing.Pool(1, initializer, (self.ns,))
        pool.close()
        pool.join()
        self.assertEqual(self.ns.test, 1)
2584
Alexandre Vassalottic57a84f2009-07-17 12:07:01 +00002585#
2586# Issue 5155, 5313, 5331: Test process in processes
2587# Verifies os.close(sys.stdin.fileno) vs. sys.stdin.close() behavior
2588#
2589
def _ThisSubProcess(q):
    # Try one non-blocking get(); an empty queue is not an error.
    try:
        q.get(block=False)
    except pyqueue.Empty:
        return
2595
def _TestProcess(q):
    # Start a daemonic grandchild that touches a fresh Queue and wait for
    # it.  The *q* argument itself is not used.
    inner_queue = multiprocessing.Queue()
    worker = multiprocessing.Process(target=_ThisSubProcess,
                                     args=(inner_queue,))
    worker.daemon = True
    worker.start()
    worker.join()
2602
def _afunc(x):
    # Multiply x by itself.
    squared = x * x
    return squared
2605
def pool_in_process():
    # Run a small map() through a four-worker pool, then shut the pool
    # down.  The mapped result is discarded.
    workers = multiprocessing.Pool(processes=4)
    workers.map(_afunc, [1, 2, 3, 4, 5, 6, 7])
    workers.close()
    workers.join()
Alexandre Vassalottic57a84f2009-07-17 12:07:01 +00002611
class _file_like(object):
    # Write-buffering wrapper around *delegate*: write() only appends to
    # a per-process list, flush() joins the list and hands it to
    # delegate.write().

    def __init__(self, delegate):
        self._delegate = delegate
        self._pid = None

    @property
    def cache(self):
        # Start a fresh buffer whenever the current pid differs from the
        # one recorded when the buffer was created.
        current_pid = os.getpid()
        # There are no race conditions since fork keeps only the running thread
        if current_pid == self._pid:
            return self._cache
        self._pid = current_pid
        self._cache = []
        return self._cache

    def write(self, data):
        pending = self.cache
        pending.append(data)

    def flush(self):
        joined = ''.join(self.cache)
        self._delegate.write(joined)
        self._cache = []
2632
class TestStdinBadfiledescriptor(unittest.TestCase):

    def test_queue_in_process(self):
        # A child process that itself starts a process using a Queue.
        queue = multiprocessing.Queue()
        proc = multiprocessing.Process(target=_TestProcess, args=(queue,))
        proc.start()
        proc.join()

    def test_pool_in_process(self):
        # A child process that creates and uses a Pool.
        p = multiprocessing.Process(target=pool_in_process)
        p.start()
        p.join()

    def test_flushing(self):
        sio = io.StringIO()
        flike = _file_like(sio)
        flike.write('foo')
        # The Process object is constructed but never started.
        # NOTE(review): presumably kept from the original bug report --
        # confirm whether it is still needed.
        proc = multiprocessing.Process(target=lambda: flike.flush())
        flike.flush()
        # assertEqual rather than a bare assert: the latter is stripped
        # when Python runs with -O and gives no diagnostic on failure.
        self.assertEqual(sio.getvalue(), 'foo')
2653
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01002654
2655class TestWait(unittest.TestCase):
2656
@classmethod
def _child_test_wait(cls, w, slow):
    # Send ten (index, pid) messages over *w*, optionally pausing for a
    # random time of up to 0.1s before each, then close the connection.
    pid_of = os.getpid
    for index in range(10):
        if slow:
            time.sleep(random.random()*0.1)
        w.send((index, pid_of()))
    w.close()
2664
def test_wait(self, slow=False):
    # Four children each send ten messages over their own pipe; wait()
    # is used to multiplex the four read ends until all have hit EOF.
    from multiprocessing.connection import wait
    pending = []
    children = []
    received = []

    for _ in range(4):
        read_end, write_end = multiprocessing.Pipe(duplex=False)
        child = multiprocessing.Process(target=self._child_test_wait,
                                        args=(write_end, slow))
        child.daemon = True
        child.start()
        # Close the parent's copy of the write end.
        write_end.close()
        pending.append(read_end)
        children.append(child)
        self.addCleanup(child.join)

    while pending:
        for ready in wait(pending):
            try:
                message = ready.recv()
            except EOFError:
                pending.remove(ready)
                ready.close()
            else:
                received.append(message)

    received.sort()
    expected = sorted((i, child.pid) for i in range(10) for child in children)
    self.assertEqual(received, expected)
2694
@classmethod
def _child_test_wait_socket(cls, address, slow):
    # Connect to *address* and send the lines "0\n" .. "9\n", optionally
    # pausing for a random time of up to 0.1s before each line.
    client = socket.socket()
    client.connect(address)
    for number in range(10):
        if slow:
            time.sleep(random.random()*0.1)
        line = '%s\n' % number
        client.sendall(line.encode('ascii'))
    client.close()
2704
def test_wait_socket(self, slow=False):
    # Same idea as test_wait(), but the children talk over plain TCP
    # sockets and wait() is handed socket objects.
    from multiprocessing.connection import wait
    listener = socket.socket()
    listener.bind(('', 0))
    listener.listen(4)
    addr = ('localhost', listener.getsockname()[1])
    pending = []
    children = []
    chunks_by_socket = {}

    for _ in range(4):
        child = multiprocessing.Process(target=self._child_test_wait_socket,
                                        args=(addr, slow))
        child.daemon = True
        child.start()
        children.append(child)
        self.addCleanup(child.join)

    for _ in range(4):
        accepted, _peer = listener.accept()
        pending.append(accepted)
        chunks_by_socket[accepted] = []
    listener.close()

    while pending:
        for ready in wait(pending):
            data = ready.recv(32)
            if data:
                chunks_by_socket[ready].append(data)
            else:
                # Empty read: the peer has closed its end.
                pending.remove(ready)
                ready.close()

    expected = ''.join('%s\n' % i for i in range(10)).encode('ascii')
    for chunks in chunks_by_socket.values():
        self.assertEqual(b''.join(chunks), expected)
2741
2742 def test_wait_slow(self):
2743 self.test_wait(True)
2744
2745 def test_wait_socket_slow(self):
Richard Oudkerk104b3f42012-05-08 16:08:07 +01002746 self.test_wait_socket(True)
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01002747
2748 def test_wait_timeout(self):
2749 from multiprocessing.connection import wait
2750
Richard Oudkerk009b15e2012-05-04 09:44:39 +01002751 expected = 5
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01002752 a, b = multiprocessing.Pipe()
2753
2754 start = time.time()
Richard Oudkerk009b15e2012-05-04 09:44:39 +01002755 res = wait([a, b], expected)
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01002756 delta = time.time() - start
2757
2758 self.assertEqual(res, [])
Richard Oudkerk6dbca362012-05-06 16:46:36 +01002759 self.assertLess(delta, expected * 2)
2760 self.assertGreater(delta, expected * 0.5)
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01002761
2762 b.send(None)
2763
2764 start = time.time()
Richard Oudkerk009b15e2012-05-04 09:44:39 +01002765 res = wait([a, b], 20)
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01002766 delta = time.time() - start
2767
2768 self.assertEqual(res, [a])
Antoine Pitrou37749772012-03-09 18:40:15 +01002769 self.assertLess(delta, 0.4)
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01002770
Richard Oudkerk009b15e2012-05-04 09:44:39 +01002771 @classmethod
2772 def signal_and_sleep(cls, sem, period):
2773 sem.release()
2774 time.sleep(period)
2775
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01002776 def test_wait_integer(self):
2777 from multiprocessing.connection import wait
2778
Richard Oudkerk009b15e2012-05-04 09:44:39 +01002779 expected = 3
2780 sem = multiprocessing.Semaphore(0)
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01002781 a, b = multiprocessing.Pipe()
Richard Oudkerk009b15e2012-05-04 09:44:39 +01002782 p = multiprocessing.Process(target=self.signal_and_sleep,
2783 args=(sem, expected))
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01002784
2785 p.start()
2786 self.assertIsInstance(p.sentinel, int)
Richard Oudkerk009b15e2012-05-04 09:44:39 +01002787 self.assertTrue(sem.acquire(timeout=20))
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01002788
2789 start = time.time()
2790 res = wait([a, p.sentinel, b], expected + 20)
2791 delta = time.time() - start
2792
2793 self.assertEqual(res, [p.sentinel])
Antoine Pitrou37749772012-03-09 18:40:15 +01002794 self.assertLess(delta, expected + 2)
2795 self.assertGreater(delta, expected - 2)
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01002796
2797 a.send(None)
2798
2799 start = time.time()
2800 res = wait([a, p.sentinel, b], 20)
2801 delta = time.time() - start
2802
2803 self.assertEqual(res, [p.sentinel, b])
Antoine Pitrou37749772012-03-09 18:40:15 +01002804 self.assertLess(delta, 0.4)
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01002805
2806 b.send(None)
2807
2808 start = time.time()
2809 res = wait([a, p.sentinel, b], 20)
2810 delta = time.time() - start
2811
2812 self.assertEqual(res, [a, p.sentinel, b])
Antoine Pitrou37749772012-03-09 18:40:15 +01002813 self.assertLess(delta, 0.4)
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01002814
Richard Oudkerk009b15e2012-05-04 09:44:39 +01002815 p.terminate()
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01002816 p.join()
2817
Richard Oudkerk59d54042012-05-10 16:11:12 +01002818 def test_neg_timeout(self):
2819 from multiprocessing.connection import wait
2820 a, b = multiprocessing.Pipe()
2821 t = time.time()
2822 res = wait([a], timeout=-1)
2823 t = time.time() - t
2824 self.assertEqual(res, [])
2825 self.assertLess(t, 1)
2826 a.close()
2827 b.close()
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01002828
#
# Issue 14151: Listener must raise ValueError for an address whose family
# is not supported on the current platform
#
2832
class TestInvalidFamily(unittest.TestCase):
    """Listener must reject an address it cannot serve on this platform."""

    @unittest.skipIf(WIN32, "skipped on Windows")
    def test_invalid_family(self):
        # Backslash-prefixed (Windows pipe style) address, tried off Windows.
        self.assertRaises(ValueError,
                          multiprocessing.connection.Listener, r'\\.\test')

    @unittest.skipUnless(WIN32, "skipped on non-Windows platforms")
    def test_invalid_family_win32(self):
        # Filesystem-path style address, tried on Windows.
        self.assertRaises(ValueError,
                          multiprocessing.connection.Listener,
                          '/var/test.pipe')
Antoine Pitrou93bba8f2012-04-01 17:25:49 +02002844
#
# Issue 12098: check that sys.flags in a child process match those of its
# parent
#
2848
class TestFlags(unittest.TestCase):
    """Check that a process started by multiprocessing inherits sys.flags.

    test_flags() launches an interpreter with unusual command line flags;
    that interpreter runs run_in_child(), which in turn starts a
    multiprocessing.Process running run_in_grandchild().
    """

    @classmethod
    def run_in_grandchild(cls, conn):
        # Report this process's interpreter flags through *conn*.
        own_flags = tuple(sys.flags)
        conn.send(own_flags)

    @classmethod
    def run_in_child(cls):
        # Start a grandchild, collect its flags and print them on stdout as
        # JSON together with the flags of the current process.
        import json
        reader, writer = multiprocessing.Pipe(duplex=False)
        grandchild = multiprocessing.Process(target=cls.run_in_grandchild,
                                             args=(writer,))
        grandchild.start()
        grandchild_flags = reader.recv()
        grandchild.join()
        reader.close()
        writer.close()
        print(json.dumps((tuple(sys.flags), grandchild_flags)))

    def test_flags(self):
        import json
        import subprocess
        # start child process using unusual flags
        prog = ('from test.test_multiprocessing import TestFlags; '
                'TestFlags.run_in_child()')
        command = [sys.executable, '-E', '-S', '-O', '-c', prog]
        output = subprocess.check_output(command)
        child_flags, grandchild_flags = json.loads(output.decode('ascii'))
        self.assertEqual(child_flags, grandchild_flags)
2876
# Stand-alone test cases.  test_main() runs them after the cases collected
# in testcases_processes, testcases_threads and testcases_manager.
testcases_other = [
    OtherTest,
    TestInvalidHandle,
    TestInitializers,
    TestStdinBadfiledescriptor,
    TestWait,
    TestInvalidFamily,
    TestFlags,
]
Neal Norwitz5d6415e2008-08-25 01:53:32 +00002880
Benjamin Petersone711caf2008-06-11 16:44:04 +00002881#
2882#
2883#
2884
def test_main(run=None):
    """Assemble the whole multiprocessing test suite and execute it.

    *run* is a callable that receives the unittest.TestSuite; when it is
    None, test.support.run_unittest is used.  The shared pools and the
    manager used by the mixin classes are created before the run and torn
    down afterwards, whether or not the run raised.

    Raises unittest.SkipTest on Linux when an RLock cannot be created.
    """
    if sys.platform.startswith("linux"):
        try:
            lock = multiprocessing.RLock()
        except OSError:
            raise unittest.SkipTest("OSError raises on RLock creation, see issue 3111!")

    check_enough_semaphores()

    if run is None:
        run = test.support.run_unittest

    util.get_temp_dir()     # creates temp directory for use by all processes

    multiprocessing.get_logger().setLevel(LOG_LEVEL)

    # Shared resources used by the mixin-based test cases.
    ProcessesMixin.pool = multiprocessing.Pool(4)
    ThreadsMixin.pool = multiprocessing.dummy.Pool(4)
    ManagerMixin.manager.__init__()
    ManagerMixin.manager.start()
    ManagerMixin.pool = ManagerMixin.manager.Pool(4)

    def class_name(testcase):
        return testcase.__name__

    # Each generated group is ordered by class name; the stand-alone cases
    # follow in their declared order.
    testcases = []
    for group in (testcases_processes, testcases_threads, testcases_manager):
        testcases.extend(sorted(group.values(), key=class_name))
    testcases.extend(testcases_other)

    loader = unittest.defaultTestLoader
    suite = unittest.TestSuite()
    for testcase in testcases:
        suite.addTest(loader.loadTestsFromTestCase(testcase))

    try:
        run(suite)
    finally:
        # The order of the teardown steps below is deliberate; keep it.
        ThreadsMixin.pool.terminate()
        ProcessesMixin.pool.terminate()
        ManagerMixin.pool.terminate()
        ManagerMixin.pool.join()
        ManagerMixin.manager.shutdown()
        ManagerMixin.manager.join()
        ThreadsMixin.pool.join()
        ProcessesMixin.pool.join()
        del ProcessesMixin.pool
        del ThreadsMixin.pool
        del ManagerMixin.pool
Benjamin Petersone711caf2008-06-11 16:44:04 +00002928
def main():
    """Run the complete suite through a verbose TextTestRunner."""
    runner = unittest.TextTestRunner(verbosity=2)
    test_main(runner.run)


# Allow running this test module directly as a script.
if __name__ == '__main__':
    main()