blob: f02041ebf295e8dd2633850ab4caac790d16506f [file] [log] [blame]
Benjamin Peterson90f5ba52010-03-11 22:53:45 +00001#!/usr/bin/env python3
Georg Brandl86b2fb92008-07-16 03:43:04 +00002
Benjamin Petersone711caf2008-06-11 16:44:04 +00003#
4# Unit tests for the multiprocessing package
5#
6
7import unittest
Benjamin Petersone711caf2008-06-11 16:44:04 +00008import queue as pyqueue
9import time
Alexandre Vassalottic57a84f2009-07-17 12:07:01 +000010import io
Antoine Pitroude911b22011-12-21 11:03:24 +010011import itertools
Benjamin Petersone711caf2008-06-11 16:44:04 +000012import sys
13import os
14import gc
Antoine Pitrouc824e9a2011-04-05 18:11:33 +020015import errno
Benjamin Petersone711caf2008-06-11 16:44:04 +000016import signal
17import array
Benjamin Petersone711caf2008-06-11 16:44:04 +000018import socket
19import random
20import logging
R. David Murraya21e4ca2009-03-31 23:16:50 +000021import test.support
Benjamin Petersone711caf2008-06-11 16:44:04 +000022
Benjamin Petersone5384b02008-10-04 22:00:42 +000023
R. David Murraya21e4ca2009-03-31 23:16:50 +000024# Skip tests if _multiprocessing wasn't built.
25_multiprocessing = test.support.import_module('_multiprocessing')
26# Skip tests if sem_open implementation is broken.
27test.support.import_module('multiprocessing.synchronize')
Victor Stinner45df8202010-04-28 22:31:17 +000028# import threading after _multiprocessing to raise a more revelant error
29# message: "No module named _multiprocessing". _multiprocessing is not compiled
30# without thread support.
31import threading
Benjamin Petersone5384b02008-10-04 22:00:42 +000032
Benjamin Petersone711caf2008-06-11 16:44:04 +000033import multiprocessing.dummy
34import multiprocessing.connection
35import multiprocessing.managers
36import multiprocessing.heap
Benjamin Petersone711caf2008-06-11 16:44:04 +000037import multiprocessing.pool
Benjamin Petersone711caf2008-06-11 16:44:04 +000038
Charles-François Natalibc8f0822011-09-20 20:36:51 +020039from multiprocessing import util
40
41try:
42 from multiprocessing import reduction
43 HAS_REDUCTION = True
44except ImportError:
45 HAS_REDUCTION = False
Benjamin Petersone711caf2008-06-11 16:44:04 +000046
Brian Curtinafa88b52010-10-07 01:12:19 +000047try:
48 from multiprocessing.sharedctypes import Value, copy
49 HAS_SHAREDCTYPES = True
50except ImportError:
51 HAS_SHAREDCTYPES = False
52
Antoine Pitroubcb39d42011-08-23 19:46:22 +020053try:
54 import msvcrt
55except ImportError:
56 msvcrt = None
57
Benjamin Petersone711caf2008-06-11 16:44:04 +000058#
59#
60#
61
Benjamin Peterson2bc91df2008-07-13 18:45:30 +000062def latin(s):
63 return s.encode('latin')
Benjamin Petersone711caf2008-06-11 16:44:04 +000064
Benjamin Petersone711caf2008-06-11 16:44:04 +000065#
66# Constants
67#
68
69LOG_LEVEL = util.SUBWARNING
Jesse Noller1f0b6582010-01-27 03:36:01 +000070#LOG_LEVEL = logging.DEBUG
Benjamin Petersone711caf2008-06-11 16:44:04 +000071
72DELTA = 0.1
73CHECK_TIMINGS = False # making true makes tests take a lot longer
74 # and can sometimes cause some non-serious
75 # failures because some calls block a bit
76 # longer than expected
77if CHECK_TIMINGS:
78 TIMEOUT1, TIMEOUT2, TIMEOUT3 = 0.82, 0.35, 1.4
79else:
80 TIMEOUT1, TIMEOUT2, TIMEOUT3 = 0.1, 0.1, 0.1
81
82HAVE_GETVALUE = not getattr(_multiprocessing,
83 'HAVE_BROKEN_SEM_GETVALUE', False)
84
Jesse Noller6214edd2009-01-19 16:23:53 +000085WIN32 = (sys.platform == "win32")
Antoine Pitrou176f07d2011-06-06 19:35:31 +020086
Richard Oudkerk59d54042012-05-10 16:11:12 +010087from multiprocessing.connection import wait
Antoine Pitrou176f07d2011-06-06 19:35:31 +020088
Richard Oudkerk59d54042012-05-10 16:11:12 +010089def wait_for_handle(handle, timeout):
90 if timeout is not None and timeout < 0.0:
91 timeout = None
92 return wait([handle], timeout)
Jesse Noller6214edd2009-01-19 16:23:53 +000093
Antoine Pitroubcb39d42011-08-23 19:46:22 +020094try:
95 MAXFD = os.sysconf("SC_OPEN_MAX")
96except:
97 MAXFD = 256
98
Benjamin Petersone711caf2008-06-11 16:44:04 +000099#
Florent Xiclunafd1b0932010-03-28 00:25:02 +0000100# Some tests require ctypes
101#
102
103try:
Florent Xiclunaaa171062010-08-14 15:56:42 +0000104 from ctypes import Structure, c_int, c_double
Florent Xiclunafd1b0932010-03-28 00:25:02 +0000105except ImportError:
106 Structure = object
107 c_int = c_double = None
108
Charles-François Natali221ef672011-11-22 18:55:22 +0100109
110def check_enough_semaphores():
111 """Check that the system supports enough semaphores to run the test."""
112 # minimum number of semaphores available according to POSIX
113 nsems_min = 256
114 try:
115 nsems = os.sysconf("SC_SEM_NSEMS_MAX")
116 except (AttributeError, ValueError):
117 # sysconf not available or setting not available
118 return
119 if nsems == -1 or nsems >= nsems_min:
120 return
121 raise unittest.SkipTest("The OS doesn't support enough semaphores "
122 "to run the test (required: %d)." % nsems_min)
123
124
Florent Xiclunafd1b0932010-03-28 00:25:02 +0000125#
Benjamin Petersone711caf2008-06-11 16:44:04 +0000126# Creates a wrapper for a function which records the time it takes to finish
127#
128
129class TimingWrapper(object):
130
131 def __init__(self, func):
132 self.func = func
133 self.elapsed = None
134
135 def __call__(self, *args, **kwds):
136 t = time.time()
137 try:
138 return self.func(*args, **kwds)
139 finally:
140 self.elapsed = time.time() - t
141
142#
143# Base class for test cases
144#
145
146class BaseTestCase(object):
147
148 ALLOWED_TYPES = ('processes', 'manager', 'threads')
149
150 def assertTimingAlmostEqual(self, a, b):
151 if CHECK_TIMINGS:
152 self.assertAlmostEqual(a, b, 1)
153
154 def assertReturnsIfImplemented(self, value, func, *args):
155 try:
156 res = func(*args)
157 except NotImplementedError:
158 pass
159 else:
160 return self.assertEqual(value, res)
161
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +0000162 # For the sanity of Windows users, rather than crashing or freezing in
163 # multiple ways.
164 def __reduce__(self, *args):
165 raise NotImplementedError("shouldn't try to pickle a test case")
166
167 __reduce_ex__ = __reduce__
168
Benjamin Petersone711caf2008-06-11 16:44:04 +0000169#
170# Return the value of a semaphore
171#
172
173def get_value(self):
174 try:
175 return self.get_value()
176 except AttributeError:
177 try:
178 return self._Semaphore__value
179 except AttributeError:
180 try:
181 return self._value
182 except AttributeError:
183 raise NotImplementedError
184
185#
186# Testcases
187#
188
189class _TestProcess(BaseTestCase):
190
191 ALLOWED_TYPES = ('processes', 'threads')
192
193 def test_current(self):
194 if self.TYPE == 'threads':
195 return
196
197 current = self.current_process()
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000198 authkey = current.authkey
Benjamin Petersone711caf2008-06-11 16:44:04 +0000199
200 self.assertTrue(current.is_alive())
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000201 self.assertTrue(not current.daemon)
Ezio Melottie9615932010-01-24 19:26:24 +0000202 self.assertIsInstance(authkey, bytes)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000203 self.assertTrue(len(authkey) > 0)
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000204 self.assertEqual(current.ident, os.getpid())
205 self.assertEqual(current.exitcode, None)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000206
Antoine Pitrou0bd4deb2011-02-25 22:07:43 +0000207 def test_daemon_argument(self):
208 if self.TYPE == "threads":
209 return
210
211 # By default uses the current process's daemon flag.
212 proc0 = self.Process(target=self._test)
Antoine Pitrouec785222011-03-02 00:15:44 +0000213 self.assertEqual(proc0.daemon, self.current_process().daemon)
Antoine Pitrou0bd4deb2011-02-25 22:07:43 +0000214 proc1 = self.Process(target=self._test, daemon=True)
215 self.assertTrue(proc1.daemon)
216 proc2 = self.Process(target=self._test, daemon=False)
217 self.assertFalse(proc2.daemon)
218
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +0000219 @classmethod
220 def _test(cls, q, *args, **kwds):
221 current = cls.current_process()
Benjamin Petersone711caf2008-06-11 16:44:04 +0000222 q.put(args)
223 q.put(kwds)
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000224 q.put(current.name)
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +0000225 if cls.TYPE != 'threads':
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000226 q.put(bytes(current.authkey))
Benjamin Petersone711caf2008-06-11 16:44:04 +0000227 q.put(current.pid)
228
229 def test_process(self):
230 q = self.Queue(1)
231 e = self.Event()
232 args = (q, 1, 2)
233 kwargs = {'hello':23, 'bye':2.54}
234 name = 'SomeProcess'
235 p = self.Process(
236 target=self._test, args=args, kwargs=kwargs, name=name
237 )
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000238 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000239 current = self.current_process()
240
241 if self.TYPE != 'threads':
Ezio Melottib3aedd42010-11-20 19:04:17 +0000242 self.assertEqual(p.authkey, current.authkey)
243 self.assertEqual(p.is_alive(), False)
244 self.assertEqual(p.daemon, True)
Benjamin Peterson577473f2010-01-19 00:09:57 +0000245 self.assertNotIn(p, self.active_children())
Benjamin Petersone711caf2008-06-11 16:44:04 +0000246 self.assertTrue(type(self.active_children()) is list)
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000247 self.assertEqual(p.exitcode, None)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000248
249 p.start()
250
Ezio Melottib3aedd42010-11-20 19:04:17 +0000251 self.assertEqual(p.exitcode, None)
252 self.assertEqual(p.is_alive(), True)
Benjamin Peterson577473f2010-01-19 00:09:57 +0000253 self.assertIn(p, self.active_children())
Benjamin Petersone711caf2008-06-11 16:44:04 +0000254
Ezio Melottib3aedd42010-11-20 19:04:17 +0000255 self.assertEqual(q.get(), args[1:])
256 self.assertEqual(q.get(), kwargs)
257 self.assertEqual(q.get(), p.name)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000258 if self.TYPE != 'threads':
Ezio Melottib3aedd42010-11-20 19:04:17 +0000259 self.assertEqual(q.get(), current.authkey)
260 self.assertEqual(q.get(), p.pid)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000261
262 p.join()
263
Ezio Melottib3aedd42010-11-20 19:04:17 +0000264 self.assertEqual(p.exitcode, 0)
265 self.assertEqual(p.is_alive(), False)
Benjamin Peterson577473f2010-01-19 00:09:57 +0000266 self.assertNotIn(p, self.active_children())
Benjamin Petersone711caf2008-06-11 16:44:04 +0000267
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +0000268 @classmethod
269 def _test_terminate(cls):
Benjamin Petersone711caf2008-06-11 16:44:04 +0000270 time.sleep(1000)
271
272 def test_terminate(self):
273 if self.TYPE == 'threads':
274 return
275
276 p = self.Process(target=self._test_terminate)
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000277 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000278 p.start()
279
280 self.assertEqual(p.is_alive(), True)
Benjamin Peterson577473f2010-01-19 00:09:57 +0000281 self.assertIn(p, self.active_children())
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000282 self.assertEqual(p.exitcode, None)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000283
Richard Oudkerk59d54042012-05-10 16:11:12 +0100284 join = TimingWrapper(p.join)
285
286 self.assertEqual(join(0), None)
287 self.assertTimingAlmostEqual(join.elapsed, 0.0)
288 self.assertEqual(p.is_alive(), True)
289
290 self.assertEqual(join(-1), None)
291 self.assertTimingAlmostEqual(join.elapsed, 0.0)
292 self.assertEqual(p.is_alive(), True)
293
Benjamin Petersone711caf2008-06-11 16:44:04 +0000294 p.terminate()
295
Benjamin Petersone711caf2008-06-11 16:44:04 +0000296 self.assertEqual(join(), None)
297 self.assertTimingAlmostEqual(join.elapsed, 0.0)
298
299 self.assertEqual(p.is_alive(), False)
Benjamin Peterson577473f2010-01-19 00:09:57 +0000300 self.assertNotIn(p, self.active_children())
Benjamin Petersone711caf2008-06-11 16:44:04 +0000301
302 p.join()
303
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000304 # XXX sometimes get p.exitcode == 0 on Windows ...
305 #self.assertEqual(p.exitcode, -signal.SIGTERM)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000306
307 def test_cpu_count(self):
308 try:
309 cpus = multiprocessing.cpu_count()
310 except NotImplementedError:
311 cpus = 1
312 self.assertTrue(type(cpus) is int)
313 self.assertTrue(cpus >= 1)
314
315 def test_active_children(self):
316 self.assertEqual(type(self.active_children()), list)
317
318 p = self.Process(target=time.sleep, args=(DELTA,))
Benjamin Peterson577473f2010-01-19 00:09:57 +0000319 self.assertNotIn(p, self.active_children())
Benjamin Petersone711caf2008-06-11 16:44:04 +0000320
Jesus Cea94f964f2011-09-09 20:26:57 +0200321 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000322 p.start()
Benjamin Peterson577473f2010-01-19 00:09:57 +0000323 self.assertIn(p, self.active_children())
Benjamin Petersone711caf2008-06-11 16:44:04 +0000324
325 p.join()
Benjamin Peterson577473f2010-01-19 00:09:57 +0000326 self.assertNotIn(p, self.active_children())
Benjamin Petersone711caf2008-06-11 16:44:04 +0000327
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +0000328 @classmethod
329 def _test_recursion(cls, wconn, id):
Benjamin Petersone711caf2008-06-11 16:44:04 +0000330 from multiprocessing import forking
331 wconn.send(id)
332 if len(id) < 2:
333 for i in range(2):
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +0000334 p = cls.Process(
335 target=cls._test_recursion, args=(wconn, id+[i])
Benjamin Petersone711caf2008-06-11 16:44:04 +0000336 )
337 p.start()
338 p.join()
339
340 def test_recursion(self):
341 rconn, wconn = self.Pipe(duplex=False)
342 self._test_recursion(wconn, [])
343
344 time.sleep(DELTA)
345 result = []
346 while rconn.poll():
347 result.append(rconn.recv())
348
349 expected = [
350 [],
351 [0],
352 [0, 0],
353 [0, 1],
354 [1],
355 [1, 0],
356 [1, 1]
357 ]
358 self.assertEqual(result, expected)
359
Antoine Pitrou176f07d2011-06-06 19:35:31 +0200360 @classmethod
361 def _test_sentinel(cls, event):
362 event.wait(10.0)
363
364 def test_sentinel(self):
365 if self.TYPE == "threads":
366 return
367 event = self.Event()
368 p = self.Process(target=self._test_sentinel, args=(event,))
369 with self.assertRaises(ValueError):
370 p.sentinel
371 p.start()
372 self.addCleanup(p.join)
373 sentinel = p.sentinel
374 self.assertIsInstance(sentinel, int)
375 self.assertFalse(wait_for_handle(sentinel, timeout=0.0))
376 event.set()
377 p.join()
378 self.assertTrue(wait_for_handle(sentinel, timeout=DELTA))
379
Benjamin Petersone711caf2008-06-11 16:44:04 +0000380#
381#
382#
383
384class _UpperCaser(multiprocessing.Process):
385
386 def __init__(self):
387 multiprocessing.Process.__init__(self)
388 self.child_conn, self.parent_conn = multiprocessing.Pipe()
389
390 def run(self):
391 self.parent_conn.close()
392 for s in iter(self.child_conn.recv, None):
393 self.child_conn.send(s.upper())
394 self.child_conn.close()
395
396 def submit(self, s):
397 assert type(s) is str
398 self.parent_conn.send(s)
399 return self.parent_conn.recv()
400
401 def stop(self):
402 self.parent_conn.send(None)
403 self.parent_conn.close()
404 self.child_conn.close()
405
406class _TestSubclassingProcess(BaseTestCase):
407
408 ALLOWED_TYPES = ('processes',)
409
410 def test_subclassing(self):
411 uppercaser = _UpperCaser()
Jesus Cea94f964f2011-09-09 20:26:57 +0200412 uppercaser.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000413 uppercaser.start()
414 self.assertEqual(uppercaser.submit('hello'), 'HELLO')
415 self.assertEqual(uppercaser.submit('world'), 'WORLD')
416 uppercaser.stop()
417 uppercaser.join()
418
Antoine Pitrou84a0fbf2012-01-27 10:52:37 +0100419 def test_stderr_flush(self):
420 # sys.stderr is flushed at process shutdown (issue #13812)
421 if self.TYPE == "threads":
422 return
423
424 testfn = test.support.TESTFN
425 self.addCleanup(test.support.unlink, testfn)
426 proc = self.Process(target=self._test_stderr_flush, args=(testfn,))
427 proc.start()
428 proc.join()
429 with open(testfn, 'r') as f:
430 err = f.read()
431 # The whole traceback was printed
432 self.assertIn("ZeroDivisionError", err)
433 self.assertIn("test_multiprocessing.py", err)
434 self.assertIn("1/0 # MARKER", err)
435
436 @classmethod
437 def _test_stderr_flush(cls, testfn):
438 sys.stderr = open(testfn, 'w')
439 1/0 # MARKER
440
441
Benjamin Petersone711caf2008-06-11 16:44:04 +0000442#
443#
444#
445
446def queue_empty(q):
447 if hasattr(q, 'empty'):
448 return q.empty()
449 else:
450 return q.qsize() == 0
451
452def queue_full(q, maxsize):
453 if hasattr(q, 'full'):
454 return q.full()
455 else:
456 return q.qsize() == maxsize
457
458
459class _TestQueue(BaseTestCase):
460
461
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +0000462 @classmethod
463 def _test_put(cls, queue, child_can_start, parent_can_continue):
Benjamin Petersone711caf2008-06-11 16:44:04 +0000464 child_can_start.wait()
465 for i in range(6):
466 queue.get()
467 parent_can_continue.set()
468
469 def test_put(self):
470 MAXSIZE = 6
471 queue = self.Queue(maxsize=MAXSIZE)
472 child_can_start = self.Event()
473 parent_can_continue = self.Event()
474
475 proc = self.Process(
476 target=self._test_put,
477 args=(queue, child_can_start, parent_can_continue)
478 )
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000479 proc.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000480 proc.start()
481
482 self.assertEqual(queue_empty(queue), True)
483 self.assertEqual(queue_full(queue, MAXSIZE), False)
484
485 queue.put(1)
486 queue.put(2, True)
487 queue.put(3, True, None)
488 queue.put(4, False)
489 queue.put(5, False, None)
490 queue.put_nowait(6)
491
492 # the values may be in buffer but not yet in pipe so sleep a bit
493 time.sleep(DELTA)
494
495 self.assertEqual(queue_empty(queue), False)
496 self.assertEqual(queue_full(queue, MAXSIZE), True)
497
498 put = TimingWrapper(queue.put)
499 put_nowait = TimingWrapper(queue.put_nowait)
500
501 self.assertRaises(pyqueue.Full, put, 7, False)
502 self.assertTimingAlmostEqual(put.elapsed, 0)
503
504 self.assertRaises(pyqueue.Full, put, 7, False, None)
505 self.assertTimingAlmostEqual(put.elapsed, 0)
506
507 self.assertRaises(pyqueue.Full, put_nowait, 7)
508 self.assertTimingAlmostEqual(put_nowait.elapsed, 0)
509
510 self.assertRaises(pyqueue.Full, put, 7, True, TIMEOUT1)
511 self.assertTimingAlmostEqual(put.elapsed, TIMEOUT1)
512
513 self.assertRaises(pyqueue.Full, put, 7, False, TIMEOUT2)
514 self.assertTimingAlmostEqual(put.elapsed, 0)
515
516 self.assertRaises(pyqueue.Full, put, 7, True, timeout=TIMEOUT3)
517 self.assertTimingAlmostEqual(put.elapsed, TIMEOUT3)
518
519 child_can_start.set()
520 parent_can_continue.wait()
521
522 self.assertEqual(queue_empty(queue), True)
523 self.assertEqual(queue_full(queue, MAXSIZE), False)
524
525 proc.join()
526
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +0000527 @classmethod
528 def _test_get(cls, queue, child_can_start, parent_can_continue):
Benjamin Petersone711caf2008-06-11 16:44:04 +0000529 child_can_start.wait()
Benjamin Petersonb29cbbc2008-06-16 20:57:14 +0000530 #queue.put(1)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000531 queue.put(2)
532 queue.put(3)
533 queue.put(4)
534 queue.put(5)
535 parent_can_continue.set()
536
537 def test_get(self):
538 queue = self.Queue()
539 child_can_start = self.Event()
540 parent_can_continue = self.Event()
541
542 proc = self.Process(
543 target=self._test_get,
544 args=(queue, child_can_start, parent_can_continue)
545 )
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000546 proc.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000547 proc.start()
548
549 self.assertEqual(queue_empty(queue), True)
550
551 child_can_start.set()
552 parent_can_continue.wait()
553
554 time.sleep(DELTA)
555 self.assertEqual(queue_empty(queue), False)
556
Benjamin Petersonb29cbbc2008-06-16 20:57:14 +0000557 # Hangs unexpectedly, remove for now
558 #self.assertEqual(queue.get(), 1)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000559 self.assertEqual(queue.get(True, None), 2)
560 self.assertEqual(queue.get(True), 3)
561 self.assertEqual(queue.get(timeout=1), 4)
562 self.assertEqual(queue.get_nowait(), 5)
563
564 self.assertEqual(queue_empty(queue), True)
565
566 get = TimingWrapper(queue.get)
567 get_nowait = TimingWrapper(queue.get_nowait)
568
569 self.assertRaises(pyqueue.Empty, get, False)
570 self.assertTimingAlmostEqual(get.elapsed, 0)
571
572 self.assertRaises(pyqueue.Empty, get, False, None)
573 self.assertTimingAlmostEqual(get.elapsed, 0)
574
575 self.assertRaises(pyqueue.Empty, get_nowait)
576 self.assertTimingAlmostEqual(get_nowait.elapsed, 0)
577
578 self.assertRaises(pyqueue.Empty, get, True, TIMEOUT1)
579 self.assertTimingAlmostEqual(get.elapsed, TIMEOUT1)
580
581 self.assertRaises(pyqueue.Empty, get, False, TIMEOUT2)
582 self.assertTimingAlmostEqual(get.elapsed, 0)
583
584 self.assertRaises(pyqueue.Empty, get, timeout=TIMEOUT3)
585 self.assertTimingAlmostEqual(get.elapsed, TIMEOUT3)
586
587 proc.join()
588
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +0000589 @classmethod
590 def _test_fork(cls, queue):
Benjamin Petersone711caf2008-06-11 16:44:04 +0000591 for i in range(10, 20):
592 queue.put(i)
593 # note that at this point the items may only be buffered, so the
594 # process cannot shutdown until the feeder thread has finished
595 # pushing items onto the pipe.
596
597 def test_fork(self):
598 # Old versions of Queue would fail to create a new feeder
599 # thread for a forked process if the original process had its
600 # own feeder thread. This test checks that this no longer
601 # happens.
602
603 queue = self.Queue()
604
605 # put items on queue so that main process starts a feeder thread
606 for i in range(10):
607 queue.put(i)
608
609 # wait to make sure thread starts before we fork a new process
610 time.sleep(DELTA)
611
612 # fork process
613 p = self.Process(target=self._test_fork, args=(queue,))
Jesus Cea94f964f2011-09-09 20:26:57 +0200614 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000615 p.start()
616
617 # check that all expected items are in the queue
618 for i in range(20):
619 self.assertEqual(queue.get(), i)
620 self.assertRaises(pyqueue.Empty, queue.get, False)
621
622 p.join()
623
624 def test_qsize(self):
625 q = self.Queue()
626 try:
627 self.assertEqual(q.qsize(), 0)
628 except NotImplementedError:
629 return
630 q.put(1)
631 self.assertEqual(q.qsize(), 1)
632 q.put(5)
633 self.assertEqual(q.qsize(), 2)
634 q.get()
635 self.assertEqual(q.qsize(), 1)
636 q.get()
637 self.assertEqual(q.qsize(), 0)
638
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +0000639 @classmethod
640 def _test_task_done(cls, q):
Benjamin Petersone711caf2008-06-11 16:44:04 +0000641 for obj in iter(q.get, None):
642 time.sleep(DELTA)
643 q.task_done()
644
645 def test_task_done(self):
646 queue = self.JoinableQueue()
647
648 if sys.version_info < (2, 5) and not hasattr(queue, 'task_done'):
Florent Xiclunafd1b0932010-03-28 00:25:02 +0000649 self.skipTest("requires 'queue.task_done()' method")
Benjamin Petersone711caf2008-06-11 16:44:04 +0000650
651 workers = [self.Process(target=self._test_task_done, args=(queue,))
652 for i in range(4)]
653
654 for p in workers:
Jesus Cea94f964f2011-09-09 20:26:57 +0200655 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000656 p.start()
657
658 for i in range(10):
659 queue.put(i)
660
661 queue.join()
662
663 for p in workers:
664 queue.put(None)
665
666 for p in workers:
667 p.join()
668
669#
670#
671#
672
673class _TestLock(BaseTestCase):
674
675 def test_lock(self):
676 lock = self.Lock()
677 self.assertEqual(lock.acquire(), True)
678 self.assertEqual(lock.acquire(False), False)
679 self.assertEqual(lock.release(), None)
680 self.assertRaises((ValueError, threading.ThreadError), lock.release)
681
682 def test_rlock(self):
683 lock = self.RLock()
684 self.assertEqual(lock.acquire(), True)
685 self.assertEqual(lock.acquire(), True)
686 self.assertEqual(lock.acquire(), True)
687 self.assertEqual(lock.release(), None)
688 self.assertEqual(lock.release(), None)
689 self.assertEqual(lock.release(), None)
690 self.assertRaises((AssertionError, RuntimeError), lock.release)
691
Jesse Nollerf8d00852009-03-31 03:25:07 +0000692 def test_lock_context(self):
693 with self.Lock():
694 pass
695
Benjamin Petersone711caf2008-06-11 16:44:04 +0000696
697class _TestSemaphore(BaseTestCase):
698
699 def _test_semaphore(self, sem):
700 self.assertReturnsIfImplemented(2, get_value, sem)
701 self.assertEqual(sem.acquire(), True)
702 self.assertReturnsIfImplemented(1, get_value, sem)
703 self.assertEqual(sem.acquire(), True)
704 self.assertReturnsIfImplemented(0, get_value, sem)
705 self.assertEqual(sem.acquire(False), False)
706 self.assertReturnsIfImplemented(0, get_value, sem)
707 self.assertEqual(sem.release(), None)
708 self.assertReturnsIfImplemented(1, get_value, sem)
709 self.assertEqual(sem.release(), None)
710 self.assertReturnsIfImplemented(2, get_value, sem)
711
712 def test_semaphore(self):
713 sem = self.Semaphore(2)
714 self._test_semaphore(sem)
715 self.assertEqual(sem.release(), None)
716 self.assertReturnsIfImplemented(3, get_value, sem)
717 self.assertEqual(sem.release(), None)
718 self.assertReturnsIfImplemented(4, get_value, sem)
719
720 def test_bounded_semaphore(self):
721 sem = self.BoundedSemaphore(2)
722 self._test_semaphore(sem)
723 # Currently fails on OS/X
724 #if HAVE_GETVALUE:
725 # self.assertRaises(ValueError, sem.release)
726 # self.assertReturnsIfImplemented(2, get_value, sem)
727
728 def test_timeout(self):
729 if self.TYPE != 'processes':
730 return
731
732 sem = self.Semaphore(0)
733 acquire = TimingWrapper(sem.acquire)
734
735 self.assertEqual(acquire(False), False)
736 self.assertTimingAlmostEqual(acquire.elapsed, 0.0)
737
738 self.assertEqual(acquire(False, None), False)
739 self.assertTimingAlmostEqual(acquire.elapsed, 0.0)
740
741 self.assertEqual(acquire(False, TIMEOUT1), False)
742 self.assertTimingAlmostEqual(acquire.elapsed, 0)
743
744 self.assertEqual(acquire(True, TIMEOUT2), False)
745 self.assertTimingAlmostEqual(acquire.elapsed, TIMEOUT2)
746
747 self.assertEqual(acquire(timeout=TIMEOUT3), False)
748 self.assertTimingAlmostEqual(acquire.elapsed, TIMEOUT3)
749
750
751class _TestCondition(BaseTestCase):
752
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +0000753 @classmethod
754 def f(cls, cond, sleeping, woken, timeout=None):
Benjamin Petersone711caf2008-06-11 16:44:04 +0000755 cond.acquire()
756 sleeping.release()
757 cond.wait(timeout)
758 woken.release()
759 cond.release()
760
761 def check_invariant(self, cond):
762 # this is only supposed to succeed when there are no sleepers
763 if self.TYPE == 'processes':
764 try:
765 sleepers = (cond._sleeping_count.get_value() -
766 cond._woken_count.get_value())
767 self.assertEqual(sleepers, 0)
768 self.assertEqual(cond._wait_semaphore.get_value(), 0)
769 except NotImplementedError:
770 pass
771
772 def test_notify(self):
773 cond = self.Condition()
774 sleeping = self.Semaphore(0)
775 woken = self.Semaphore(0)
776
777 p = self.Process(target=self.f, args=(cond, sleeping, woken))
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000778 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000779 p.start()
780
781 p = threading.Thread(target=self.f, args=(cond, sleeping, woken))
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000782 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000783 p.start()
784
785 # wait for both children to start sleeping
786 sleeping.acquire()
787 sleeping.acquire()
788
789 # check no process/thread has woken up
790 time.sleep(DELTA)
791 self.assertReturnsIfImplemented(0, get_value, woken)
792
793 # wake up one process/thread
794 cond.acquire()
795 cond.notify()
796 cond.release()
797
798 # check one process/thread has woken up
799 time.sleep(DELTA)
800 self.assertReturnsIfImplemented(1, get_value, woken)
801
802 # wake up another
803 cond.acquire()
804 cond.notify()
805 cond.release()
806
807 # check other has woken up
808 time.sleep(DELTA)
809 self.assertReturnsIfImplemented(2, get_value, woken)
810
811 # check state is not mucked up
812 self.check_invariant(cond)
813 p.join()
814
815 def test_notify_all(self):
816 cond = self.Condition()
817 sleeping = self.Semaphore(0)
818 woken = self.Semaphore(0)
819
820 # start some threads/processes which will timeout
821 for i in range(3):
822 p = self.Process(target=self.f,
823 args=(cond, sleeping, woken, TIMEOUT1))
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000824 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000825 p.start()
826
827 t = threading.Thread(target=self.f,
828 args=(cond, sleeping, woken, TIMEOUT1))
Benjamin Peterson72753702008-08-18 18:09:21 +0000829 t.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000830 t.start()
831
832 # wait for them all to sleep
833 for i in range(6):
834 sleeping.acquire()
835
836 # check they have all timed out
837 for i in range(6):
838 woken.acquire()
839 self.assertReturnsIfImplemented(0, get_value, woken)
840
841 # check state is not mucked up
842 self.check_invariant(cond)
843
844 # start some more threads/processes
845 for i in range(3):
846 p = self.Process(target=self.f, args=(cond, sleeping, woken))
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000847 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000848 p.start()
849
850 t = threading.Thread(target=self.f, args=(cond, sleeping, woken))
Benjamin Peterson72753702008-08-18 18:09:21 +0000851 t.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000852 t.start()
853
854 # wait for them to all sleep
855 for i in range(6):
856 sleeping.acquire()
857
858 # check no process/thread has woken up
859 time.sleep(DELTA)
860 self.assertReturnsIfImplemented(0, get_value, woken)
861
862 # wake them all up
863 cond.acquire()
864 cond.notify_all()
865 cond.release()
866
867 # check they have all woken
Antoine Pitrouf25a8de2011-04-16 21:02:01 +0200868 for i in range(10):
869 try:
870 if get_value(woken) == 6:
871 break
872 except NotImplementedError:
873 break
874 time.sleep(DELTA)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000875 self.assertReturnsIfImplemented(6, get_value, woken)
876
877 # check state is not mucked up
878 self.check_invariant(cond)
879
880 def test_timeout(self):
881 cond = self.Condition()
882 wait = TimingWrapper(cond.wait)
883 cond.acquire()
884 res = wait(TIMEOUT1)
885 cond.release()
Georg Brandl65ffae02010-10-28 09:24:56 +0000886 self.assertEqual(res, False)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000887 self.assertTimingAlmostEqual(wait.elapsed, TIMEOUT1)
888
Charles-François Natalic8ce7152012-04-17 18:45:57 +0200889 @classmethod
890 def _test_waitfor_f(cls, cond, state):
891 with cond:
892 state.value = 0
893 cond.notify()
894 result = cond.wait_for(lambda : state.value==4)
895 if not result or state.value != 4:
896 sys.exit(1)
897
898 @unittest.skipUnless(HAS_SHAREDCTYPES, 'needs sharedctypes')
899 def test_waitfor(self):
900 # based on test in test/lock_tests.py
901 cond = self.Condition()
902 state = self.Value('i', -1)
903
904 p = self.Process(target=self._test_waitfor_f, args=(cond, state))
905 p.daemon = True
906 p.start()
907
908 with cond:
909 result = cond.wait_for(lambda : state.value==0)
910 self.assertTrue(result)
911 self.assertEqual(state.value, 0)
912
913 for i in range(4):
914 time.sleep(0.01)
915 with cond:
916 state.value += 1
917 cond.notify()
918
919 p.join(5)
920 self.assertFalse(p.is_alive())
921 self.assertEqual(p.exitcode, 0)
922
923 @classmethod
Richard Oudkerk6dbca362012-05-06 16:46:36 +0100924 def _test_waitfor_timeout_f(cls, cond, state, success, sem):
925 sem.release()
Charles-François Natalic8ce7152012-04-17 18:45:57 +0200926 with cond:
927 expected = 0.1
928 dt = time.time()
929 result = cond.wait_for(lambda : state.value==4, timeout=expected)
930 dt = time.time() - dt
931 # borrow logic in assertTimeout() from test/lock_tests.py
932 if not result and expected * 0.6 < dt < expected * 10.0:
933 success.value = True
934
935 @unittest.skipUnless(HAS_SHAREDCTYPES, 'needs sharedctypes')
936 def test_waitfor_timeout(self):
937 # based on test in test/lock_tests.py
938 cond = self.Condition()
939 state = self.Value('i', 0)
940 success = self.Value('i', False)
Richard Oudkerk6dbca362012-05-06 16:46:36 +0100941 sem = self.Semaphore(0)
Charles-François Natalic8ce7152012-04-17 18:45:57 +0200942
943 p = self.Process(target=self._test_waitfor_timeout_f,
Richard Oudkerk6dbca362012-05-06 16:46:36 +0100944 args=(cond, state, success, sem))
Charles-François Natalic8ce7152012-04-17 18:45:57 +0200945 p.daemon = True
946 p.start()
Richard Oudkerk6dbca362012-05-06 16:46:36 +0100947 self.assertTrue(sem.acquire(timeout=10))
Charles-François Natalic8ce7152012-04-17 18:45:57 +0200948
949 # Only increment 3 times, so state == 4 is never reached.
950 for i in range(3):
951 time.sleep(0.01)
952 with cond:
953 state.value += 1
954 cond.notify()
955
956 p.join(5)
957 self.assertTrue(success.value)
958
Benjamin Petersone711caf2008-06-11 16:44:04 +0000959
960class _TestEvent(BaseTestCase):
961
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +0000962 @classmethod
963 def _test_event(cls, event):
Benjamin Petersone711caf2008-06-11 16:44:04 +0000964 time.sleep(TIMEOUT2)
965 event.set()
966
967 def test_event(self):
968 event = self.Event()
969 wait = TimingWrapper(event.wait)
970
Ezio Melotti13925002011-03-16 11:05:33 +0200971 # Removed temporarily, due to API shear, this does not
Benjamin Petersone711caf2008-06-11 16:44:04 +0000972 # work with threading._Event objects. is_set == isSet
Benjamin Peterson965ce872009-04-05 21:24:58 +0000973 self.assertEqual(event.is_set(), False)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000974
Benjamin Peterson965ce872009-04-05 21:24:58 +0000975 # Removed, threading.Event.wait() will return the value of the __flag
976 # instead of None. API Shear with the semaphore backed mp.Event
977 self.assertEqual(wait(0.0), False)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000978 self.assertTimingAlmostEqual(wait.elapsed, 0.0)
Benjamin Peterson965ce872009-04-05 21:24:58 +0000979 self.assertEqual(wait(TIMEOUT1), False)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000980 self.assertTimingAlmostEqual(wait.elapsed, TIMEOUT1)
981
982 event.set()
983
984 # See note above on the API differences
Benjamin Peterson965ce872009-04-05 21:24:58 +0000985 self.assertEqual(event.is_set(), True)
986 self.assertEqual(wait(), True)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000987 self.assertTimingAlmostEqual(wait.elapsed, 0.0)
Benjamin Peterson965ce872009-04-05 21:24:58 +0000988 self.assertEqual(wait(TIMEOUT1), True)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000989 self.assertTimingAlmostEqual(wait.elapsed, 0.0)
990 # self.assertEqual(event.is_set(), True)
991
992 event.clear()
993
994 #self.assertEqual(event.is_set(), False)
995
Jesus Cea94f964f2011-09-09 20:26:57 +0200996 p = self.Process(target=self._test_event, args=(event,))
997 p.daemon = True
998 p.start()
Benjamin Peterson965ce872009-04-05 21:24:58 +0000999 self.assertEqual(wait(), True)
Benjamin Petersone711caf2008-06-11 16:44:04 +00001000
1001#
1002#
1003#
1004
1005class _TestValue(BaseTestCase):
1006
Florent Xiclunafd1b0932010-03-28 00:25:02 +00001007 ALLOWED_TYPES = ('processes',)
1008
Benjamin Petersone711caf2008-06-11 16:44:04 +00001009 codes_values = [
1010 ('i', 4343, 24234),
1011 ('d', 3.625, -4.25),
1012 ('h', -232, 234),
1013 ('c', latin('x'), latin('y'))
1014 ]
1015
Antoine Pitrou7744e2a2010-11-22 16:26:21 +00001016 def setUp(self):
1017 if not HAS_SHAREDCTYPES:
1018 self.skipTest("requires multiprocessing.sharedctypes")
1019
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +00001020 @classmethod
1021 def _test(cls, values):
1022 for sv, cv in zip(values, cls.codes_values):
Benjamin Petersone711caf2008-06-11 16:44:04 +00001023 sv.value = cv[2]
1024
1025
1026 def test_value(self, raw=False):
Benjamin Petersone711caf2008-06-11 16:44:04 +00001027 if raw:
1028 values = [self.RawValue(code, value)
1029 for code, value, _ in self.codes_values]
1030 else:
1031 values = [self.Value(code, value)
1032 for code, value, _ in self.codes_values]
1033
1034 for sv, cv in zip(values, self.codes_values):
1035 self.assertEqual(sv.value, cv[1])
1036
1037 proc = self.Process(target=self._test, args=(values,))
Jesus Cea94f964f2011-09-09 20:26:57 +02001038 proc.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +00001039 proc.start()
1040 proc.join()
1041
1042 for sv, cv in zip(values, self.codes_values):
1043 self.assertEqual(sv.value, cv[2])
1044
1045 def test_rawvalue(self):
1046 self.test_value(raw=True)
1047
1048 def test_getobj_getlock(self):
Benjamin Petersone711caf2008-06-11 16:44:04 +00001049 val1 = self.Value('i', 5)
1050 lock1 = val1.get_lock()
1051 obj1 = val1.get_obj()
1052
1053 val2 = self.Value('i', 5, lock=None)
1054 lock2 = val2.get_lock()
1055 obj2 = val2.get_obj()
1056
1057 lock = self.Lock()
1058 val3 = self.Value('i', 5, lock=lock)
1059 lock3 = val3.get_lock()
1060 obj3 = val3.get_obj()
1061 self.assertEqual(lock, lock3)
1062
Jesse Nollerb0516a62009-01-18 03:11:38 +00001063 arr4 = self.Value('i', 5, lock=False)
Benjamin Petersone711caf2008-06-11 16:44:04 +00001064 self.assertFalse(hasattr(arr4, 'get_lock'))
1065 self.assertFalse(hasattr(arr4, 'get_obj'))
1066
Jesse Nollerb0516a62009-01-18 03:11:38 +00001067 self.assertRaises(AttributeError, self.Value, 'i', 5, lock='navalue')
1068
1069 arr5 = self.RawValue('i', 5)
1070 self.assertFalse(hasattr(arr5, 'get_lock'))
1071 self.assertFalse(hasattr(arr5, 'get_obj'))
1072
Benjamin Petersone711caf2008-06-11 16:44:04 +00001073
1074class _TestArray(BaseTestCase):
1075
Florent Xiclunafd1b0932010-03-28 00:25:02 +00001076 ALLOWED_TYPES = ('processes',)
1077
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +00001078 @classmethod
1079 def f(cls, seq):
Benjamin Petersone711caf2008-06-11 16:44:04 +00001080 for i in range(1, len(seq)):
1081 seq[i] += seq[i-1]
1082
Florent Xiclunafd1b0932010-03-28 00:25:02 +00001083 @unittest.skipIf(c_int is None, "requires _ctypes")
Benjamin Petersone711caf2008-06-11 16:44:04 +00001084 def test_array(self, raw=False):
Benjamin Petersone711caf2008-06-11 16:44:04 +00001085 seq = [680, 626, 934, 821, 150, 233, 548, 982, 714, 831]
1086 if raw:
1087 arr = self.RawArray('i', seq)
1088 else:
1089 arr = self.Array('i', seq)
1090
1091 self.assertEqual(len(arr), len(seq))
1092 self.assertEqual(arr[3], seq[3])
1093 self.assertEqual(list(arr[2:7]), list(seq[2:7]))
1094
1095 arr[4:8] = seq[4:8] = array.array('i', [1, 2, 3, 4])
1096
1097 self.assertEqual(list(arr[:]), seq)
1098
1099 self.f(seq)
1100
1101 p = self.Process(target=self.f, args=(arr,))
Jesus Cea94f964f2011-09-09 20:26:57 +02001102 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +00001103 p.start()
1104 p.join()
1105
1106 self.assertEqual(list(arr[:]), seq)
1107
Florent Xiclunafd1b0932010-03-28 00:25:02 +00001108 @unittest.skipIf(c_int is None, "requires _ctypes")
Mark Dickinson89461ef2011-03-26 10:19:03 +00001109 def test_array_from_size(self):
1110 size = 10
1111 # Test for zeroing (see issue #11675).
1112 # The repetition below strengthens the test by increasing the chances
1113 # of previously allocated non-zero memory being used for the new array
1114 # on the 2nd and 3rd loops.
1115 for _ in range(3):
1116 arr = self.Array('i', size)
1117 self.assertEqual(len(arr), size)
1118 self.assertEqual(list(arr), [0] * size)
1119 arr[:] = range(10)
1120 self.assertEqual(list(arr), list(range(10)))
1121 del arr
1122
1123 @unittest.skipIf(c_int is None, "requires _ctypes")
Benjamin Petersone711caf2008-06-11 16:44:04 +00001124 def test_rawarray(self):
1125 self.test_array(raw=True)
1126
Florent Xiclunafd1b0932010-03-28 00:25:02 +00001127 @unittest.skipIf(c_int is None, "requires _ctypes")
Benjamin Petersone711caf2008-06-11 16:44:04 +00001128 def test_getobj_getlock_obj(self):
Benjamin Petersone711caf2008-06-11 16:44:04 +00001129 arr1 = self.Array('i', list(range(10)))
1130 lock1 = arr1.get_lock()
1131 obj1 = arr1.get_obj()
1132
1133 arr2 = self.Array('i', list(range(10)), lock=None)
1134 lock2 = arr2.get_lock()
1135 obj2 = arr2.get_obj()
1136
1137 lock = self.Lock()
1138 arr3 = self.Array('i', list(range(10)), lock=lock)
1139 lock3 = arr3.get_lock()
1140 obj3 = arr3.get_obj()
1141 self.assertEqual(lock, lock3)
1142
Jesse Nollerb0516a62009-01-18 03:11:38 +00001143 arr4 = self.Array('i', range(10), lock=False)
Benjamin Petersone711caf2008-06-11 16:44:04 +00001144 self.assertFalse(hasattr(arr4, 'get_lock'))
1145 self.assertFalse(hasattr(arr4, 'get_obj'))
Jesse Nollerb0516a62009-01-18 03:11:38 +00001146 self.assertRaises(AttributeError,
1147 self.Array, 'i', range(10), lock='notalock')
1148
1149 arr5 = self.RawArray('i', range(10))
1150 self.assertFalse(hasattr(arr5, 'get_lock'))
1151 self.assertFalse(hasattr(arr5, 'get_obj'))
Benjamin Petersone711caf2008-06-11 16:44:04 +00001152
1153#
1154#
1155#
1156
1157class _TestContainers(BaseTestCase):
1158
1159 ALLOWED_TYPES = ('manager',)
1160
1161 def test_list(self):
1162 a = self.list(list(range(10)))
1163 self.assertEqual(a[:], list(range(10)))
1164
1165 b = self.list()
1166 self.assertEqual(b[:], [])
1167
1168 b.extend(list(range(5)))
1169 self.assertEqual(b[:], list(range(5)))
1170
1171 self.assertEqual(b[2], 2)
1172 self.assertEqual(b[2:10], [2,3,4])
1173
1174 b *= 2
1175 self.assertEqual(b[:], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4])
1176
1177 self.assertEqual(b + [5, 6], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4, 5, 6])
1178
1179 self.assertEqual(a[:], list(range(10)))
1180
1181 d = [a, b]
1182 e = self.list(d)
1183 self.assertEqual(
1184 e[:],
1185 [[0, 1, 2, 3, 4, 5, 6, 7, 8, 9], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4]]
1186 )
1187
1188 f = self.list([a])
1189 a.append('hello')
1190 self.assertEqual(f[:], [[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 'hello']])
1191
1192 def test_dict(self):
1193 d = self.dict()
1194 indices = list(range(65, 70))
1195 for i in indices:
1196 d[i] = chr(i)
1197 self.assertEqual(d.copy(), dict((i, chr(i)) for i in indices))
1198 self.assertEqual(sorted(d.keys()), indices)
1199 self.assertEqual(sorted(d.values()), [chr(i) for i in indices])
1200 self.assertEqual(sorted(d.items()), [(i, chr(i)) for i in indices])
1201
1202 def test_namespace(self):
1203 n = self.Namespace()
1204 n.name = 'Bob'
1205 n.job = 'Builder'
1206 n._hidden = 'hidden'
1207 self.assertEqual((n.name, n.job), ('Bob', 'Builder'))
1208 del n.job
1209 self.assertEqual(str(n), "Namespace(name='Bob')")
1210 self.assertTrue(hasattr(n, 'name'))
1211 self.assertTrue(not hasattr(n, 'job'))
1212
1213#
1214#
1215#
1216
1217def sqr(x, wait=0.0):
1218 time.sleep(wait)
1219 return x*x
Ask Solem2afcbf22010-11-09 20:55:52 +00001220
Antoine Pitroude911b22011-12-21 11:03:24 +01001221def mul(x, y):
1222 return x*y
1223
Benjamin Petersone711caf2008-06-11 16:44:04 +00001224class _TestPool(BaseTestCase):
1225
1226 def test_apply(self):
1227 papply = self.pool.apply
1228 self.assertEqual(papply(sqr, (5,)), sqr(5))
1229 self.assertEqual(papply(sqr, (), {'x':3}), sqr(x=3))
1230
1231 def test_map(self):
1232 pmap = self.pool.map
1233 self.assertEqual(pmap(sqr, list(range(10))), list(map(sqr, list(range(10)))))
1234 self.assertEqual(pmap(sqr, list(range(100)), chunksize=20),
1235 list(map(sqr, list(range(100)))))
1236
Antoine Pitroude911b22011-12-21 11:03:24 +01001237 def test_starmap(self):
1238 psmap = self.pool.starmap
1239 tuples = list(zip(range(10), range(9,-1, -1)))
1240 self.assertEqual(psmap(mul, tuples),
1241 list(itertools.starmap(mul, tuples)))
1242 tuples = list(zip(range(100), range(99,-1, -1)))
1243 self.assertEqual(psmap(mul, tuples, chunksize=20),
1244 list(itertools.starmap(mul, tuples)))
1245
1246 def test_starmap_async(self):
1247 tuples = list(zip(range(100), range(99,-1, -1)))
1248 self.assertEqual(self.pool.starmap_async(mul, tuples).get(),
1249 list(itertools.starmap(mul, tuples)))
1250
Alexandre Vassalottie52e3782009-07-17 09:18:18 +00001251 def test_map_chunksize(self):
1252 try:
1253 self.pool.map_async(sqr, [], chunksize=1).get(timeout=TIMEOUT1)
1254 except multiprocessing.TimeoutError:
1255 self.fail("pool.map_async with chunksize stalled on null list")
1256
Benjamin Petersone711caf2008-06-11 16:44:04 +00001257 def test_async(self):
1258 res = self.pool.apply_async(sqr, (7, TIMEOUT1,))
1259 get = TimingWrapper(res.get)
1260 self.assertEqual(get(), 49)
1261 self.assertTimingAlmostEqual(get.elapsed, TIMEOUT1)
1262
1263 def test_async_timeout(self):
1264 res = self.pool.apply_async(sqr, (6, TIMEOUT2 + 0.2))
1265 get = TimingWrapper(res.get)
1266 self.assertRaises(multiprocessing.TimeoutError, get, timeout=TIMEOUT2)
1267 self.assertTimingAlmostEqual(get.elapsed, TIMEOUT2)
1268
1269 def test_imap(self):
1270 it = self.pool.imap(sqr, list(range(10)))
1271 self.assertEqual(list(it), list(map(sqr, list(range(10)))))
1272
1273 it = self.pool.imap(sqr, list(range(10)))
1274 for i in range(10):
1275 self.assertEqual(next(it), i*i)
1276 self.assertRaises(StopIteration, it.__next__)
1277
1278 it = self.pool.imap(sqr, list(range(1000)), chunksize=100)
1279 for i in range(1000):
1280 self.assertEqual(next(it), i*i)
1281 self.assertRaises(StopIteration, it.__next__)
1282
1283 def test_imap_unordered(self):
1284 it = self.pool.imap_unordered(sqr, list(range(1000)))
1285 self.assertEqual(sorted(it), list(map(sqr, list(range(1000)))))
1286
1287 it = self.pool.imap_unordered(sqr, list(range(1000)), chunksize=53)
1288 self.assertEqual(sorted(it), list(map(sqr, list(range(1000)))))
1289
1290 def test_make_pool(self):
Victor Stinner2fae27b2011-06-20 17:53:35 +02001291 self.assertRaises(ValueError, multiprocessing.Pool, -1)
1292 self.assertRaises(ValueError, multiprocessing.Pool, 0)
1293
Benjamin Petersone711caf2008-06-11 16:44:04 +00001294 p = multiprocessing.Pool(3)
1295 self.assertEqual(3, len(p._pool))
1296 p.close()
1297 p.join()
1298
1299 def test_terminate(self):
1300 if self.TYPE == 'manager':
1301 # On Unix a forked process increfs each shared object to
1302 # which its parent process held a reference. If the
1303 # forked process gets terminated then there is likely to
1304 # be a reference leak. So to prevent
1305 # _TestZZZNumberOfObjects from failing we skip this test
1306 # when using a manager.
1307 return
1308
1309 result = self.pool.map_async(
1310 time.sleep, [0.1 for i in range(10000)], chunksize=1
1311 )
1312 self.pool.terminate()
1313 join = TimingWrapper(self.pool.join)
1314 join()
Victor Stinner900189b2011-03-24 16:39:07 +01001315 self.assertLess(join.elapsed, 0.5)
Jesse Noller1f0b6582010-01-27 03:36:01 +00001316
Ask Solem2afcbf22010-11-09 20:55:52 +00001317def raising():
1318 raise KeyError("key")
Jesse Noller1f0b6582010-01-27 03:36:01 +00001319
Ask Solem2afcbf22010-11-09 20:55:52 +00001320def unpickleable_result():
1321 return lambda: 42
1322
1323class _TestPoolWorkerErrors(BaseTestCase):
Jesse Noller1f0b6582010-01-27 03:36:01 +00001324 ALLOWED_TYPES = ('processes', )
Ask Solem2afcbf22010-11-09 20:55:52 +00001325
1326 def test_async_error_callback(self):
1327 p = multiprocessing.Pool(2)
1328
1329 scratchpad = [None]
1330 def errback(exc):
1331 scratchpad[0] = exc
1332
1333 res = p.apply_async(raising, error_callback=errback)
1334 self.assertRaises(KeyError, res.get)
1335 self.assertTrue(scratchpad[0])
1336 self.assertIsInstance(scratchpad[0], KeyError)
1337
1338 p.close()
1339 p.join()
1340
1341 def test_unpickleable_result(self):
1342 from multiprocessing.pool import MaybeEncodingError
1343 p = multiprocessing.Pool(2)
1344
1345 # Make sure we don't lose pool processes because of encoding errors.
1346 for iteration in range(20):
1347
1348 scratchpad = [None]
1349 def errback(exc):
1350 scratchpad[0] = exc
1351
1352 res = p.apply_async(unpickleable_result, error_callback=errback)
1353 self.assertRaises(MaybeEncodingError, res.get)
1354 wrapped = scratchpad[0]
1355 self.assertTrue(wrapped)
1356 self.assertIsInstance(scratchpad[0], MaybeEncodingError)
1357 self.assertIsNotNone(wrapped.exc)
1358 self.assertIsNotNone(wrapped.value)
1359
1360 p.close()
1361 p.join()
1362
1363class _TestPoolWorkerLifetime(BaseTestCase):
1364 ALLOWED_TYPES = ('processes', )
1365
Jesse Noller1f0b6582010-01-27 03:36:01 +00001366 def test_pool_worker_lifetime(self):
1367 p = multiprocessing.Pool(3, maxtasksperchild=10)
1368 self.assertEqual(3, len(p._pool))
1369 origworkerpids = [w.pid for w in p._pool]
1370 # Run many tasks so each worker gets replaced (hopefully)
1371 results = []
1372 for i in range(100):
1373 results.append(p.apply_async(sqr, (i, )))
1374 # Fetch the results and verify we got the right answers,
1375 # also ensuring all the tasks have completed.
1376 for (j, res) in enumerate(results):
1377 self.assertEqual(res.get(), sqr(j))
1378 # Refill the pool
1379 p._repopulate_pool()
Florent Xiclunafb190f62010-03-04 16:10:10 +00001380 # Wait until all workers are alive
Antoine Pitrou540ab062011-04-06 22:51:17 +02001381 # (countdown * DELTA = 5 seconds max startup process time)
1382 countdown = 50
Florent Xiclunafb190f62010-03-04 16:10:10 +00001383 while countdown and not all(w.is_alive() for w in p._pool):
1384 countdown -= 1
1385 time.sleep(DELTA)
Jesse Noller1f0b6582010-01-27 03:36:01 +00001386 finalworkerpids = [w.pid for w in p._pool]
Florent Xiclunafb190f62010-03-04 16:10:10 +00001387 # All pids should be assigned. See issue #7805.
1388 self.assertNotIn(None, origworkerpids)
1389 self.assertNotIn(None, finalworkerpids)
1390 # Finally, check that the worker pids have changed
Jesse Noller1f0b6582010-01-27 03:36:01 +00001391 self.assertNotEqual(sorted(origworkerpids), sorted(finalworkerpids))
1392 p.close()
1393 p.join()
1394
Charles-François Natalif8859e12011-10-24 18:45:29 +02001395 def test_pool_worker_lifetime_early_close(self):
1396 # Issue #10332: closing a pool whose workers have limited lifetimes
1397 # before all the tasks completed would make join() hang.
1398 p = multiprocessing.Pool(3, maxtasksperchild=1)
1399 results = []
1400 for i in range(6):
1401 results.append(p.apply_async(sqr, (i, 0.3)))
1402 p.close()
1403 p.join()
1404 # check the results
1405 for (j, res) in enumerate(results):
1406 self.assertEqual(res.get(), sqr(j))
1407
1408
Benjamin Petersone711caf2008-06-11 16:44:04 +00001409#
1410# Test that manager has expected number of shared objects left
1411#
1412
1413class _TestZZZNumberOfObjects(BaseTestCase):
1414 # Because test cases are sorted alphabetically, this one will get
1415 # run after all the other tests for the manager. It tests that
1416 # there have been no "reference leaks" for the manager's shared
1417 # objects. Note the comment in _TestPool.test_terminate().
1418 ALLOWED_TYPES = ('manager',)
1419
1420 def test_number_of_objects(self):
1421 EXPECTED_NUMBER = 1 # the pool object is still alive
1422 multiprocessing.active_children() # discard dead process objs
1423 gc.collect() # do garbage collection
1424 refs = self.manager._number_of_objects()
Jesse Noller63b3a972009-01-21 02:15:48 +00001425 debug_info = self.manager._debug_info()
Benjamin Petersone711caf2008-06-11 16:44:04 +00001426 if refs != EXPECTED_NUMBER:
Georg Brandl3dbca812008-07-23 16:10:53 +00001427 print(self.manager._debug_info())
Jesse Noller63b3a972009-01-21 02:15:48 +00001428 print(debug_info)
Benjamin Petersone711caf2008-06-11 16:44:04 +00001429
1430 self.assertEqual(refs, EXPECTED_NUMBER)
1431
1432#
1433# Test of creating a customized manager class
1434#
1435
1436from multiprocessing.managers import BaseManager, BaseProxy, RemoteError
1437
1438class FooBar(object):
1439 def f(self):
1440 return 'f()'
1441 def g(self):
1442 raise ValueError
1443 def _h(self):
1444 return '_h()'
1445
1446def baz():
1447 for i in range(10):
1448 yield i*i
1449
1450class IteratorProxy(BaseProxy):
Florent Xiclunaaa171062010-08-14 15:56:42 +00001451 _exposed_ = ('__next__',)
Benjamin Petersone711caf2008-06-11 16:44:04 +00001452 def __iter__(self):
1453 return self
1454 def __next__(self):
Benjamin Petersone711caf2008-06-11 16:44:04 +00001455 return self._callmethod('__next__')
1456
1457class MyManager(BaseManager):
1458 pass
1459
1460MyManager.register('Foo', callable=FooBar)
1461MyManager.register('Bar', callable=FooBar, exposed=('f', '_h'))
1462MyManager.register('baz', callable=baz, proxytype=IteratorProxy)
1463
1464
1465class _TestMyManager(BaseTestCase):
1466
1467 ALLOWED_TYPES = ('manager',)
1468
1469 def test_mymanager(self):
1470 manager = MyManager()
1471 manager.start()
1472
1473 foo = manager.Foo()
1474 bar = manager.Bar()
1475 baz = manager.baz()
1476
1477 foo_methods = [name for name in ('f', 'g', '_h') if hasattr(foo, name)]
1478 bar_methods = [name for name in ('f', 'g', '_h') if hasattr(bar, name)]
1479
1480 self.assertEqual(foo_methods, ['f', 'g'])
1481 self.assertEqual(bar_methods, ['f', '_h'])
1482
1483 self.assertEqual(foo.f(), 'f()')
1484 self.assertRaises(ValueError, foo.g)
1485 self.assertEqual(foo._callmethod('f'), 'f()')
1486 self.assertRaises(RemoteError, foo._callmethod, '_h')
1487
1488 self.assertEqual(bar.f(), 'f()')
1489 self.assertEqual(bar._h(), '_h()')
1490 self.assertEqual(bar._callmethod('f'), 'f()')
1491 self.assertEqual(bar._callmethod('_h'), '_h()')
1492
1493 self.assertEqual(list(baz), [i*i for i in range(10)])
1494
1495 manager.shutdown()
1496
1497#
1498# Test of connecting to a remote server and using xmlrpclib for serialization
1499#
1500
1501_queue = pyqueue.Queue()
1502def get_queue():
1503 return _queue
1504
1505class QueueManager(BaseManager):
1506 '''manager class used by server process'''
1507QueueManager.register('get_queue', callable=get_queue)
1508
1509class QueueManager2(BaseManager):
1510 '''manager class which specifies the same interface as QueueManager'''
1511QueueManager2.register('get_queue')
1512
1513
1514SERIALIZER = 'xmlrpclib'
1515
1516class _TestRemoteManager(BaseTestCase):
1517
1518 ALLOWED_TYPES = ('manager',)
1519
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +00001520 @classmethod
1521 def _putter(cls, address, authkey):
Benjamin Petersone711caf2008-06-11 16:44:04 +00001522 manager = QueueManager2(
1523 address=address, authkey=authkey, serializer=SERIALIZER
1524 )
1525 manager.connect()
1526 queue = manager.get_queue()
1527 queue.put(('hello world', None, True, 2.25))
1528
1529 def test_remote(self):
1530 authkey = os.urandom(32)
1531
1532 manager = QueueManager(
1533 address=('localhost', 0), authkey=authkey, serializer=SERIALIZER
1534 )
1535 manager.start()
1536
1537 p = self.Process(target=self._putter, args=(manager.address, authkey))
Jesus Cea94f964f2011-09-09 20:26:57 +02001538 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +00001539 p.start()
1540
1541 manager2 = QueueManager2(
1542 address=manager.address, authkey=authkey, serializer=SERIALIZER
1543 )
1544 manager2.connect()
1545 queue = manager2.get_queue()
1546
1547 # Note that xmlrpclib will deserialize object as a list not a tuple
1548 self.assertEqual(queue.get(), ['hello world', None, True, 2.25])
1549
1550 # Because we are using xmlrpclib for serialization instead of
1551 # pickle this will cause a serialization error.
1552 self.assertRaises(Exception, queue.put, time.sleep)
1553
1554 # Make queue finalizer run before the server is stopped
1555 del queue
1556 manager.shutdown()
1557
Jesse Nollerc5d28a02009-03-30 16:37:36 +00001558class _TestManagerRestart(BaseTestCase):
1559
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +00001560 @classmethod
1561 def _putter(cls, address, authkey):
Jesse Nollerc5d28a02009-03-30 16:37:36 +00001562 manager = QueueManager(
1563 address=address, authkey=authkey, serializer=SERIALIZER)
1564 manager.connect()
1565 queue = manager.get_queue()
1566 queue.put('hello world')
1567
1568 def test_rapid_restart(self):
1569 authkey = os.urandom(32)
1570 manager = QueueManager(
Antoine Pitrou043bad02010-04-30 23:20:15 +00001571 address=('localhost', 0), authkey=authkey, serializer=SERIALIZER)
Brian Curtin50be1ca2010-11-01 05:10:44 +00001572 srvr = manager.get_server()
1573 addr = srvr.address
1574 # Close the connection.Listener socket which gets opened as a part
1575 # of manager.get_server(). It's not needed for the test.
1576 srvr.listener.close()
Jesse Nollerc5d28a02009-03-30 16:37:36 +00001577 manager.start()
1578
1579 p = self.Process(target=self._putter, args=(manager.address, authkey))
Jesus Cea94f964f2011-09-09 20:26:57 +02001580 p.daemon = True
Jesse Nollerc5d28a02009-03-30 16:37:36 +00001581 p.start()
1582 queue = manager.get_queue()
1583 self.assertEqual(queue.get(), 'hello world')
Jesse Noller35d1f002009-03-30 22:59:27 +00001584 del queue
Jesse Nollerc5d28a02009-03-30 16:37:36 +00001585 manager.shutdown()
1586 manager = QueueManager(
Antoine Pitrou043bad02010-04-30 23:20:15 +00001587 address=addr, authkey=authkey, serializer=SERIALIZER)
Antoine Pitrouc824e9a2011-04-05 18:11:33 +02001588 try:
1589 manager.start()
1590 except IOError as e:
1591 if e.errno != errno.EADDRINUSE:
1592 raise
1593 # Retry after some time, in case the old socket was lingering
1594 # (sporadic failure on buildbots)
1595 time.sleep(1.0)
1596 manager = QueueManager(
1597 address=addr, authkey=authkey, serializer=SERIALIZER)
Jesse Noller35d1f002009-03-30 22:59:27 +00001598 manager.shutdown()
Jesse Nollerc5d28a02009-03-30 16:37:36 +00001599
Benjamin Petersone711caf2008-06-11 16:44:04 +00001600#
1601#
1602#
1603
1604SENTINEL = latin('')
1605
1606class _TestConnection(BaseTestCase):
1607
1608 ALLOWED_TYPES = ('processes', 'threads')
1609
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +00001610 @classmethod
1611 def _echo(cls, conn):
Benjamin Petersone711caf2008-06-11 16:44:04 +00001612 for msg in iter(conn.recv_bytes, SENTINEL):
1613 conn.send_bytes(msg)
1614 conn.close()
1615
1616 def test_connection(self):
1617 conn, child_conn = self.Pipe()
1618
1619 p = self.Process(target=self._echo, args=(child_conn,))
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +00001620 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +00001621 p.start()
1622
1623 seq = [1, 2.25, None]
1624 msg = latin('hello world')
1625 longmsg = msg * 10
1626 arr = array.array('i', list(range(4)))
1627
1628 if self.TYPE == 'processes':
1629 self.assertEqual(type(conn.fileno()), int)
1630
1631 self.assertEqual(conn.send(seq), None)
1632 self.assertEqual(conn.recv(), seq)
1633
1634 self.assertEqual(conn.send_bytes(msg), None)
1635 self.assertEqual(conn.recv_bytes(), msg)
1636
1637 if self.TYPE == 'processes':
1638 buffer = array.array('i', [0]*10)
1639 expected = list(arr) + [0] * (10 - len(arr))
1640 self.assertEqual(conn.send_bytes(arr), None)
1641 self.assertEqual(conn.recv_bytes_into(buffer),
1642 len(arr) * buffer.itemsize)
1643 self.assertEqual(list(buffer), expected)
1644
1645 buffer = array.array('i', [0]*10)
1646 expected = [0] * 3 + list(arr) + [0] * (10 - 3 - len(arr))
1647 self.assertEqual(conn.send_bytes(arr), None)
1648 self.assertEqual(conn.recv_bytes_into(buffer, 3 * buffer.itemsize),
1649 len(arr) * buffer.itemsize)
1650 self.assertEqual(list(buffer), expected)
1651
1652 buffer = bytearray(latin(' ' * 40))
1653 self.assertEqual(conn.send_bytes(longmsg), None)
1654 try:
1655 res = conn.recv_bytes_into(buffer)
1656 except multiprocessing.BufferTooShort as e:
1657 self.assertEqual(e.args, (longmsg,))
1658 else:
1659 self.fail('expected BufferTooShort, got %s' % res)
1660
1661 poll = TimingWrapper(conn.poll)
1662
1663 self.assertEqual(poll(), False)
1664 self.assertTimingAlmostEqual(poll.elapsed, 0)
1665
Richard Oudkerk59d54042012-05-10 16:11:12 +01001666 self.assertEqual(poll(-1), False)
1667 self.assertTimingAlmostEqual(poll.elapsed, 0)
1668
Benjamin Petersone711caf2008-06-11 16:44:04 +00001669 self.assertEqual(poll(TIMEOUT1), False)
1670 self.assertTimingAlmostEqual(poll.elapsed, TIMEOUT1)
1671
1672 conn.send(None)
1673
1674 self.assertEqual(poll(TIMEOUT1), True)
1675 self.assertTimingAlmostEqual(poll.elapsed, 0)
1676
1677 self.assertEqual(conn.recv(), None)
1678
1679 really_big_msg = latin('X') * (1024 * 1024 * 16) # 16Mb
1680 conn.send_bytes(really_big_msg)
1681 self.assertEqual(conn.recv_bytes(), really_big_msg)
1682
1683 conn.send_bytes(SENTINEL) # tell child to quit
1684 child_conn.close()
1685
1686 if self.TYPE == 'processes':
1687 self.assertEqual(conn.readable, True)
1688 self.assertEqual(conn.writable, True)
1689 self.assertRaises(EOFError, conn.recv)
1690 self.assertRaises(EOFError, conn.recv_bytes)
1691
1692 p.join()
1693
1694 def test_duplex_false(self):
1695 reader, writer = self.Pipe(duplex=False)
1696 self.assertEqual(writer.send(1), None)
1697 self.assertEqual(reader.recv(), 1)
1698 if self.TYPE == 'processes':
1699 self.assertEqual(reader.readable, True)
1700 self.assertEqual(reader.writable, False)
1701 self.assertEqual(writer.readable, False)
1702 self.assertEqual(writer.writable, True)
1703 self.assertRaises(IOError, reader.send, 2)
1704 self.assertRaises(IOError, writer.recv)
1705 self.assertRaises(IOError, writer.poll)
1706
1707 def test_spawn_close(self):
1708 # We test that a pipe connection can be closed by parent
1709 # process immediately after child is spawned. On Windows this
1710 # would have sometimes failed on old versions because
1711 # child_conn would be closed before the child got a chance to
1712 # duplicate it.
1713 conn, child_conn = self.Pipe()
1714
1715 p = self.Process(target=self._echo, args=(child_conn,))
Jesus Cea94f964f2011-09-09 20:26:57 +02001716 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +00001717 p.start()
1718 child_conn.close() # this might complete before child initializes
1719
1720 msg = latin('hello')
1721 conn.send_bytes(msg)
1722 self.assertEqual(conn.recv_bytes(), msg)
1723
1724 conn.send_bytes(SENTINEL)
1725 conn.close()
1726 p.join()
1727
1728 def test_sendbytes(self):
1729 if self.TYPE != 'processes':
1730 return
1731
1732 msg = latin('abcdefghijklmnopqrstuvwxyz')
1733 a, b = self.Pipe()
1734
1735 a.send_bytes(msg)
1736 self.assertEqual(b.recv_bytes(), msg)
1737
1738 a.send_bytes(msg, 5)
1739 self.assertEqual(b.recv_bytes(), msg[5:])
1740
1741 a.send_bytes(msg, 7, 8)
1742 self.assertEqual(b.recv_bytes(), msg[7:7+8])
1743
1744 a.send_bytes(msg, 26)
1745 self.assertEqual(b.recv_bytes(), latin(''))
1746
1747 a.send_bytes(msg, 26, 0)
1748 self.assertEqual(b.recv_bytes(), latin(''))
1749
1750 self.assertRaises(ValueError, a.send_bytes, msg, 27)
1751
1752 self.assertRaises(ValueError, a.send_bytes, msg, 22, 5)
1753
1754 self.assertRaises(ValueError, a.send_bytes, msg, 26, 1)
1755
1756 self.assertRaises(ValueError, a.send_bytes, msg, -1)
1757
1758 self.assertRaises(ValueError, a.send_bytes, msg, 4, -1)
1759
Antoine Pitroubcb39d42011-08-23 19:46:22 +02001760 @classmethod
1761 def _is_fd_assigned(cls, fd):
1762 try:
1763 os.fstat(fd)
1764 except OSError as e:
1765 if e.errno == errno.EBADF:
1766 return False
1767 raise
1768 else:
1769 return True
1770
1771 @classmethod
1772 def _writefd(cls, conn, data, create_dummy_fds=False):
1773 if create_dummy_fds:
1774 for i in range(0, 256):
1775 if not cls._is_fd_assigned(i):
1776 os.dup2(conn.fileno(), i)
1777 fd = reduction.recv_handle(conn)
1778 if msvcrt:
1779 fd = msvcrt.open_osfhandle(fd, os.O_WRONLY)
1780 os.write(fd, data)
1781 os.close(fd)
1782
Charles-François Natalibc8f0822011-09-20 20:36:51 +02001783 @unittest.skipUnless(HAS_REDUCTION, "test needs multiprocessing.reduction")
Antoine Pitroubcb39d42011-08-23 19:46:22 +02001784 def test_fd_transfer(self):
1785 if self.TYPE != 'processes':
1786 self.skipTest("only makes sense with processes")
1787 conn, child_conn = self.Pipe(duplex=True)
1788
1789 p = self.Process(target=self._writefd, args=(child_conn, b"foo"))
Jesus Cea94f964f2011-09-09 20:26:57 +02001790 p.daemon = True
Antoine Pitroubcb39d42011-08-23 19:46:22 +02001791 p.start()
Victor Stinnerd0b10a62011-09-21 01:10:29 +02001792 self.addCleanup(test.support.unlink, test.support.TESTFN)
Antoine Pitroubcb39d42011-08-23 19:46:22 +02001793 with open(test.support.TESTFN, "wb") as f:
1794 fd = f.fileno()
1795 if msvcrt:
1796 fd = msvcrt.get_osfhandle(fd)
1797 reduction.send_handle(conn, fd, p.pid)
1798 p.join()
1799 with open(test.support.TESTFN, "rb") as f:
1800 self.assertEqual(f.read(), b"foo")
1801
Charles-François Natalibc8f0822011-09-20 20:36:51 +02001802 @unittest.skipUnless(HAS_REDUCTION, "test needs multiprocessing.reduction")
Antoine Pitroubcb39d42011-08-23 19:46:22 +02001803 @unittest.skipIf(sys.platform == "win32",
1804 "test semantics don't make sense on Windows")
1805 @unittest.skipIf(MAXFD <= 256,
1806 "largest assignable fd number is too small")
1807 @unittest.skipUnless(hasattr(os, "dup2"),
1808 "test needs os.dup2()")
1809 def test_large_fd_transfer(self):
1810 # With fd > 256 (issue #11657)
1811 if self.TYPE != 'processes':
1812 self.skipTest("only makes sense with processes")
1813 conn, child_conn = self.Pipe(duplex=True)
1814
1815 p = self.Process(target=self._writefd, args=(child_conn, b"bar", True))
Jesus Cea94f964f2011-09-09 20:26:57 +02001816 p.daemon = True
Antoine Pitroubcb39d42011-08-23 19:46:22 +02001817 p.start()
Victor Stinnerd0b10a62011-09-21 01:10:29 +02001818 self.addCleanup(test.support.unlink, test.support.TESTFN)
Antoine Pitroubcb39d42011-08-23 19:46:22 +02001819 with open(test.support.TESTFN, "wb") as f:
1820 fd = f.fileno()
1821 for newfd in range(256, MAXFD):
1822 if not self._is_fd_assigned(newfd):
1823 break
1824 else:
1825 self.fail("could not find an unassigned large file descriptor")
1826 os.dup2(fd, newfd)
1827 try:
1828 reduction.send_handle(conn, newfd, p.pid)
1829 finally:
1830 os.close(newfd)
1831 p.join()
1832 with open(test.support.TESTFN, "rb") as f:
1833 self.assertEqual(f.read(), b"bar")
1834
Jesus Cea4507e642011-09-21 03:53:25 +02001835 @classmethod
1836 def _send_data_without_fd(self, conn):
1837 os.write(conn.fileno(), b"\0")
1838
Charles-François Natalie51c8da2011-09-21 18:48:21 +02001839 @unittest.skipUnless(HAS_REDUCTION, "test needs multiprocessing.reduction")
Jesus Cea4507e642011-09-21 03:53:25 +02001840 @unittest.skipIf(sys.platform == "win32", "doesn't make sense on Windows")
1841 def test_missing_fd_transfer(self):
1842 # Check that exception is raised when received data is not
1843 # accompanied by a file descriptor in ancillary data.
1844 if self.TYPE != 'processes':
1845 self.skipTest("only makes sense with processes")
1846 conn, child_conn = self.Pipe(duplex=True)
1847
1848 p = self.Process(target=self._send_data_without_fd, args=(child_conn,))
1849 p.daemon = True
1850 p.start()
1851 self.assertRaises(RuntimeError, reduction.recv_handle, conn)
1852 p.join()
Antoine Pitroubcb39d42011-08-23 19:46:22 +02001853
Charles-François Natalied4a8fc2012-02-08 21:15:58 +01001854class _TestListener(BaseTestCase):
1855
1856 ALLOWED_TYPES = ('processes')
1857
1858 def test_multiple_bind(self):
1859 for family in self.connection.families:
1860 l = self.connection.Listener(family=family)
1861 self.addCleanup(l.close)
1862 self.assertRaises(OSError, self.connection.Listener,
1863 l.address, family)
1864
Benjamin Petersone711caf2008-06-11 16:44:04 +00001865class _TestListenerClient(BaseTestCase):
1866
1867 ALLOWED_TYPES = ('processes', 'threads')
1868
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +00001869 @classmethod
1870 def _test(cls, address):
1871 conn = cls.connection.Client(address)
Benjamin Petersone711caf2008-06-11 16:44:04 +00001872 conn.send('hello')
1873 conn.close()
1874
1875 def test_listener_client(self):
1876 for family in self.connection.families:
1877 l = self.connection.Listener(family=family)
1878 p = self.Process(target=self._test, args=(l.address,))
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +00001879 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +00001880 p.start()
1881 conn = l.accept()
1882 self.assertEqual(conn.recv(), 'hello')
1883 p.join()
1884 l.close()
Charles-François Natalied4a8fc2012-02-08 21:15:58 +01001885
Richard Oudkerkfdb8dcf2012-05-05 19:45:37 +01001886 def test_issue14725(self):
1887 l = self.connection.Listener()
1888 p = self.Process(target=self._test, args=(l.address,))
1889 p.daemon = True
1890 p.start()
1891 time.sleep(1)
1892 # On Windows the client process should by now have connected,
1893 # written data and closed the pipe handle by now. This causes
1894 # ConnectNamdedPipe() to fail with ERROR_NO_DATA. See Issue
1895 # 14725.
1896 conn = l.accept()
1897 self.assertEqual(conn.recv(), 'hello')
1898 conn.close()
1899 p.join()
1900 l.close()
1901
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01001902class _TestPoll(unittest.TestCase):
1903
1904 ALLOWED_TYPES = ('processes', 'threads')
1905
1906 def test_empty_string(self):
1907 a, b = self.Pipe()
1908 self.assertEqual(a.poll(), False)
1909 b.send_bytes(b'')
1910 self.assertEqual(a.poll(), True)
1911 self.assertEqual(a.poll(), True)
1912
1913 @classmethod
1914 def _child_strings(cls, conn, strings):
1915 for s in strings:
1916 time.sleep(0.1)
1917 conn.send_bytes(s)
1918 conn.close()
1919
1920 def test_strings(self):
1921 strings = (b'hello', b'', b'a', b'b', b'', b'bye', b'', b'lop')
1922 a, b = self.Pipe()
1923 p = self.Process(target=self._child_strings, args=(b, strings))
1924 p.start()
1925
1926 for s in strings:
1927 for i in range(200):
1928 if a.poll(0.01):
1929 break
1930 x = a.recv_bytes()
1931 self.assertEqual(s, x)
1932
1933 p.join()
1934
1935 @classmethod
1936 def _child_boundaries(cls, r):
1937 # Polling may "pull" a message in to the child process, but we
1938 # don't want it to pull only part of a message, as that would
1939 # corrupt the pipe for any other processes which might later
1940 # read from it.
1941 r.poll(5)
1942
1943 def test_boundaries(self):
1944 r, w = self.Pipe(False)
1945 p = self.Process(target=self._child_boundaries, args=(r,))
1946 p.start()
1947 time.sleep(2)
1948 L = [b"first", b"second"]
1949 for obj in L:
1950 w.send_bytes(obj)
1951 w.close()
1952 p.join()
1953 self.assertIn(r.recv_bytes(), L)
1954
1955 @classmethod
1956 def _child_dont_merge(cls, b):
1957 b.send_bytes(b'a')
1958 b.send_bytes(b'b')
1959 b.send_bytes(b'cd')
1960
1961 def test_dont_merge(self):
1962 a, b = self.Pipe()
1963 self.assertEqual(a.poll(0.0), False)
1964 self.assertEqual(a.poll(0.1), False)
1965
1966 p = self.Process(target=self._child_dont_merge, args=(b,))
1967 p.start()
1968
1969 self.assertEqual(a.recv_bytes(), b'a')
1970 self.assertEqual(a.poll(1.0), True)
1971 self.assertEqual(a.poll(1.0), True)
1972 self.assertEqual(a.recv_bytes(), b'b')
1973 self.assertEqual(a.poll(1.0), True)
1974 self.assertEqual(a.poll(1.0), True)
1975 self.assertEqual(a.poll(0.0), True)
1976 self.assertEqual(a.recv_bytes(), b'cd')
1977
1978 p.join()
1979
Benjamin Petersone711caf2008-06-11 16:44:04 +00001980#
1981# Test of sending connection and socket objects between processes
1982#
Antoine Pitrou5438ed12012-04-24 22:56:57 +02001983
Richard Oudkerk24524192012-04-30 14:48:51 +01001984# Intermittent fails on Mac OS X -- see Issue14669 and Issue12958
1985@unittest.skipIf(sys.platform == "darwin", "fd passing unreliable on Mac OS X")
Antoine Pitrou5438ed12012-04-24 22:56:57 +02001986@unittest.skipUnless(HAS_REDUCTION, "test needs multiprocessing.reduction")
Benjamin Petersone711caf2008-06-11 16:44:04 +00001987class _TestPicklingConnections(BaseTestCase):
1988
1989 ALLOWED_TYPES = ('processes',)
1990
Antoine Pitrou5438ed12012-04-24 22:56:57 +02001991 @classmethod
Antoine Pitrou92ff4e12012-04-27 23:51:03 +02001992 def tearDownClass(cls):
1993 from multiprocessing.reduction import resource_sharer
1994 resource_sharer.stop(timeout=5)
1995
1996 @classmethod
Antoine Pitrou5438ed12012-04-24 22:56:57 +02001997 def _listener(cls, conn, families):
Benjamin Petersone711caf2008-06-11 16:44:04 +00001998 for fam in families:
Antoine Pitrou5438ed12012-04-24 22:56:57 +02001999 l = cls.connection.Listener(family=fam)
Benjamin Petersone711caf2008-06-11 16:44:04 +00002000 conn.send(l.address)
2001 new_conn = l.accept()
2002 conn.send(new_conn)
Antoine Pitrou5438ed12012-04-24 22:56:57 +02002003 new_conn.close()
2004 l.close()
Benjamin Petersone711caf2008-06-11 16:44:04 +00002005
Antoine Pitrou5438ed12012-04-24 22:56:57 +02002006 l = socket.socket()
2007 l.bind(('localhost', 0))
Antoine Pitrou5438ed12012-04-24 22:56:57 +02002008 l.listen(1)
Richard Oudkerk5d73c172012-05-08 22:24:47 +01002009 conn.send(l.getsockname())
Antoine Pitrou5438ed12012-04-24 22:56:57 +02002010 new_conn, addr = l.accept()
2011 conn.send(new_conn)
2012 new_conn.close()
2013 l.close()
Benjamin Petersone711caf2008-06-11 16:44:04 +00002014
2015 conn.recv()
2016
Antoine Pitrou5438ed12012-04-24 22:56:57 +02002017 @classmethod
2018 def _remote(cls, conn):
Benjamin Petersone711caf2008-06-11 16:44:04 +00002019 for (address, msg) in iter(conn.recv, None):
Antoine Pitrou5438ed12012-04-24 22:56:57 +02002020 client = cls.connection.Client(address)
Benjamin Petersone711caf2008-06-11 16:44:04 +00002021 client.send(msg.upper())
2022 client.close()
2023
Antoine Pitrou5438ed12012-04-24 22:56:57 +02002024 address, msg = conn.recv()
2025 client = socket.socket()
2026 client.connect(address)
2027 client.sendall(msg.upper())
2028 client.close()
Benjamin Petersone711caf2008-06-11 16:44:04 +00002029
2030 conn.close()
2031
2032 def test_pickling(self):
Benjamin Petersone711caf2008-06-11 16:44:04 +00002033 families = self.connection.families
2034
2035 lconn, lconn0 = self.Pipe()
2036 lp = self.Process(target=self._listener, args=(lconn0, families))
Jesus Cea94f964f2011-09-09 20:26:57 +02002037 lp.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +00002038 lp.start()
2039 lconn0.close()
2040
2041 rconn, rconn0 = self.Pipe()
2042 rp = self.Process(target=self._remote, args=(rconn0,))
Jesus Cea94f964f2011-09-09 20:26:57 +02002043 rp.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +00002044 rp.start()
2045 rconn0.close()
2046
2047 for fam in families:
2048 msg = ('This connection uses family %s' % fam).encode('ascii')
2049 address = lconn.recv()
2050 rconn.send((address, msg))
2051 new_conn = lconn.recv()
2052 self.assertEqual(new_conn.recv(), msg.upper())
2053
2054 rconn.send(None)
2055
Antoine Pitrou5438ed12012-04-24 22:56:57 +02002056 msg = latin('This connection uses a normal socket')
2057 address = lconn.recv()
2058 rconn.send((address, msg))
2059 new_conn = lconn.recv()
Richard Oudkerk4460c342012-04-30 14:48:50 +01002060 buf = []
2061 while True:
2062 s = new_conn.recv(100)
2063 if not s:
2064 break
2065 buf.append(s)
2066 buf = b''.join(buf)
2067 self.assertEqual(buf, msg.upper())
Antoine Pitrou5438ed12012-04-24 22:56:57 +02002068 new_conn.close()
Benjamin Petersone711caf2008-06-11 16:44:04 +00002069
2070 lconn.send(None)
2071
2072 rconn.close()
2073 lconn.close()
2074
2075 lp.join()
2076 rp.join()
Antoine Pitrou5438ed12012-04-24 22:56:57 +02002077
2078 @classmethod
2079 def child_access(cls, conn):
2080 w = conn.recv()
2081 w.send('all is well')
2082 w.close()
2083
2084 r = conn.recv()
2085 msg = r.recv()
2086 conn.send(msg*2)
2087
2088 conn.close()
2089
2090 def test_access(self):
2091 # On Windows, if we do not specify a destination pid when
2092 # using DupHandle then we need to be careful to use the
2093 # correct access flags for DuplicateHandle(), or else
2094 # DupHandle.detach() will raise PermissionError. For example,
2095 # for a read only pipe handle we should use
2096 # access=FILE_GENERIC_READ. (Unfortunately
2097 # DUPLICATE_SAME_ACCESS does not work.)
2098 conn, child_conn = self.Pipe()
2099 p = self.Process(target=self.child_access, args=(child_conn,))
2100 p.daemon = True
2101 p.start()
2102 child_conn.close()
2103
2104 r, w = self.Pipe(duplex=False)
2105 conn.send(w)
2106 w.close()
2107 self.assertEqual(r.recv(), 'all is well')
2108 r.close()
2109
2110 r, w = self.Pipe(duplex=False)
2111 conn.send(r)
2112 r.close()
2113 w.send('foobar')
2114 w.close()
2115 self.assertEqual(conn.recv(), 'foobar'*2)
2116
Benjamin Petersone711caf2008-06-11 16:44:04 +00002117#
2118#
2119#
2120
2121class _TestHeap(BaseTestCase):
2122
2123 ALLOWED_TYPES = ('processes',)
2124
2125 def test_heap(self):
2126 iterations = 5000
2127 maxblocks = 50
2128 blocks = []
2129
2130 # create and destroy lots of blocks of different sizes
2131 for i in range(iterations):
2132 size = int(random.lognormvariate(0, 1) * 1000)
2133 b = multiprocessing.heap.BufferWrapper(size)
2134 blocks.append(b)
2135 if len(blocks) > maxblocks:
2136 i = random.randrange(maxblocks)
2137 del blocks[i]
2138
2139 # get the heap object
2140 heap = multiprocessing.heap.BufferWrapper._heap
2141
2142 # verify the state of the heap
2143 all = []
2144 occupied = 0
Charles-François Natali778db492011-07-02 14:35:49 +02002145 heap._lock.acquire()
2146 self.addCleanup(heap._lock.release)
Benjamin Petersone711caf2008-06-11 16:44:04 +00002147 for L in list(heap._len_to_seq.values()):
2148 for arena, start, stop in L:
2149 all.append((heap._arenas.index(arena), start, stop,
2150 stop-start, 'free'))
2151 for arena, start, stop in heap._allocated_blocks:
2152 all.append((heap._arenas.index(arena), start, stop,
2153 stop-start, 'occupied'))
2154 occupied += (stop-start)
2155
2156 all.sort()
2157
2158 for i in range(len(all)-1):
2159 (arena, start, stop) = all[i][:3]
2160 (narena, nstart, nstop) = all[i+1][:3]
2161 self.assertTrue((arena != narena and nstart == 0) or
2162 (stop == nstart))
2163
Charles-François Natali778db492011-07-02 14:35:49 +02002164 def test_free_from_gc(self):
2165 # Check that freeing of blocks by the garbage collector doesn't deadlock
2166 # (issue #12352).
2167 # Make sure the GC is enabled, and set lower collection thresholds to
2168 # make collections more frequent (and increase the probability of
2169 # deadlock).
2170 if not gc.isenabled():
2171 gc.enable()
2172 self.addCleanup(gc.disable)
2173 thresholds = gc.get_threshold()
2174 self.addCleanup(gc.set_threshold, *thresholds)
2175 gc.set_threshold(10)
2176
2177 # perform numerous block allocations, with cyclic references to make
2178 # sure objects are collected asynchronously by the gc
2179 for i in range(5000):
2180 a = multiprocessing.heap.BufferWrapper(1)
2181 b = multiprocessing.heap.BufferWrapper(1)
2182 # circular references
2183 a.buddy = b
2184 b.buddy = a
2185
Benjamin Petersone711caf2008-06-11 16:44:04 +00002186#
2187#
2188#
2189
Benjamin Petersone711caf2008-06-11 16:44:04 +00002190class _Foo(Structure):
2191 _fields_ = [
2192 ('x', c_int),
2193 ('y', c_double)
2194 ]
2195
2196class _TestSharedCTypes(BaseTestCase):
2197
2198 ALLOWED_TYPES = ('processes',)
2199
Antoine Pitrou7744e2a2010-11-22 16:26:21 +00002200 def setUp(self):
2201 if not HAS_SHAREDCTYPES:
2202 self.skipTest("requires multiprocessing.sharedctypes")
2203
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +00002204 @classmethod
2205 def _double(cls, x, y, foo, arr, string):
Benjamin Petersone711caf2008-06-11 16:44:04 +00002206 x.value *= 2
2207 y.value *= 2
2208 foo.x *= 2
2209 foo.y *= 2
2210 string.value *= 2
2211 for i in range(len(arr)):
2212 arr[i] *= 2
2213
2214 def test_sharedctypes(self, lock=False):
Benjamin Petersone711caf2008-06-11 16:44:04 +00002215 x = Value('i', 7, lock=lock)
Georg Brandl89fad142010-03-14 10:23:39 +00002216 y = Value(c_double, 1.0/3.0, lock=lock)
Benjamin Petersone711caf2008-06-11 16:44:04 +00002217 foo = Value(_Foo, 3, 2, lock=lock)
Georg Brandl89fad142010-03-14 10:23:39 +00002218 arr = self.Array('d', list(range(10)), lock=lock)
2219 string = self.Array('c', 20, lock=lock)
Brian Curtinafa88b52010-10-07 01:12:19 +00002220 string.value = latin('hello')
Benjamin Petersone711caf2008-06-11 16:44:04 +00002221
2222 p = self.Process(target=self._double, args=(x, y, foo, arr, string))
Jesus Cea94f964f2011-09-09 20:26:57 +02002223 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +00002224 p.start()
2225 p.join()
2226
2227 self.assertEqual(x.value, 14)
2228 self.assertAlmostEqual(y.value, 2.0/3.0)
2229 self.assertEqual(foo.x, 6)
2230 self.assertAlmostEqual(foo.y, 4.0)
2231 for i in range(10):
2232 self.assertAlmostEqual(arr[i], i*2)
2233 self.assertEqual(string.value, latin('hellohello'))
2234
2235 def test_synchronize(self):
2236 self.test_sharedctypes(lock=True)
2237
2238 def test_copy(self):
Benjamin Petersone711caf2008-06-11 16:44:04 +00002239 foo = _Foo(2, 5.0)
Brian Curtinafa88b52010-10-07 01:12:19 +00002240 bar = copy(foo)
Benjamin Petersone711caf2008-06-11 16:44:04 +00002241 foo.x = 0
2242 foo.y = 0
2243 self.assertEqual(bar.x, 2)
2244 self.assertAlmostEqual(bar.y, 5.0)
2245
2246#
2247#
2248#
2249
2250class _TestFinalize(BaseTestCase):
2251
2252 ALLOWED_TYPES = ('processes',)
2253
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +00002254 @classmethod
2255 def _test_finalize(cls, conn):
Benjamin Petersone711caf2008-06-11 16:44:04 +00002256 class Foo(object):
2257 pass
2258
2259 a = Foo()
2260 util.Finalize(a, conn.send, args=('a',))
2261 del a # triggers callback for a
2262
2263 b = Foo()
2264 close_b = util.Finalize(b, conn.send, args=('b',))
2265 close_b() # triggers callback for b
2266 close_b() # does nothing because callback has already been called
2267 del b # does nothing because callback has already been called
2268
2269 c = Foo()
2270 util.Finalize(c, conn.send, args=('c',))
2271
2272 d10 = Foo()
2273 util.Finalize(d10, conn.send, args=('d10',), exitpriority=1)
2274
2275 d01 = Foo()
2276 util.Finalize(d01, conn.send, args=('d01',), exitpriority=0)
2277 d02 = Foo()
2278 util.Finalize(d02, conn.send, args=('d02',), exitpriority=0)
2279 d03 = Foo()
2280 util.Finalize(d03, conn.send, args=('d03',), exitpriority=0)
2281
2282 util.Finalize(None, conn.send, args=('e',), exitpriority=-10)
2283
2284 util.Finalize(None, conn.send, args=('STOP',), exitpriority=-100)
2285
Ezio Melotti13925002011-03-16 11:05:33 +02002286 # call multiprocessing's cleanup function then exit process without
Benjamin Petersone711caf2008-06-11 16:44:04 +00002287 # garbage collecting locals
2288 util._exit_function()
2289 conn.close()
2290 os._exit(0)
2291
2292 def test_finalize(self):
2293 conn, child_conn = self.Pipe()
2294
2295 p = self.Process(target=self._test_finalize, args=(child_conn,))
Jesus Cea94f964f2011-09-09 20:26:57 +02002296 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +00002297 p.start()
2298 p.join()
2299
2300 result = [obj for obj in iter(conn.recv, 'STOP')]
2301 self.assertEqual(result, ['a', 'b', 'd10', 'd03', 'd02', 'd01', 'e'])
2302
2303#
2304# Test that from ... import * works for each module
2305#
2306
2307class _TestImportStar(BaseTestCase):
2308
2309 ALLOWED_TYPES = ('processes',)
2310
2311 def test_import(self):
Florent Xiclunafd1b0932010-03-28 00:25:02 +00002312 modules = [
Benjamin Petersone711caf2008-06-11 16:44:04 +00002313 'multiprocessing', 'multiprocessing.connection',
2314 'multiprocessing.heap', 'multiprocessing.managers',
2315 'multiprocessing.pool', 'multiprocessing.process',
Benjamin Petersone711caf2008-06-11 16:44:04 +00002316 'multiprocessing.synchronize', 'multiprocessing.util'
Florent Xiclunafd1b0932010-03-28 00:25:02 +00002317 ]
2318
Charles-François Natalibc8f0822011-09-20 20:36:51 +02002319 if HAS_REDUCTION:
2320 modules.append('multiprocessing.reduction')
2321
Florent Xiclunafd1b0932010-03-28 00:25:02 +00002322 if c_int is not None:
2323 # This module requires _ctypes
2324 modules.append('multiprocessing.sharedctypes')
Benjamin Petersone711caf2008-06-11 16:44:04 +00002325
2326 for name in modules:
2327 __import__(name)
2328 mod = sys.modules[name]
2329
2330 for attr in getattr(mod, '__all__', ()):
2331 self.assertTrue(
2332 hasattr(mod, attr),
2333 '%r does not have attribute %r' % (mod, attr)
2334 )
2335
2336#
2337# Quick test that logging works -- does not test logging output
2338#
2339
2340class _TestLogging(BaseTestCase):
2341
2342 ALLOWED_TYPES = ('processes',)
2343
2344 def test_enable_logging(self):
2345 logger = multiprocessing.get_logger()
2346 logger.setLevel(util.SUBWARNING)
2347 self.assertTrue(logger is not None)
2348 logger.debug('this will not be printed')
2349 logger.info('nor will this')
2350 logger.setLevel(LOG_LEVEL)
2351
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +00002352 @classmethod
2353 def _test_level(cls, conn):
Benjamin Petersone711caf2008-06-11 16:44:04 +00002354 logger = multiprocessing.get_logger()
2355 conn.send(logger.getEffectiveLevel())
2356
2357 def test_level(self):
2358 LEVEL1 = 32
2359 LEVEL2 = 37
2360
2361 logger = multiprocessing.get_logger()
2362 root_logger = logging.getLogger()
2363 root_level = root_logger.level
2364
2365 reader, writer = multiprocessing.Pipe(duplex=False)
2366
2367 logger.setLevel(LEVEL1)
Jesus Cea94f964f2011-09-09 20:26:57 +02002368 p = self.Process(target=self._test_level, args=(writer,))
2369 p.daemon = True
2370 p.start()
Benjamin Petersone711caf2008-06-11 16:44:04 +00002371 self.assertEqual(LEVEL1, reader.recv())
2372
2373 logger.setLevel(logging.NOTSET)
2374 root_logger.setLevel(LEVEL2)
Jesus Cea94f964f2011-09-09 20:26:57 +02002375 p = self.Process(target=self._test_level, args=(writer,))
2376 p.daemon = True
2377 p.start()
Benjamin Petersone711caf2008-06-11 16:44:04 +00002378 self.assertEqual(LEVEL2, reader.recv())
2379
2380 root_logger.setLevel(root_level)
2381 logger.setLevel(level=LOG_LEVEL)
2382
Jesse Nollerb9a49b72009-11-21 18:09:38 +00002383
Jesse Nollerf4ae35f2009-11-24 14:22:24 +00002384# class _TestLoggingProcessName(BaseTestCase):
2385#
2386# def handle(self, record):
2387# assert record.processName == multiprocessing.current_process().name
2388# self.__handled = True
2389#
2390# def test_logging(self):
2391# handler = logging.Handler()
2392# handler.handle = self.handle
2393# self.__handled = False
2394# # Bypass getLogger() and side-effects
2395# logger = logging.getLoggerClass()(
2396# 'multiprocessing.test.TestLoggingProcessName')
2397# logger.addHandler(handler)
2398# logger.propagate = False
2399#
2400# logger.warn('foo')
2401# assert self.__handled
Jesse Nollerb9a49b72009-11-21 18:09:38 +00002402
Benjamin Petersone711caf2008-06-11 16:44:04 +00002403#
Jesse Noller6214edd2009-01-19 16:23:53 +00002404# Test to verify handle verification, see issue 3321
2405#
2406
2407class TestInvalidHandle(unittest.TestCase):
2408
Florent Xiclunafd1b0932010-03-28 00:25:02 +00002409 @unittest.skipIf(WIN32, "skipped on Windows")
Jesse Noller6214edd2009-01-19 16:23:53 +00002410 def test_invalid_handles(self):
Antoine Pitrou87cf2202011-05-09 17:04:27 +02002411 conn = multiprocessing.connection.Connection(44977608)
2412 try:
2413 self.assertRaises((ValueError, IOError), conn.poll)
2414 finally:
2415 # Hack private attribute _handle to avoid printing an error
2416 # in conn.__del__
2417 conn._handle = None
2418 self.assertRaises((ValueError, IOError),
2419 multiprocessing.connection.Connection, -1)
Florent Xiclunafd1b0932010-03-28 00:25:02 +00002420
Jesse Noller6214edd2009-01-19 16:23:53 +00002421#
Benjamin Petersone711caf2008-06-11 16:44:04 +00002422# Functions used to create test cases from the base ones in this module
2423#
2424
2425def get_attributes(Source, names):
2426 d = {}
2427 for name in names:
2428 obj = getattr(Source, name)
2429 if type(obj) == type(get_attributes):
2430 obj = staticmethod(obj)
2431 d[name] = obj
2432 return d
2433
2434def create_test_cases(Mixin, type):
2435 result = {}
2436 glob = globals()
Florent Xiclunafd1b0932010-03-28 00:25:02 +00002437 Type = type.capitalize()
Benjamin Petersone711caf2008-06-11 16:44:04 +00002438
2439 for name in list(glob.keys()):
2440 if name.startswith('_Test'):
2441 base = glob[name]
2442 if type in base.ALLOWED_TYPES:
2443 newname = 'With' + Type + name[1:]
2444 class Temp(base, unittest.TestCase, Mixin):
2445 pass
2446 result[newname] = Temp
2447 Temp.__name__ = newname
2448 Temp.__module__ = Mixin.__module__
2449 return result
2450
2451#
2452# Create test cases
2453#
2454
2455class ProcessesMixin(object):
2456 TYPE = 'processes'
2457 Process = multiprocessing.Process
2458 locals().update(get_attributes(multiprocessing, (
2459 'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
2460 'Condition', 'Event', 'Value', 'Array', 'RawValue',
2461 'RawArray', 'current_process', 'active_children', 'Pipe',
2462 'connection', 'JoinableQueue'
2463 )))
2464
2465testcases_processes = create_test_cases(ProcessesMixin, type='processes')
2466globals().update(testcases_processes)
2467
2468
2469class ManagerMixin(object):
2470 TYPE = 'manager'
2471 Process = multiprocessing.Process
2472 manager = object.__new__(multiprocessing.managers.SyncManager)
2473 locals().update(get_attributes(manager, (
2474 'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
2475 'Condition', 'Event', 'Value', 'Array', 'list', 'dict',
2476 'Namespace', 'JoinableQueue'
2477 )))
2478
2479testcases_manager = create_test_cases(ManagerMixin, type='manager')
2480globals().update(testcases_manager)
2481
2482
2483class ThreadsMixin(object):
2484 TYPE = 'threads'
2485 Process = multiprocessing.dummy.Process
2486 locals().update(get_attributes(multiprocessing.dummy, (
2487 'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
2488 'Condition', 'Event', 'Value', 'Array', 'current_process',
2489 'active_children', 'Pipe', 'connection', 'dict', 'list',
2490 'Namespace', 'JoinableQueue'
2491 )))
2492
2493testcases_threads = create_test_cases(ThreadsMixin, type='threads')
2494globals().update(testcases_threads)
2495
Neal Norwitz5d6415e2008-08-25 01:53:32 +00002496class OtherTest(unittest.TestCase):
2497 # TODO: add more tests for deliver/answer challenge.
2498 def test_deliver_challenge_auth_failure(self):
2499 class _FakeConnection(object):
2500 def recv_bytes(self, size):
Neal Norwitzec105ad2008-08-25 03:05:54 +00002501 return b'something bogus'
Neal Norwitz5d6415e2008-08-25 01:53:32 +00002502 def send_bytes(self, data):
2503 pass
2504 self.assertRaises(multiprocessing.AuthenticationError,
2505 multiprocessing.connection.deliver_challenge,
2506 _FakeConnection(), b'abc')
2507
2508 def test_answer_challenge_auth_failure(self):
2509 class _FakeConnection(object):
2510 def __init__(self):
2511 self.count = 0
2512 def recv_bytes(self, size):
2513 self.count += 1
2514 if self.count == 1:
2515 return multiprocessing.connection.CHALLENGE
2516 elif self.count == 2:
Neal Norwitzec105ad2008-08-25 03:05:54 +00002517 return b'something bogus'
2518 return b''
Neal Norwitz5d6415e2008-08-25 01:53:32 +00002519 def send_bytes(self, data):
2520 pass
2521 self.assertRaises(multiprocessing.AuthenticationError,
2522 multiprocessing.connection.answer_challenge,
2523 _FakeConnection(), b'abc')
2524
Benjamin Petersonf47ed4a2009-04-11 20:45:40 +00002525#
2526# Test Manager.start()/Pool.__init__() initializer feature - see issue 5585
2527#
2528
2529def initializer(ns):
2530 ns.test += 1
2531
2532class TestInitializers(unittest.TestCase):
2533 def setUp(self):
2534 self.mgr = multiprocessing.Manager()
2535 self.ns = self.mgr.Namespace()
2536 self.ns.test = 0
2537
2538 def tearDown(self):
2539 self.mgr.shutdown()
Richard Oudkerka6becaa2012-05-03 18:29:02 +01002540 self.mgr.join()
Benjamin Petersonf47ed4a2009-04-11 20:45:40 +00002541
2542 def test_manager_initializer(self):
2543 m = multiprocessing.managers.SyncManager()
2544 self.assertRaises(TypeError, m.start, 1)
2545 m.start(initializer, (self.ns,))
2546 self.assertEqual(self.ns.test, 1)
2547 m.shutdown()
Richard Oudkerka6becaa2012-05-03 18:29:02 +01002548 m.join()
Benjamin Petersonf47ed4a2009-04-11 20:45:40 +00002549
2550 def test_pool_initializer(self):
2551 self.assertRaises(TypeError, multiprocessing.Pool, initializer=1)
2552 p = multiprocessing.Pool(1, initializer, (self.ns,))
2553 p.close()
2554 p.join()
2555 self.assertEqual(self.ns.test, 1)
2556
Alexandre Vassalottic57a84f2009-07-17 12:07:01 +00002557#
2558# Issue 5155, 5313, 5331: Test process in processes
2559# Verifies os.close(sys.stdin.fileno) vs. sys.stdin.close() behavior
2560#
2561
2562def _ThisSubProcess(q):
2563 try:
2564 item = q.get(block=False)
2565 except pyqueue.Empty:
2566 pass
2567
2568def _TestProcess(q):
2569 queue = multiprocessing.Queue()
2570 subProc = multiprocessing.Process(target=_ThisSubProcess, args=(queue,))
Jesus Cea94f964f2011-09-09 20:26:57 +02002571 subProc.daemon = True
Alexandre Vassalottic57a84f2009-07-17 12:07:01 +00002572 subProc.start()
2573 subProc.join()
2574
2575def _afunc(x):
2576 return x*x
2577
2578def pool_in_process():
2579 pool = multiprocessing.Pool(processes=4)
2580 x = pool.map(_afunc, [1, 2, 3, 4, 5, 6, 7])
Richard Oudkerk225cb8d2012-05-02 19:36:11 +01002581 pool.close()
2582 pool.join()
Alexandre Vassalottic57a84f2009-07-17 12:07:01 +00002583
2584class _file_like(object):
2585 def __init__(self, delegate):
2586 self._delegate = delegate
2587 self._pid = None
2588
2589 @property
2590 def cache(self):
2591 pid = os.getpid()
2592 # There are no race conditions since fork keeps only the running thread
2593 if pid != self._pid:
2594 self._pid = pid
2595 self._cache = []
2596 return self._cache
2597
2598 def write(self, data):
2599 self.cache.append(data)
2600
2601 def flush(self):
2602 self._delegate.write(''.join(self.cache))
2603 self._cache = []
2604
2605class TestStdinBadfiledescriptor(unittest.TestCase):
2606
2607 def test_queue_in_process(self):
2608 queue = multiprocessing.Queue()
2609 proc = multiprocessing.Process(target=_TestProcess, args=(queue,))
2610 proc.start()
2611 proc.join()
2612
2613 def test_pool_in_process(self):
2614 p = multiprocessing.Process(target=pool_in_process)
2615 p.start()
2616 p.join()
2617
2618 def test_flushing(self):
2619 sio = io.StringIO()
2620 flike = _file_like(sio)
2621 flike.write('foo')
2622 proc = multiprocessing.Process(target=lambda: flike.flush())
2623 flike.flush()
2624 assert sio.getvalue() == 'foo'
2625
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01002626
2627class TestWait(unittest.TestCase):
2628
2629 @classmethod
2630 def _child_test_wait(cls, w, slow):
2631 for i in range(10):
2632 if slow:
2633 time.sleep(random.random()*0.1)
2634 w.send((i, os.getpid()))
2635 w.close()
2636
2637 def test_wait(self, slow=False):
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01002638 from multiprocessing.connection import wait
2639 readers = []
2640 procs = []
2641 messages = []
2642
2643 for i in range(4):
Antoine Pitrou5bb9a8f2012-03-06 13:43:24 +01002644 r, w = multiprocessing.Pipe(duplex=False)
2645 p = multiprocessing.Process(target=self._child_test_wait, args=(w, slow))
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01002646 p.daemon = True
2647 p.start()
2648 w.close()
2649 readers.append(r)
2650 procs.append(p)
Antoine Pitrou6c64cc12012-03-06 13:42:35 +01002651 self.addCleanup(p.join)
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01002652
2653 while readers:
2654 for r in wait(readers):
2655 try:
2656 msg = r.recv()
2657 except EOFError:
2658 readers.remove(r)
2659 r.close()
2660 else:
2661 messages.append(msg)
2662
2663 messages.sort()
2664 expected = sorted((i, p.pid) for i in range(10) for p in procs)
2665 self.assertEqual(messages, expected)
2666
2667 @classmethod
2668 def _child_test_wait_socket(cls, address, slow):
2669 s = socket.socket()
2670 s.connect(address)
2671 for i in range(10):
2672 if slow:
2673 time.sleep(random.random()*0.1)
2674 s.sendall(('%s\n' % i).encode('ascii'))
2675 s.close()
2676
2677 def test_wait_socket(self, slow=False):
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01002678 from multiprocessing.connection import wait
2679 l = socket.socket()
2680 l.bind(('', 0))
2681 l.listen(4)
2682 addr = ('localhost', l.getsockname()[1])
2683 readers = []
2684 procs = []
2685 dic = {}
2686
2687 for i in range(4):
Antoine Pitrou5bb9a8f2012-03-06 13:43:24 +01002688 p = multiprocessing.Process(target=self._child_test_wait_socket,
2689 args=(addr, slow))
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01002690 p.daemon = True
2691 p.start()
2692 procs.append(p)
Antoine Pitrou6c64cc12012-03-06 13:42:35 +01002693 self.addCleanup(p.join)
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01002694
2695 for i in range(4):
2696 r, _ = l.accept()
2697 readers.append(r)
2698 dic[r] = []
2699 l.close()
2700
2701 while readers:
2702 for r in wait(readers):
2703 msg = r.recv(32)
2704 if not msg:
2705 readers.remove(r)
2706 r.close()
2707 else:
2708 dic[r].append(msg)
2709
2710 expected = ''.join('%s\n' % i for i in range(10)).encode('ascii')
2711 for v in dic.values():
2712 self.assertEqual(b''.join(v), expected)
2713
2714 def test_wait_slow(self):
2715 self.test_wait(True)
2716
2717 def test_wait_socket_slow(self):
Richard Oudkerk104b3f42012-05-08 16:08:07 +01002718 self.test_wait_socket(True)
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01002719
2720 def test_wait_timeout(self):
2721 from multiprocessing.connection import wait
2722
Richard Oudkerk009b15e2012-05-04 09:44:39 +01002723 expected = 5
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01002724 a, b = multiprocessing.Pipe()
2725
2726 start = time.time()
Richard Oudkerk009b15e2012-05-04 09:44:39 +01002727 res = wait([a, b], expected)
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01002728 delta = time.time() - start
2729
2730 self.assertEqual(res, [])
Richard Oudkerk6dbca362012-05-06 16:46:36 +01002731 self.assertLess(delta, expected * 2)
2732 self.assertGreater(delta, expected * 0.5)
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01002733
2734 b.send(None)
2735
2736 start = time.time()
Richard Oudkerk009b15e2012-05-04 09:44:39 +01002737 res = wait([a, b], 20)
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01002738 delta = time.time() - start
2739
2740 self.assertEqual(res, [a])
Antoine Pitrou37749772012-03-09 18:40:15 +01002741 self.assertLess(delta, 0.4)
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01002742
Richard Oudkerk009b15e2012-05-04 09:44:39 +01002743 @classmethod
2744 def signal_and_sleep(cls, sem, period):
2745 sem.release()
2746 time.sleep(period)
2747
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01002748 def test_wait_integer(self):
2749 from multiprocessing.connection import wait
2750
Richard Oudkerk009b15e2012-05-04 09:44:39 +01002751 expected = 3
2752 sem = multiprocessing.Semaphore(0)
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01002753 a, b = multiprocessing.Pipe()
Richard Oudkerk009b15e2012-05-04 09:44:39 +01002754 p = multiprocessing.Process(target=self.signal_and_sleep,
2755 args=(sem, expected))
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01002756
2757 p.start()
2758 self.assertIsInstance(p.sentinel, int)
Richard Oudkerk009b15e2012-05-04 09:44:39 +01002759 self.assertTrue(sem.acquire(timeout=20))
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01002760
2761 start = time.time()
2762 res = wait([a, p.sentinel, b], expected + 20)
2763 delta = time.time() - start
2764
2765 self.assertEqual(res, [p.sentinel])
Antoine Pitrou37749772012-03-09 18:40:15 +01002766 self.assertLess(delta, expected + 2)
2767 self.assertGreater(delta, expected - 2)
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01002768
2769 a.send(None)
2770
2771 start = time.time()
2772 res = wait([a, p.sentinel, b], 20)
2773 delta = time.time() - start
2774
2775 self.assertEqual(res, [p.sentinel, b])
Antoine Pitrou37749772012-03-09 18:40:15 +01002776 self.assertLess(delta, 0.4)
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01002777
2778 b.send(None)
2779
2780 start = time.time()
2781 res = wait([a, p.sentinel, b], 20)
2782 delta = time.time() - start
2783
2784 self.assertEqual(res, [a, p.sentinel, b])
Antoine Pitrou37749772012-03-09 18:40:15 +01002785 self.assertLess(delta, 0.4)
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01002786
Richard Oudkerk009b15e2012-05-04 09:44:39 +01002787 p.terminate()
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01002788 p.join()
2789
Richard Oudkerk59d54042012-05-10 16:11:12 +01002790 def test_neg_timeout(self):
2791 from multiprocessing.connection import wait
2792 a, b = multiprocessing.Pipe()
2793 t = time.time()
2794 res = wait([a], timeout=-1)
2795 t = time.time() - t
2796 self.assertEqual(res, [])
2797 self.assertLess(t, 1)
2798 a.close()
2799 b.close()
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01002800
Antoine Pitrou709176f2012-04-01 17:19:09 +02002801#
2802# Issue 14151: Test invalid family on invalid environment
2803#
2804
2805class TestInvalidFamily(unittest.TestCase):
2806
2807 @unittest.skipIf(WIN32, "skipped on Windows")
2808 def test_invalid_family(self):
2809 with self.assertRaises(ValueError):
2810 multiprocessing.connection.Listener(r'\\.\test')
2811
Antoine Pitrou6d20cba2012-04-03 20:12:23 +02002812 @unittest.skipUnless(WIN32, "skipped on non-Windows platforms")
2813 def test_invalid_family_win32(self):
2814 with self.assertRaises(ValueError):
2815 multiprocessing.connection.Listener('/var/test.pipe')
Antoine Pitrou93bba8f2012-04-01 17:25:49 +02002816
Richard Oudkerk77c84f22012-05-18 14:28:02 +01002817#
2818# Issue 12098: check sys.flags of child matches that for parent
2819#
2820
2821class TestFlags(unittest.TestCase):
2822 @classmethod
2823 def run_in_grandchild(cls, conn):
2824 conn.send(tuple(sys.flags))
2825
2826 @classmethod
2827 def run_in_child(cls):
2828 import json
2829 r, w = multiprocessing.Pipe(duplex=False)
2830 p = multiprocessing.Process(target=cls.run_in_grandchild, args=(w,))
2831 p.start()
2832 grandchild_flags = r.recv()
2833 p.join()
2834 r.close()
2835 w.close()
2836 flags = (tuple(sys.flags), grandchild_flags)
2837 print(json.dumps(flags))
2838
2839 def test_flags(self):
2840 import json, subprocess
2841 # start child process using unusual flags
2842 prog = ('from test.test_multiprocessing import TestFlags; ' +
2843 'TestFlags.run_in_child()')
2844 data = subprocess.check_output(
2845 [sys.executable, '-E', '-S', '-O', '-c', prog])
2846 child_flags, grandchild_flags = json.loads(data.decode('ascii'))
2847 self.assertEqual(child_flags, grandchild_flags)
2848
Alexandre Vassalottic57a84f2009-07-17 12:07:01 +00002849testcases_other = [OtherTest, TestInvalidHandle, TestInitializers,
Richard Oudkerk77c84f22012-05-18 14:28:02 +01002850 TestStdinBadfiledescriptor, TestWait, TestInvalidFamily,
2851 TestFlags]
Neal Norwitz5d6415e2008-08-25 01:53:32 +00002852
Benjamin Petersone711caf2008-06-11 16:44:04 +00002853#
2854#
2855#
2856
2857def test_main(run=None):
Jesse Nollerd00df3c2008-06-18 14:22:48 +00002858 if sys.platform.startswith("linux"):
2859 try:
2860 lock = multiprocessing.RLock()
2861 except OSError:
Benjamin Petersone549ead2009-03-28 21:42:05 +00002862 raise unittest.SkipTest("OSError raises on RLock creation, see issue 3111!")
Benjamin Peterson3c0dd062008-06-17 22:43:48 +00002863
Charles-François Natali221ef672011-11-22 18:55:22 +01002864 check_enough_semaphores()
2865
Benjamin Petersone711caf2008-06-11 16:44:04 +00002866 if run is None:
2867 from test.support import run_unittest as run
2868
2869 util.get_temp_dir() # creates temp directory for use by all processes
2870
2871 multiprocessing.get_logger().setLevel(LOG_LEVEL)
2872
Benjamin Peterson41181742008-07-02 20:22:54 +00002873 ProcessesMixin.pool = multiprocessing.Pool(4)
2874 ThreadsMixin.pool = multiprocessing.dummy.Pool(4)
2875 ManagerMixin.manager.__init__()
2876 ManagerMixin.manager.start()
2877 ManagerMixin.pool = ManagerMixin.manager.Pool(4)
Benjamin Petersone711caf2008-06-11 16:44:04 +00002878
2879 testcases = (
Benjamin Peterson41181742008-07-02 20:22:54 +00002880 sorted(testcases_processes.values(), key=lambda tc:tc.__name__) +
2881 sorted(testcases_threads.values(), key=lambda tc:tc.__name__) +
Neal Norwitz5d6415e2008-08-25 01:53:32 +00002882 sorted(testcases_manager.values(), key=lambda tc:tc.__name__) +
2883 testcases_other
Benjamin Petersone711caf2008-06-11 16:44:04 +00002884 )
2885
2886 loadTestsFromTestCase = unittest.defaultTestLoader.loadTestsFromTestCase
2887 suite = unittest.TestSuite(loadTestsFromTestCase(tc) for tc in testcases)
Richard Oudkerk225cb8d2012-05-02 19:36:11 +01002888 try:
2889 run(suite)
2890 finally:
2891 ThreadsMixin.pool.terminate()
2892 ProcessesMixin.pool.terminate()
2893 ManagerMixin.pool.terminate()
2894 ManagerMixin.pool.join()
2895 ManagerMixin.manager.shutdown()
Richard Oudkerka6becaa2012-05-03 18:29:02 +01002896 ManagerMixin.manager.join()
Richard Oudkerk225cb8d2012-05-02 19:36:11 +01002897 ThreadsMixin.pool.join()
2898 ProcessesMixin.pool.join()
2899 del ProcessesMixin.pool, ThreadsMixin.pool, ManagerMixin.pool
Benjamin Petersone711caf2008-06-11 16:44:04 +00002900
2901def main():
2902 test_main(unittest.TextTestRunner(verbosity=2).run)
2903
2904if __name__ == '__main__':
2905 main()